blob: 842a4ff610ccad2e93a6db1e9be21164933bb863 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
Clark Boylanb640e052014-04-03 16:41:46 -070050import zuul.merger.client
James E. Blair879dafb2015-07-17 14:04:49 -070051import zuul.merger.merger
52import zuul.merger.server
Clark Boylanb640e052014-04-03 16:41:46 -070053import zuul.reporter.gerrit
54import zuul.reporter.smtp
55import zuul.trigger.gerrit
56import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070057import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070058
59FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
60 'fixtures')
James E. Blair97d902e2014-08-21 13:25:56 -070061USE_TEMPDIR = True
Clark Boylanb640e052014-04-03 16:41:46 -070062
63logging.basicConfig(level=logging.DEBUG,
64 format='%(asctime)s %(name)-32s '
65 '%(levelname)-8s %(message)s')
66
67
68def repack_repo(path):
69 cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
70 output = subprocess.Popen(cmd, close_fds=True,
71 stdout=subprocess.PIPE,
72 stderr=subprocess.PIPE)
73 out = output.communicate()
74 if output.returncode:
75 raise Exception("git repack returned %d" % output.returncode)
76 return out
77
78
79def random_sha1():
80 return hashlib.sha1(str(random.random())).hexdigest()
81
82
James E. Blaira190f3b2015-01-05 14:56:54 -080083def iterate_timeout(max_seconds, purpose):
84 start = time.time()
85 count = 0
86 while (time.time() < start + max_seconds):
87 count += 1
88 yield count
89 time.sleep(0)
90 raise Exception("Timeout waiting for %s" % purpose)
91
92
Clark Boylanb640e052014-04-03 16:41:46 -070093class ChangeReference(git.Reference):
94 _common_path_default = "refs/changes"
95 _points_to_commits_only = True
96
97
98class FakeChange(object):
99 categories = {'APRV': ('Approved', -1, 1),
100 'CRVW': ('Code-Review', -2, 2),
101 'VRFY': ('Verified', -2, 2)}
102
103 def __init__(self, gerrit, number, project, branch, subject,
104 status='NEW', upstream_root=None):
105 self.gerrit = gerrit
106 self.reported = 0
107 self.queried = 0
108 self.patchsets = []
109 self.number = number
110 self.project = project
111 self.branch = branch
112 self.subject = subject
113 self.latest_patchset = 0
114 self.depends_on_change = None
115 self.needed_by_changes = []
116 self.fail_merge = False
117 self.messages = []
118 self.data = {
119 'branch': branch,
120 'comments': [],
121 'commitMessage': subject,
122 'createdOn': time.time(),
123 'id': 'I' + random_sha1(),
124 'lastUpdated': time.time(),
125 'number': str(number),
126 'open': status == 'NEW',
127 'owner': {'email': 'user@example.com',
128 'name': 'User Name',
129 'username': 'username'},
130 'patchSets': self.patchsets,
131 'project': project,
132 'status': status,
133 'subject': subject,
134 'submitRecords': [],
135 'url': 'https://hostname/%s' % number}
136
137 self.upstream_root = upstream_root
138 self.addPatchset()
139 self.data['submitRecords'] = self.getSubmitRecords()
140 self.open = status == 'NEW'
141
142 def add_fake_change_to_repo(self, msg, fn, large):
143 path = os.path.join(self.upstream_root, self.project)
144 repo = git.Repo(path)
145 ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
146 self.latest_patchset),
147 'refs/tags/init')
148 repo.head.reference = ref
James E. Blair879dafb2015-07-17 14:04:49 -0700149 zuul.merger.merger.reset_repo_to_head(repo)
Clark Boylanb640e052014-04-03 16:41:46 -0700150 repo.git.clean('-x', '-f', '-d')
151
152 path = os.path.join(self.upstream_root, self.project)
153 if not large:
154 fn = os.path.join(path, fn)
155 f = open(fn, 'w')
156 f.write("test %s %s %s\n" %
157 (self.branch, self.number, self.latest_patchset))
158 f.close()
159 repo.index.add([fn])
160 else:
161 for fni in range(100):
162 fn = os.path.join(path, str(fni))
163 f = open(fn, 'w')
164 for ci in range(4096):
165 f.write(random.choice(string.printable))
166 f.close()
167 repo.index.add([fn])
168
169 r = repo.index.commit(msg)
170 repo.head.reference = 'master'
James E. Blair879dafb2015-07-17 14:04:49 -0700171 zuul.merger.merger.reset_repo_to_head(repo)
Clark Boylanb640e052014-04-03 16:41:46 -0700172 repo.git.clean('-x', '-f', '-d')
173 repo.heads['master'].checkout()
174 return r
175
176 def addPatchset(self, files=[], large=False):
177 self.latest_patchset += 1
178 if files:
179 fn = files[0]
180 else:
James E. Blair97d902e2014-08-21 13:25:56 -0700181 fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
Clark Boylanb640e052014-04-03 16:41:46 -0700182 msg = self.subject + '-' + str(self.latest_patchset)
183 c = self.add_fake_change_to_repo(msg, fn, large)
184 ps_files = [{'file': '/COMMIT_MSG',
185 'type': 'ADDED'},
186 {'file': 'README',
187 'type': 'MODIFIED'}]
188 for f in files:
189 ps_files.append({'file': f, 'type': 'ADDED'})
190 d = {'approvals': [],
191 'createdOn': time.time(),
192 'files': ps_files,
193 'number': str(self.latest_patchset),
194 'ref': 'refs/changes/1/%s/%s' % (self.number,
195 self.latest_patchset),
196 'revision': c.hexsha,
197 'uploader': {'email': 'user@example.com',
198 'name': 'User name',
199 'username': 'user'}}
200 self.data['currentPatchSet'] = d
201 self.patchsets.append(d)
202 self.data['submitRecords'] = self.getSubmitRecords()
203
204 def getPatchsetCreatedEvent(self, patchset):
205 event = {"type": "patchset-created",
206 "change": {"project": self.project,
207 "branch": self.branch,
208 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
209 "number": str(self.number),
210 "subject": self.subject,
211 "owner": {"name": "User Name"},
212 "url": "https://hostname/3"},
213 "patchSet": self.patchsets[patchset - 1],
214 "uploader": {"name": "User Name"}}
215 return event
216
217 def getChangeRestoredEvent(self):
218 event = {"type": "change-restored",
219 "change": {"project": self.project,
220 "branch": self.branch,
221 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
222 "number": str(self.number),
223 "subject": self.subject,
224 "owner": {"name": "User Name"},
225 "url": "https://hostname/3"},
226 "restorer": {"name": "User Name"},
Antoine Mussobd86a312014-01-08 14:51:33 +0100227 "patchSet": self.patchsets[-1],
228 "reason": ""}
229 return event
230
231 def getChangeAbandonedEvent(self):
232 event = {"type": "change-abandoned",
233 "change": {"project": self.project,
234 "branch": self.branch,
235 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
236 "number": str(self.number),
237 "subject": self.subject,
238 "owner": {"name": "User Name"},
239 "url": "https://hostname/3"},
240 "abandoner": {"name": "User Name"},
241 "patchSet": self.patchsets[-1],
Clark Boylanb640e052014-04-03 16:41:46 -0700242 "reason": ""}
243 return event
244
245 def getChangeCommentEvent(self, patchset):
246 event = {"type": "comment-added",
247 "change": {"project": self.project,
248 "branch": self.branch,
249 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
250 "number": str(self.number),
251 "subject": self.subject,
252 "owner": {"name": "User Name"},
253 "url": "https://hostname/3"},
254 "patchSet": self.patchsets[patchset - 1],
255 "author": {"name": "User Name"},
256 "approvals": [{"type": "Code-Review",
257 "description": "Code-Review",
258 "value": "0"}],
259 "comment": "This is a comment"}
260 return event
261
262 def addApproval(self, category, value, username='jenkins',
263 granted_on=None):
264 if not granted_on:
265 granted_on = time.time()
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000266 approval = {
267 'description': self.categories[category][0],
268 'type': category,
269 'value': str(value),
270 'by': {
271 'username': username,
272 'email': username + '@example.com',
273 },
274 'grantedOn': int(granted_on)
275 }
Clark Boylanb640e052014-04-03 16:41:46 -0700276 for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
277 if x['by']['username'] == username and x['type'] == category:
278 del self.patchsets[-1]['approvals'][i]
279 self.patchsets[-1]['approvals'].append(approval)
280 event = {'approvals': [approval],
281 'author': {'email': 'user@example.com',
282 'name': 'User Name',
283 'username': 'username'},
284 'change': {'branch': self.branch,
285 'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
286 'number': str(self.number),
287 'owner': {'email': 'user@example.com',
288 'name': 'User Name',
289 'username': 'username'},
290 'project': self.project,
291 'subject': self.subject,
292 'topic': 'master',
293 'url': 'https://hostname/459'},
294 'comment': '',
295 'patchSet': self.patchsets[-1],
296 'type': 'comment-added'}
297 self.data['submitRecords'] = self.getSubmitRecords()
298 return json.loads(json.dumps(event))
299
300 def getSubmitRecords(self):
301 status = {}
302 for cat in self.categories.keys():
303 status[cat] = 0
304
305 for a in self.patchsets[-1]['approvals']:
306 cur = status[a['type']]
307 cat_min, cat_max = self.categories[a['type']][1:]
308 new = int(a['value'])
309 if new == cat_min:
310 cur = new
311 elif abs(new) > abs(cur):
312 cur = new
313 status[a['type']] = cur
314
315 labels = []
316 ok = True
317 for typ, cat in self.categories.items():
318 cur = status[typ]
319 cat_min, cat_max = cat[1:]
320 if cur == cat_min:
321 value = 'REJECT'
322 ok = False
323 elif cur == cat_max:
324 value = 'OK'
325 else:
326 value = 'NEED'
327 ok = False
328 labels.append({'label': cat[0], 'status': value})
329 if ok:
330 return [{'status': 'OK'}]
331 return [{'status': 'NOT_READY',
332 'labels': labels}]
333
334 def setDependsOn(self, other, patchset):
335 self.depends_on_change = other
336 d = {'id': other.data['id'],
337 'number': other.data['number'],
338 'ref': other.patchsets[patchset - 1]['ref']
339 }
340 self.data['dependsOn'] = [d]
341
342 other.needed_by_changes.append(self)
343 needed = other.data.get('neededBy', [])
344 d = {'id': self.data['id'],
345 'number': self.data['number'],
346 'ref': self.patchsets[patchset - 1]['ref'],
347 'revision': self.patchsets[patchset - 1]['revision']
348 }
349 needed.append(d)
350 other.data['neededBy'] = needed
351
352 def query(self):
353 self.queried += 1
354 d = self.data.get('dependsOn')
355 if d:
356 d = d[0]
357 if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
358 d['isCurrentPatchSet'] = True
359 else:
360 d['isCurrentPatchSet'] = False
361 return json.loads(json.dumps(self.data))
362
363 def setMerged(self):
364 if (self.depends_on_change and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000365 self.depends_on_change.data['status'] != 'MERGED'):
Clark Boylanb640e052014-04-03 16:41:46 -0700366 return
367 if self.fail_merge:
368 return
369 self.data['status'] = 'MERGED'
370 self.open = False
371
372 path = os.path.join(self.upstream_root, self.project)
373 repo = git.Repo(path)
374 repo.heads[self.branch].commit = \
375 repo.commit(self.patchsets[-1]['revision'])
376
377 def setReported(self):
378 self.reported += 1
379
380
381class FakeGerrit(object):
James E. Blair96698e22015-04-02 07:48:21 -0700382 log = logging.getLogger("zuul.test.FakeGerrit")
383
Clark Boylanb640e052014-04-03 16:41:46 -0700384 def __init__(self, *args, **kw):
385 self.event_queue = Queue.Queue()
386 self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
387 self.change_number = 0
388 self.changes = {}
James E. Blairf8ff9932014-08-15 15:24:24 -0700389 self.queries = []
Clark Boylanb640e052014-04-03 16:41:46 -0700390
391 def addFakeChange(self, project, branch, subject, status='NEW'):
392 self.change_number += 1
393 c = FakeChange(self, self.change_number, project, branch, subject,
394 upstream_root=self.upstream_root,
395 status=status)
396 self.changes[self.change_number] = c
397 return c
398
399 def addEvent(self, data):
James E. Blair5241b882015-04-02 14:56:35 -0700400 return self.event_queue.put((time.time(), data))
Clark Boylanb640e052014-04-03 16:41:46 -0700401
402 def getEvent(self):
403 return self.event_queue.get()
404
405 def eventDone(self):
406 self.event_queue.task_done()
407
408 def review(self, project, changeid, message, action):
409 number, ps = changeid.split(',')
410 change = self.changes[int(number)]
411 change.messages.append(message)
412 if 'submit' in action:
413 change.setMerged()
414 if message:
415 change.setReported()
416
417 def query(self, number):
418 change = self.changes.get(int(number))
419 if change:
420 return change.query()
421 return {}
422
James E. Blairc494d542014-08-06 09:23:52 -0700423 def simpleQuery(self, query):
James E. Blair96698e22015-04-02 07:48:21 -0700424 self.log.debug("simpleQuery: %s" % query)
James E. Blairf8ff9932014-08-15 15:24:24 -0700425 self.queries.append(query)
James E. Blair5ee24252014-12-30 10:12:29 -0800426 if query.startswith('change:'):
427 # Query a specific changeid
428 changeid = query[len('change:'):]
429 l = [change.query() for change in self.changes.values()
430 if change.data['id'] == changeid]
James E. Blair96698e22015-04-02 07:48:21 -0700431 elif query.startswith('message:'):
432 # Query the content of a commit message
433 msg = query[len('message:'):].strip()
434 l = [change.query() for change in self.changes.values()
435 if msg in change.data['commitMessage']]
James E. Blair5ee24252014-12-30 10:12:29 -0800436 else:
437 # Query all open changes
438 l = [change.query() for change in self.changes.values()]
James E. Blairf8ff9932014-08-15 15:24:24 -0700439 return l
James E. Blairc494d542014-08-06 09:23:52 -0700440
Clark Boylanb640e052014-04-03 16:41:46 -0700441 def startWatching(self, *args, **kw):
442 pass
443
444
445class BuildHistory(object):
446 def __init__(self, **kw):
447 self.__dict__.update(kw)
448
449 def __repr__(self):
450 return ("<Completed build, result: %s name: %s #%s changes: %s>" %
451 (self.result, self.name, self.number, self.changes))
452
453
454class FakeURLOpener(object):
455 def __init__(self, upstream_root, fake_gerrit, url):
456 self.upstream_root = upstream_root
457 self.fake_gerrit = fake_gerrit
458 self.url = url
459
460 def read(self):
461 res = urlparse.urlparse(self.url)
462 path = res.path
463 project = '/'.join(path.split('/')[2:-2])
464 ret = '001e# service=git-upload-pack\n'
465 ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
466 'multi_ack thin-pack side-band side-band-64k ofs-delta '
467 'shallow no-progress include-tag multi_ack_detailed no-done\n')
468 path = os.path.join(self.upstream_root, project)
469 repo = git.Repo(path)
470 for ref in repo.refs:
471 r = ref.object.hexsha + ' ' + ref.path + '\n'
472 ret += '%04x%s' % (len(r) + 4, r)
473 ret += '0000'
474 return ret
475
476
477class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
478 name = 'gerrit'
479
480 def __init__(self, upstream_root, *args):
481 super(FakeGerritTrigger, self).__init__(*args)
482 self.upstream_root = upstream_root
James E. Blair5241b882015-04-02 14:56:35 -0700483 self.gerrit_connector.delay = 0.0
Clark Boylanb640e052014-04-03 16:41:46 -0700484
485 def getGitUrl(self, project):
486 return os.path.join(self.upstream_root, project.name)
487
488
489class FakeStatsd(threading.Thread):
490 def __init__(self):
491 threading.Thread.__init__(self)
492 self.daemon = True
493 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
494 self.sock.bind(('', 0))
495 self.port = self.sock.getsockname()[1]
496 self.wake_read, self.wake_write = os.pipe()
497 self.stats = []
498
499 def run(self):
500 while True:
501 poll = select.poll()
502 poll.register(self.sock, select.POLLIN)
503 poll.register(self.wake_read, select.POLLIN)
504 ret = poll.poll()
505 for (fd, event) in ret:
506 if fd == self.sock.fileno():
507 data = self.sock.recvfrom(1024)
508 if not data:
509 return
510 self.stats.append(data[0])
511 if fd == self.wake_read:
512 return
513
514 def stop(self):
515 os.write(self.wake_write, '1\n')
516
517
518class FakeBuild(threading.Thread):
519 log = logging.getLogger("zuul.test")
520
521 def __init__(self, worker, job, number, node):
522 threading.Thread.__init__(self)
523 self.daemon = True
524 self.worker = worker
525 self.job = job
526 self.name = job.name.split(':')[1]
527 self.number = number
528 self.node = node
529 self.parameters = json.loads(job.arguments)
530 self.unique = self.parameters['ZUUL_UUID']
531 self.wait_condition = threading.Condition()
532 self.waiting = False
533 self.aborted = False
534 self.created = time.time()
535 self.description = ''
536 self.run_error = False
537
538 def release(self):
539 self.wait_condition.acquire()
540 self.wait_condition.notify()
541 self.waiting = False
542 self.log.debug("Build %s released" % self.unique)
543 self.wait_condition.release()
544
545 def isWaiting(self):
546 self.wait_condition.acquire()
547 if self.waiting:
548 ret = True
549 else:
550 ret = False
551 self.wait_condition.release()
552 return ret
553
554 def _wait(self):
555 self.wait_condition.acquire()
556 self.waiting = True
557 self.log.debug("Build %s waiting" % self.unique)
558 self.wait_condition.wait()
559 self.wait_condition.release()
560
561 def run(self):
562 data = {
563 'url': 'https://server/job/%s/%s/' % (self.name, self.number),
564 'name': self.name,
565 'number': self.number,
566 'manager': self.worker.worker_id,
567 'worker_name': 'My Worker',
568 'worker_hostname': 'localhost',
569 'worker_ips': ['127.0.0.1', '192.168.1.1'],
570 'worker_fqdn': 'zuul.example.org',
571 'worker_program': 'FakeBuilder',
572 'worker_version': 'v1.1',
573 'worker_extra': {'something': 'else'}
574 }
575
576 self.log.debug('Running build %s' % self.unique)
577
578 self.job.sendWorkData(json.dumps(data))
579 self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
580 self.job.sendWorkStatus(0, 100)
581
582 if self.worker.hold_jobs_in_build:
583 self.log.debug('Holding build %s' % self.unique)
584 self._wait()
585 self.log.debug("Build %s continuing" % self.unique)
586
587 self.worker.lock.acquire()
588
589 result = 'SUCCESS'
590 if (('ZUUL_REF' in self.parameters) and
591 self.worker.shouldFailTest(self.name,
592 self.parameters['ZUUL_REF'])):
593 result = 'FAILURE'
594 if self.aborted:
595 result = 'ABORTED'
596
597 if self.run_error:
598 work_fail = True
599 result = 'RUN_ERROR'
600 else:
601 data['result'] = result
602 work_fail = False
603
604 changes = None
605 if 'ZUUL_CHANGE_IDS' in self.parameters:
606 changes = self.parameters['ZUUL_CHANGE_IDS']
607
608 self.worker.build_history.append(
609 BuildHistory(name=self.name, number=self.number,
610 result=result, changes=changes, node=self.node,
611 uuid=self.unique, description=self.description,
612 pipeline=self.parameters['ZUUL_PIPELINE'])
613 )
614
615 self.job.sendWorkData(json.dumps(data))
616 if work_fail:
617 self.job.sendWorkFail()
618 else:
619 self.job.sendWorkComplete(json.dumps(data))
620 del self.worker.gearman_jobs[self.job.unique]
621 self.worker.running_builds.remove(self)
622 self.worker.lock.release()
623
624
625class FakeWorker(gear.Worker):
626 def __init__(self, worker_id, test):
627 super(FakeWorker, self).__init__(worker_id)
628 self.gearman_jobs = {}
629 self.build_history = []
630 self.running_builds = []
631 self.build_counter = 0
632 self.fail_tests = {}
633 self.test = test
634
635 self.hold_jobs_in_build = False
636 self.lock = threading.Lock()
637 self.__work_thread = threading.Thread(target=self.work)
638 self.__work_thread.daemon = True
639 self.__work_thread.start()
640
641 def handleJob(self, job):
642 parts = job.name.split(":")
643 cmd = parts[0]
644 name = parts[1]
645 if len(parts) > 2:
646 node = parts[2]
647 else:
648 node = None
649 if cmd == 'build':
650 self.handleBuild(job, name, node)
651 elif cmd == 'stop':
652 self.handleStop(job, name)
653 elif cmd == 'set_description':
654 self.handleSetDescription(job, name)
655
656 def handleBuild(self, job, name, node):
657 build = FakeBuild(self, job, self.build_counter, node)
658 job.build = build
659 self.gearman_jobs[job.unique] = job
660 self.build_counter += 1
661
662 self.running_builds.append(build)
663 build.start()
664
665 def handleStop(self, job, name):
666 self.log.debug("handle stop")
667 parameters = json.loads(job.arguments)
668 name = parameters['name']
669 number = parameters['number']
670 for build in self.running_builds:
671 if build.name == name and build.number == number:
672 build.aborted = True
673 build.release()
674 job.sendWorkComplete()
675 return
676 job.sendWorkFail()
677
678 def handleSetDescription(self, job, name):
679 self.log.debug("handle set description")
680 parameters = json.loads(job.arguments)
681 name = parameters['name']
682 number = parameters['number']
683 descr = parameters['html_description']
684 for build in self.running_builds:
685 if build.name == name and build.number == number:
686 build.description = descr
687 job.sendWorkComplete()
688 return
689 for build in self.build_history:
690 if build.name == name and build.number == number:
691 build.description = descr
692 job.sendWorkComplete()
693 return
694 job.sendWorkFail()
695
696 def work(self):
697 while self.running:
698 try:
699 job = self.getJob()
700 except gear.InterruptedError:
701 continue
702 try:
703 self.handleJob(job)
704 except:
705 self.log.exception("Worker exception:")
706
707 def addFailTest(self, name, change):
708 l = self.fail_tests.get(name, [])
709 l.append(change)
710 self.fail_tests[name] = l
711
712 def shouldFailTest(self, name, ref):
713 l = self.fail_tests.get(name, [])
714 for change in l:
715 if self.test.ref_has_change(ref, change):
716 return True
717 return False
718
719 def release(self, regex=None):
720 builds = self.running_builds[:]
721 self.log.debug("releasing build %s (%s)" % (regex,
722 len(self.running_builds)))
723 for build in builds:
724 if not regex or re.match(regex, build.name):
725 self.log.debug("releasing build %s" %
726 (build.parameters['ZUUL_UUID']))
727 build.release()
728 else:
729 self.log.debug("not releasing build %s" %
730 (build.parameters['ZUUL_UUID']))
731 self.log.debug("done releasing builds %s (%s)" %
732 (regex, len(self.running_builds)))
733
734
735class FakeGearmanServer(gear.Server):
736 def __init__(self):
737 self.hold_jobs_in_queue = False
738 super(FakeGearmanServer, self).__init__(0)
739
740 def getJobForConnection(self, connection, peek=False):
741 for queue in [self.high_queue, self.normal_queue, self.low_queue]:
742 for job in queue:
743 if not hasattr(job, 'waiting'):
744 if job.name.startswith('build:'):
745 job.waiting = self.hold_jobs_in_queue
746 else:
747 job.waiting = False
748 if job.waiting:
749 continue
750 if job.name in connection.functions:
751 if not peek:
752 queue.remove(job)
753 connection.related_jobs[job.handle] = job
754 job.worker_connection = connection
755 job.running = True
756 return job
757 return None
758
759 def release(self, regex=None):
760 released = False
761 qlen = (len(self.high_queue) + len(self.normal_queue) +
762 len(self.low_queue))
763 self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
764 for job in self.getQueue():
765 cmd, name = job.name.split(':')
766 if cmd != 'build':
767 continue
768 if not regex or re.match(regex, name):
769 self.log.debug("releasing queued job %s" %
770 job.unique)
771 job.waiting = False
772 released = True
773 else:
774 self.log.debug("not releasing queued job %s" %
775 job.unique)
776 if released:
777 self.wakeConnections()
778 qlen = (len(self.high_queue) + len(self.normal_queue) +
779 len(self.low_queue))
780 self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
781
782
783class FakeSMTP(object):
784 log = logging.getLogger('zuul.FakeSMTP')
785
786 def __init__(self, messages, server, port):
787 self.server = server
788 self.port = port
789 self.messages = messages
790
791 def sendmail(self, from_email, to_email, msg):
792 self.log.info("Sending email from %s, to %s, with msg %s" % (
793 from_email, to_email, msg))
794
795 headers = msg.split('\n\n', 1)[0]
796 body = msg.split('\n\n', 1)[1]
797
798 self.messages.append(dict(
799 from_email=from_email,
800 to_email=to_email,
801 msg=msg,
802 headers=headers,
803 body=body,
804 ))
805
806 return True
807
808 def quit(self):
809 return True
810
811
812class FakeSwiftClientConnection(swiftclient.client.Connection):
813 def post_account(self, headers):
814 # Do nothing
815 pass
816
817 def get_auth(self):
818 # Returns endpoint and (unused) auth token
819 endpoint = os.path.join('https://storage.example.org', 'V1',
820 'AUTH_account')
821 return endpoint, ''
822
823
Maru Newby3fe5f852015-01-13 04:22:14 +0000824class BaseTestCase(testtools.TestCase):
Clark Boylanb640e052014-04-03 16:41:46 -0700825 log = logging.getLogger("zuul.test")
826
827 def setUp(self):
Maru Newby3fe5f852015-01-13 04:22:14 +0000828 super(BaseTestCase, self).setUp()
Clark Boylanb640e052014-04-03 16:41:46 -0700829 test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
830 try:
831 test_timeout = int(test_timeout)
832 except ValueError:
833 # If timeout value is invalid do not set a timeout.
834 test_timeout = 0
835 if test_timeout > 0:
836 self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
837
838 if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
839 os.environ.get('OS_STDOUT_CAPTURE') == '1'):
840 stdout = self.useFixture(fixtures.StringStream('stdout')).stream
841 self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
842 if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
843 os.environ.get('OS_STDERR_CAPTURE') == '1'):
844 stderr = self.useFixture(fixtures.StringStream('stderr')).stream
845 self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
846 if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
847 os.environ.get('OS_LOG_CAPTURE') == '1'):
848 self.useFixture(fixtures.FakeLogger(
849 level=logging.DEBUG,
850 format='%(asctime)s %(name)-32s '
851 '%(levelname)-8s %(message)s'))
Maru Newby3fe5f852015-01-13 04:22:14 +0000852
853
854class ZuulTestCase(BaseTestCase):
855
856 def setUp(self):
857 super(ZuulTestCase, self).setUp()
James E. Blair97d902e2014-08-21 13:25:56 -0700858 if USE_TEMPDIR:
859 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000860 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
861 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700862 else:
863 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700864 self.test_root = os.path.join(tmp_root, "zuul-test")
865 self.upstream_root = os.path.join(self.test_root, "upstream")
866 self.git_root = os.path.join(self.test_root, "git")
867
868 if os.path.exists(self.test_root):
869 shutil.rmtree(self.test_root)
870 os.makedirs(self.test_root)
871 os.makedirs(self.upstream_root)
872 os.makedirs(self.git_root)
873
874 # Make per test copy of Configuration.
875 self.setup_config()
876 self.config.set('zuul', 'layout_config',
877 os.path.join(FIXTURE_DIR, "layout.yaml"))
878 self.config.set('merger', 'git_dir', self.git_root)
879
880 # For each project in config:
881 self.init_repo("org/project")
882 self.init_repo("org/project1")
883 self.init_repo("org/project2")
884 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700885 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700886 self.init_repo("org/project5")
887 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700888 self.init_repo("org/one-job-project")
889 self.init_repo("org/nonvoting-project")
890 self.init_repo("org/templated-project")
891 self.init_repo("org/layered-project")
892 self.init_repo("org/node-project")
893 self.init_repo("org/conflict-project")
894 self.init_repo("org/noop-project")
895 self.init_repo("org/experimental-project")
Evgeny Antyshevd6e546c2015-06-11 15:13:57 +0000896 self.init_repo("org/no-jobs-project")
Clark Boylanb640e052014-04-03 16:41:46 -0700897
898 self.statsd = FakeStatsd()
899 os.environ['STATSD_HOST'] = 'localhost'
900 os.environ['STATSD_PORT'] = str(self.statsd.port)
901 self.statsd.start()
902 # the statsd client object is configured in the statsd module import
903 reload(statsd)
904 reload(zuul.scheduler)
905
906 self.gearman_server = FakeGearmanServer()
907
908 self.config.set('gearman', 'port', str(self.gearman_server.port))
909
910 self.worker = FakeWorker('fake_worker', self)
911 self.worker.addServer('127.0.0.1', self.gearman_server.port)
912 self.gearman_server.worker = self.worker
913
914 self.merge_server = zuul.merger.server.MergeServer(self.config)
915 self.merge_server.start()
916
917 self.sched = zuul.scheduler.Scheduler()
918
919 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
920 FakeSwiftClientConnection))
921 self.swift = zuul.lib.swift.Swift(self.config)
922
923 def URLOpenerFactory(*args, **kw):
924 if isinstance(args[0], urllib2.Request):
925 return old_urlopen(*args, **kw)
926 args = [self.fake_gerrit] + list(args)
927 return FakeURLOpener(self.upstream_root, *args, **kw)
928
929 old_urlopen = urllib2.urlopen
930 urllib2.urlopen = URLOpenerFactory
931
932 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
933 self.swift)
934 self.merge_client = zuul.merger.client.MergeClient(
935 self.config, self.sched)
936
937 self.smtp_messages = []
938
939 def FakeSMTPFactory(*args, **kw):
940 args = [self.smtp_messages] + list(args)
941 return FakeSMTP(*args, **kw)
942
943 zuul.lib.gerrit.Gerrit = FakeGerrit
944 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
945
946 self.gerrit = FakeGerritTrigger(
947 self.upstream_root, self.config, self.sched)
948 self.gerrit.replication_timeout = 1.5
949 self.gerrit.replication_retry_interval = 0.5
950 self.fake_gerrit = self.gerrit.gerrit
951 self.fake_gerrit.upstream_root = self.upstream_root
952
953 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
954 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
955
956 self.sched.setLauncher(self.launcher)
957 self.sched.setMerger(self.merge_client)
958 self.sched.registerTrigger(self.gerrit)
959 self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
960 self.sched.registerTrigger(self.timer)
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000961 self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
962 self.sched)
James E. Blairc494d542014-08-06 09:23:52 -0700963 self.sched.registerTrigger(self.zuultrigger)
Clark Boylanb640e052014-04-03 16:41:46 -0700964
965 self.sched.registerReporter(
966 zuul.reporter.gerrit.Reporter(self.gerrit))
967 self.smtp_reporter = zuul.reporter.smtp.Reporter(
968 self.config.get('smtp', 'default_from'),
969 self.config.get('smtp', 'default_to'),
970 self.config.get('smtp', 'server'))
971 self.sched.registerReporter(self.smtp_reporter)
972
973 self.sched.start()
974 self.sched.reconfigure(self.config)
975 self.sched.resume()
976 self.webapp.start()
977 self.rpc.start()
978 self.launcher.gearman.waitForServer()
979 self.registerJobs()
980 self.builds = self.worker.running_builds
981 self.history = self.worker.build_history
982
983 self.addCleanup(self.assertFinalState)
984 self.addCleanup(self.shutdown)
985
986 def setup_config(self):
987 """Per test config object. Override to set different config."""
988 self.config = ConfigParser.ConfigParser()
989 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
990
991 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -0700992 # Make sure that git.Repo objects have been garbage collected.
993 repos = []
994 gc.collect()
995 for obj in gc.get_objects():
996 if isinstance(obj, git.Repo):
997 repos.append(obj)
998 self.assertEqual(len(repos), 0)
999 self.assertEmptyQueues()
James E. Blair0577cd62015-02-07 11:42:12 -08001000 for pipeline in self.sched.layout.pipelines.values():
1001 if isinstance(pipeline.manager,
1002 zuul.scheduler.IndependentPipelineManager):
1003 self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001004
1005 def shutdown(self):
1006 self.log.debug("Shutting down after tests")
1007 self.launcher.stop()
1008 self.merge_server.stop()
1009 self.merge_server.join()
1010 self.merge_client.stop()
1011 self.worker.shutdown()
1012 self.gerrit.stop()
1013 self.timer.stop()
1014 self.sched.stop()
1015 self.sched.join()
1016 self.statsd.stop()
1017 self.statsd.join()
1018 self.webapp.stop()
1019 self.webapp.join()
1020 self.rpc.stop()
1021 self.rpc.join()
1022 self.gearman_server.shutdown()
1023 threads = threading.enumerate()
1024 if len(threads) > 1:
1025 self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -07001026
1027 def init_repo(self, project):
1028 parts = project.split('/')
1029 path = os.path.join(self.upstream_root, *parts[:-1])
1030 if not os.path.exists(path):
1031 os.makedirs(path)
1032 path = os.path.join(self.upstream_root, project)
1033 repo = git.Repo.init(path)
1034
1035 repo.config_writer().set_value('user', 'email', 'user@example.com')
1036 repo.config_writer().set_value('user', 'name', 'User Name')
1037 repo.config_writer().write()
1038
1039 fn = os.path.join(path, 'README')
1040 f = open(fn, 'w')
1041 f.write("test\n")
1042 f.close()
1043 repo.index.add([fn])
1044 repo.index.commit('initial commit')
1045 master = repo.create_head('master')
1046 repo.create_tag('init')
1047
James E. Blair97d902e2014-08-21 13:25:56 -07001048 repo.head.reference = master
James E. Blair879dafb2015-07-17 14:04:49 -07001049 zuul.merger.merger.reset_repo_to_head(repo)
James E. Blair97d902e2014-08-21 13:25:56 -07001050 repo.git.clean('-x', '-f', '-d')
1051
1052 self.create_branch(project, 'mp')
1053
1054 def create_branch(self, project, branch):
1055 path = os.path.join(self.upstream_root, project)
1056 repo = git.Repo.init(path)
1057 fn = os.path.join(path, 'README')
1058
1059 branch_head = repo.create_head(branch)
1060 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001061 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001062 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001063 f.close()
1064 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001065 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001066
James E. Blair97d902e2014-08-21 13:25:56 -07001067 repo.head.reference = repo.heads['master']
James E. Blair879dafb2015-07-17 14:04:49 -07001068 zuul.merger.merger.reset_repo_to_head(repo)
Clark Boylanb640e052014-04-03 16:41:46 -07001069 repo.git.clean('-x', '-f', '-d')
1070
1071 def ref_has_change(self, ref, change):
1072 path = os.path.join(self.git_root, change.project)
1073 repo = git.Repo(path)
Mike Heald8225f522014-11-21 09:52:33 +00001074 try:
1075 for commit in repo.iter_commits(ref):
1076 if commit.message.strip() == ('%s-1' % change.subject):
1077 return True
1078 except GitCommandError:
1079 pass
Clark Boylanb640e052014-04-03 16:41:46 -07001080 return False
1081
1082 def job_has_changes(self, *args):
1083 job = args[0]
1084 commits = args[1:]
1085 if isinstance(job, FakeBuild):
1086 parameters = job.parameters
1087 else:
1088 parameters = json.loads(job.arguments)
1089 project = parameters['ZUUL_PROJECT']
1090 path = os.path.join(self.git_root, project)
1091 repo = git.Repo(path)
1092 ref = parameters['ZUUL_REF']
1093 sha = parameters['ZUUL_COMMIT']
1094 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1095 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1096 commit_messages = ['%s-1' % commit.subject for commit in commits]
1097 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1098 " repo_messages %s; sha %s" % (job, commit_messages,
1099 repo_messages, sha))
1100 for msg in commit_messages:
1101 if msg not in repo_messages:
1102 self.log.debug(" messages do not match")
1103 return False
1104 if repo_shas[0] != sha:
1105 self.log.debug(" sha does not match")
1106 return False
1107 self.log.debug(" OK")
1108 return True
1109
1110 def registerJobs(self):
1111 count = 0
1112 for job in self.sched.layout.jobs.keys():
1113 self.worker.registerFunction('build:' + job)
1114 count += 1
1115 self.worker.registerFunction('stop:' + self.worker.worker_id)
1116 count += 1
1117
1118 while len(self.gearman_server.functions) < count:
1119 time.sleep(0)
1120
James E. Blairb8c16472015-05-05 14:55:26 -07001121 def orderedRelease(self):
1122 # Run one build at a time to ensure non-race order:
1123 while len(self.builds):
1124 self.release(self.builds[0])
1125 self.waitUntilSettled()
1126
Clark Boylanb640e052014-04-03 16:41:46 -07001127 def release(self, job):
1128 if isinstance(job, FakeBuild):
1129 job.release()
1130 else:
1131 job.waiting = False
1132 self.log.debug("Queued job %s released" % job.unique)
1133 self.gearman_server.wakeConnections()
1134
1135 def getParameter(self, job, name):
1136 if isinstance(job, FakeBuild):
1137 return job.parameters[name]
1138 else:
1139 parameters = json.loads(job.arguments)
1140 return parameters[name]
1141
1142 def resetGearmanServer(self):
1143 self.worker.setFunctions([])
1144 while True:
1145 done = True
1146 for connection in self.gearman_server.active_connections:
1147 if (connection.functions and
1148 connection.client_id not in ['Zuul RPC Listener',
1149 'Zuul Merger']):
1150 done = False
1151 if done:
1152 break
1153 time.sleep(0)
1154 self.gearman_server.functions = set()
1155 self.rpc.register()
1156 self.merge_server.register()
1157
1158 def haveAllBuildsReported(self):
1159 # See if Zuul is waiting on a meta job to complete
1160 if self.launcher.meta_jobs:
1161 return False
1162 # Find out if every build that the worker has completed has been
1163 # reported back to Zuul. If it hasn't then that means a Gearman
1164 # event is still in transit and the system is not stable.
1165 for build in self.worker.build_history:
1166 zbuild = self.launcher.builds.get(build.uuid)
1167 if not zbuild:
1168 # It has already been reported
1169 continue
1170 # It hasn't been reported yet.
1171 return False
1172 # Make sure that none of the worker connections are in GRAB_WAIT
1173 for connection in self.worker.active_connections:
1174 if connection.state == 'GRAB_WAIT':
1175 return False
1176 return True
1177
1178 def areAllBuildsWaiting(self):
Clark Boylanb640e052014-04-03 16:41:46 -07001179 builds = self.launcher.builds.values()
1180 for build in builds:
1181 client_job = None
1182 for conn in self.launcher.gearman.active_connections:
1183 for j in conn.related_jobs.values():
1184 if j.unique == build.uuid:
1185 client_job = j
1186 break
1187 if not client_job:
1188 self.log.debug("%s is not known to the gearman client" %
1189 build)
James E. Blairf15139b2015-04-02 16:37:15 -07001190 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001191 if not client_job.handle:
1192 self.log.debug("%s has no handle" % client_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001193 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001194 server_job = self.gearman_server.jobs.get(client_job.handle)
1195 if not server_job:
1196 self.log.debug("%s is not known to the gearman server" %
1197 client_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001198 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001199 if not hasattr(server_job, 'waiting'):
1200 self.log.debug("%s is being enqueued" % server_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001201 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001202 if server_job.waiting:
1203 continue
1204 worker_job = self.worker.gearman_jobs.get(server_job.unique)
1205 if worker_job:
James E. Blairf15139b2015-04-02 16:37:15 -07001206 if build.number is None:
1207 self.log.debug("%s has not reported start" % worker_job)
1208 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001209 if worker_job.build.isWaiting():
1210 continue
1211 else:
1212 self.log.debug("%s is running" % worker_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001213 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001214 else:
1215 self.log.debug("%s is unassigned" % server_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001216 return False
1217 return True
Clark Boylanb640e052014-04-03 16:41:46 -07001218
1219 def waitUntilSettled(self):
1220 self.log.debug("Waiting until settled...")
1221 start = time.time()
1222 while True:
1223 if time.time() - start > 10:
1224 print 'queue status:',
1225 print self.sched.trigger_event_queue.empty(),
1226 print self.sched.result_event_queue.empty(),
1227 print self.fake_gerrit.event_queue.empty(),
1228 print self.areAllBuildsWaiting()
1229 raise Exception("Timeout waiting for Zuul to settle")
1230 # Make sure no new events show up while we're checking
1231 self.worker.lock.acquire()
1232 # have all build states propogated to zuul?
1233 if self.haveAllBuildsReported():
1234 # Join ensures that the queue is empty _and_ events have been
1235 # processed
1236 self.fake_gerrit.event_queue.join()
1237 self.sched.trigger_event_queue.join()
1238 self.sched.result_event_queue.join()
1239 self.sched.run_handler_lock.acquire()
James E. Blairae1b2d12015-02-07 08:01:21 -08001240 if (not self.merge_client.build_sets and
1241 self.sched.trigger_event_queue.empty() and
Clark Boylanb640e052014-04-03 16:41:46 -07001242 self.sched.result_event_queue.empty() and
1243 self.fake_gerrit.event_queue.empty() and
Clark Boylanb640e052014-04-03 16:41:46 -07001244 self.haveAllBuildsReported() and
1245 self.areAllBuildsWaiting()):
1246 self.sched.run_handler_lock.release()
1247 self.worker.lock.release()
1248 self.log.debug("...settled.")
1249 return
1250 self.sched.run_handler_lock.release()
1251 self.worker.lock.release()
1252 self.sched.wake_event.wait(0.1)
1253
1254 def countJobResults(self, jobs, result):
1255 jobs = filter(lambda x: x.result == result, jobs)
1256 return len(jobs)
1257
1258 def getJobFromHistory(self, name):
1259 history = self.worker.build_history
1260 for job in history:
1261 if job.name == name:
1262 return job
1263 raise Exception("Unable to find job %s in history" % name)
1264
1265 def assertEmptyQueues(self):
1266 # Make sure there are no orphaned jobs
1267 for pipeline in self.sched.layout.pipelines.values():
1268 for queue in pipeline.queues:
1269 if len(queue.queue) != 0:
1270 print 'pipeline %s queue %s contents %s' % (
1271 pipeline.name, queue.name, queue.queue)
Antoine Mussobd86a312014-01-08 14:51:33 +01001272 self.assertEqual(len(queue.queue), 0,
1273 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001274
1275 def assertReportedStat(self, key, value=None, kind=None):
1276 start = time.time()
1277 while time.time() < (start + 5):
1278 for stat in self.statsd.stats:
1279 pprint.pprint(self.statsd.stats)
1280 k, v = stat.split(':')
1281 if key == k:
1282 if value is None and kind is None:
1283 return
1284 elif value:
1285 if value == v:
1286 return
1287 elif kind:
1288 if v.endswith('|' + kind):
1289 return
1290 time.sleep(0.1)
1291
1292 pprint.pprint(self.statsd.stats)
1293 raise Exception("Key %s not found in reported stats" % key)