blob: d324113f7692badb6ad5dd32dc29180aecc35976 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
50import zuul.merger.server
51import zuul.merger.client
52import zuul.reporter.gerrit
53import zuul.reporter.smtp
54import zuul.trigger.gerrit
55import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070056import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070057
58FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
59 'fixtures')
James E. Blair97d902e2014-08-21 13:25:56 -070060USE_TEMPDIR = True
Clark Boylanb640e052014-04-03 16:41:46 -070061
62logging.basicConfig(level=logging.DEBUG,
63 format='%(asctime)s %(name)-32s '
64 '%(levelname)-8s %(message)s')
65
66
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) pair from the git subprocess and raises
    an Exception if git exits non-zero.
    """
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    proc = subprocess.Popen(cmd, close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return out
76
77
def random_sha1():
    """Return the SHA1 hex digest of a random value.

    Used to fabricate unique change/commit identifiers for the fake
    Gerrit.  The random float's string form is encoded explicitly so the
    helper works under both Python 2 and Python 3 (hashlib requires
    bytes on Python 3); this module already targets 2/3 compatibility
    via its six.moves imports.
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
80
81
def iterate_timeout(max_seconds, purpose):
    """Yield 1, 2, 3, ... until *max_seconds* have elapsed.

    Intended for polling loops: iterate over this generator and break
    out when the awaited condition holds.  If the deadline passes first,
    an Exception naming *purpose* is raised.
    """
    deadline = time.time() + max_seconds
    count = 0
    while time.time() < deadline:
        count += 1
        yield count
        time.sleep(0)  # give other threads a chance to run
    raise Exception("Timeout waiting for %s" % purpose)
90
91
class ChangeReference(git.Reference):
    """A git ref under refs/changes/, mimicking Gerrit's patchset refs."""
    # Base path for created refs (Gerrit stores patchsets here).
    _common_path_default = "refs/changes"
    # Refs of this type must point at commits, not other objects.
    _points_to_commits_only = True
95
96
class FakeChange(object):
    """An in-memory stand-in for a Gerrit change.

    Maintains the JSON-style dictionaries that the real Gerrit returns
    over its query interface, fabricates the corresponding stream
    events, and backs each patchset with a real commit in the fake
    upstream git repository under ``upstream_root``.
    """

    # Review categories: label code -> (description, min value, max value).
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0        # times a report (review message) landed
        self.queried = 0         # times query() was called
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False  # when True, setMerged() is a no-op
        self.messages = []
        # Mirrors the JSON structure produced by 'gerrit query'.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit a fake patchset to the upstream repo; return the commit.

        Creates a refs/changes/1/<number>/<patchset> ref from the repo's
        'init' tag, commits either a single small file *fn* or (when
        *large* is true) 100 files of 4096 random printable characters,
        then restores the master checkout.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Leave the repo checked out on master again.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        """Add a new patchset: a commit plus its Gerrit metadata dict.

        NOTE: the mutable default for *files* is safe here because the
        argument is only read, never mutated.
        """
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a Gerrit 'patchset-created' stream event dict."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a Gerrit 'change-restored' stream event dict."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a Gerrit 'change-abandoned' stream event dict."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a Gerrit 'comment-added' stream event dict."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        """Record an approval on the latest patchset.

        Replaces any previous approval by the same user in the same
        category, refreshes the submit records, and returns the
        corresponding 'comment-added' event (round-tripped through JSON
        so the caller gets an independent copy).
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        # Iterate over a copy so deleting from the live list is safe.
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from current approvals.

        A category at its minimum value is a veto (REJECT); at its
        maximum it is satisfied (OK); anything else is NEED.  Returns
        [{'status': 'OK'}] only if every category is satisfied.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            # A veto (minimum value) sticks; otherwise the strongest
            # vote by absolute value wins.
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on patchset *patchset* of *other*.

        Updates both sides: this change's 'dependsOn' and the other
        change's 'neededBy'.
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of the change data, as 'gerrit query' would."""
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            # Flag whether the dependency ref is still the current patchset.
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and fast-forward the upstream branch.

        Refuses to merge if an unmerged dependency exists or if
        fail_merge is set (used by tests to simulate merge failures).
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one report (review message) against this change."""
        self.reported += 1
378
379
class FakeGerrit(object):
    """A fake Gerrit server: holds FakeChanges and an event queue.

    Monkey-patched in place of zuul.lib.gerrit.Gerrit so the scheduler
    talks to an in-memory change database instead of a real server.
    """
    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, hostname, username, port=29418, keyfile=None,
                 changes_dbs={}):
        # NOTE: the mutable default for changes_dbs is only read via
        # .get(), never mutated, so it is safe here.
        self.hostname = hostname
        self.username = username
        self.port = port
        self.keyfile = keyfile
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        # Multiple FakeGerrits for the same hostname share one change db.
        self.changes = changes_dbs.get(hostname, {})
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        # NOTE(review): self.upstream_root is assigned externally by the
        # test fixture after construction (see ZuulTestCase.setUp).
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        """Enqueue a (timestamp, event) pair for the event stream."""
        return self.event_queue.put((time.time(), data))

    def getEvent(self):
        """Block until an event is available; return it."""
        return self.event_queue.get()

    def eventDone(self):
        """Acknowledge the most recently fetched event."""
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Apply a review (votes/labels/submit/message) to a change."""
        number, ps = changeid.split(',')
        change = self.changes[int(number)]

        # Add the approval back onto the change (ie simulate what gerrit would
        # do).
        # Usually when zuul leaves a review it'll create a feedback loop where
        # zuul's review enters another gerrit event (which is then picked up by
        # zuul). However, we can't mimic this behaviour (by adding this
        # approval event into the queue) as it stops jobs from checking what
        # happens before this event is triggered. If a job needs to see what
        # happens they can add their own verified event into the queue.
        # Nevertheless, we can update change with the new review in gerrit.

        for cat in ['CRVW', 'VRFY', 'APRV']:
            if cat in action:
                change.addApproval(cat, action[cat], username=self.username)

        if 'label' in action:
            parts = action['label'].split('=')
            # NOTE(review): parts[2] looks off-by-one -- "Label=1" splits
            # into two items, so index 2 would raise IndexError.  Confirm
            # whether any caller actually exercises this path.
            change.addApproval(parts[0], parts[2], username=self.username)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return the query data for change *number*, or {} if unknown."""
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        """Answer a 'gerrit query' string; records queries for assertions."""
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def startWatching(self, *args, **kw):
        # The real Gerrit starts an SSH event-stream watcher; the fake
        # one needs no background connection.
        pass
467
468
class BuildHistory(object):
    """Record of one completed fake build.

    All keyword arguments become attributes (name, number, result,
    changes, node, uuid, description, pipeline, ...).
    """

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        details = (self.result, self.name, self.number, self.changes)
        return "<Completed build, result: %s name: %s #%s changes: %s>" % details
476
477
class FakeURLOpener(object):
    """Stand-in for a urllib2 response serving git-upload-pack data.

    read() fabricates a smart-HTTP ref advertisement for the project
    named in the URL, backed by the repo under upstream_root.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        """Return a pkt-line formatted ref advertisement string."""
        parsed = urlparse.urlparse(self.url)
        project = '/'.join(parsed.path.split('/')[2:-2])
        chunks = [
            '001e# service=git-upload-pack\n',
            # Flush-pkt, fake HEAD line, and capability advertisement.
            '000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
            'multi_ack thin-pack side-band side-band-64k ofs-delta '
            'shallow no-progress include-tag multi_ack_detailed no-done\n',
        ]
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = '%s %s\n' % (ref.object.hexsha, ref.path)
            # Each pkt-line is prefixed with its total length in hex.
            chunks.append('%04x%s' % (len(line) + 4, line))
        chunks.append('0000')
        return ''.join(chunks)
499
500
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger that serves git URLs from the local fake repos."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root
        # Don't rate-limit event delivery in tests.
        self.gerrit_connector.delay = 0.0

    def getGitUrl(self, project):
        """Return a local filesystem path instead of a remote git URL."""
        return os.path.join(self.upstream_root, project.name)
511
512
class FakeStatsd(threading.Thread):
    """UDP listener that records every statsd packet it receives.

    Binds an ephemeral port (exposed as self.port); received payloads
    accumulate in self.stats.  A pipe is used to wake the poll loop so
    stop() can terminate the thread promptly.
    """
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))          # ephemeral port chosen by the OS
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    # stop() wrote to the pipe: shut the loop down.
                    return

    def stop(self):
        # Wake the poll loop so run() returns.
        # NOTE(review): writes a str -- fine on Python 2, would need
        # bytes under Python 3; confirm when porting.
        os.write(self.wake_write, '1\n')
540
541
class FakeBuild(threading.Thread):
    """One fake build executing a gearman 'build:*' job in a thread.

    Supports being held (hold_jobs_in_build) and released, aborted, or
    forced into a run error, and records a BuildHistory entry on the
    owning FakeWorker when it completes.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Job names look like 'build:<name>[:<node>]'.
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Unblock a held build (see _wait)."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return True if the build is currently held in _wait()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        """Block until release() is called."""
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        """Simulate the build: send gearman work packets and record history."""
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize completion bookkeeping across builds.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
647
648
class FakeWorker(gear.Worker):
    """A gearman worker that runs FakeBuilds for zuul's launcher.

    Dispatches 'build:*', 'stop:*' and 'set_description:*' jobs from a
    background thread started in __init__.
    """
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []      # completed BuildHistory records
        self.running_builds = []     # FakeBuilds currently executing
        self.build_counter = 0
        self.fail_tests = {}         # job name -> changes that should fail
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Route a gearman job by its 'cmd:name[:node]' name."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a 'build:*' job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build identified by the job's parameters."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Set the description on a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Background loop: fetch and handle gearman jobs until stopped."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                # Keep the worker loop alive no matter what a job does.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Arrange for job *name* to fail when run against *change*."""
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        """Return True if *ref* contains a change registered to fail *name*."""
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (all if None)."""
        # Copy the list: build.release() lets builds finish and remove
        # themselves from running_builds.
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
757
758
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold 'build:*' jobs in its queues.

    With hold_jobs_in_queue set, newly queued build jobs are marked
    waiting and are not handed to workers until release() is called.
    """
    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0: let the OS pick a free port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job for *connection*, honoring holds."""
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First sighting of this job: decide whether to hold it.
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Un-hold queued build jobs matching *regex* (all if None)."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            # Prod workers so they re-poll for newly runnable jobs.
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
805
806
class FakeSMTP(object):
    """smtplib.SMTP replacement that records messages in a shared list.

    Each sent mail is appended to *messages* as a dict with keys
    from_email, to_email, msg, headers and body.
    """
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record *msg* instead of sending it; always succeeds."""
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        # RFC822-style message: headers and body split by a blank line.
        parts = msg.split('\n\n', 1)

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=parts[0],
            body=parts[1],
        ))

        return True

    def quit(self):
        """Pretend to close the connection."""
        return True
834
835
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection stub that never touches the network."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
846
847
class BaseTestCase(testtools.TestCase):
    """Common test-case base: timeout and output/log capture fixtures.

    Driven by the OS_TEST_TIMEOUT, OS_STDOUT_CAPTURE, OS_STDERR_CAPTURE
    and OS_LOG_CAPTURE environment variables, as used by the OpenStack
    test infrastructure.
    """
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
Maru Newby3fe5f852015-01-13 04:22:14 +0000876
877
878class ZuulTestCase(BaseTestCase):
879
880 def setUp(self):
881 super(ZuulTestCase, self).setUp()
James E. Blair97d902e2014-08-21 13:25:56 -0700882 if USE_TEMPDIR:
883 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000884 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
885 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700886 else:
887 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700888 self.test_root = os.path.join(tmp_root, "zuul-test")
889 self.upstream_root = os.path.join(self.test_root, "upstream")
890 self.git_root = os.path.join(self.test_root, "git")
891
892 if os.path.exists(self.test_root):
893 shutil.rmtree(self.test_root)
894 os.makedirs(self.test_root)
895 os.makedirs(self.upstream_root)
896 os.makedirs(self.git_root)
897
898 # Make per test copy of Configuration.
899 self.setup_config()
900 self.config.set('zuul', 'layout_config',
901 os.path.join(FIXTURE_DIR, "layout.yaml"))
902 self.config.set('merger', 'git_dir', self.git_root)
903
904 # For each project in config:
905 self.init_repo("org/project")
906 self.init_repo("org/project1")
907 self.init_repo("org/project2")
908 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700909 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700910 self.init_repo("org/project5")
911 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700912 self.init_repo("org/one-job-project")
913 self.init_repo("org/nonvoting-project")
914 self.init_repo("org/templated-project")
915 self.init_repo("org/layered-project")
916 self.init_repo("org/node-project")
917 self.init_repo("org/conflict-project")
918 self.init_repo("org/noop-project")
919 self.init_repo("org/experimental-project")
920
921 self.statsd = FakeStatsd()
922 os.environ['STATSD_HOST'] = 'localhost'
923 os.environ['STATSD_PORT'] = str(self.statsd.port)
924 self.statsd.start()
925 # the statsd client object is configured in the statsd module import
926 reload(statsd)
927 reload(zuul.scheduler)
928
929 self.gearman_server = FakeGearmanServer()
930
931 self.config.set('gearman', 'port', str(self.gearman_server.port))
932
933 self.worker = FakeWorker('fake_worker', self)
934 self.worker.addServer('127.0.0.1', self.gearman_server.port)
935 self.gearman_server.worker = self.worker
936
937 self.merge_server = zuul.merger.server.MergeServer(self.config)
938 self.merge_server.start()
939
940 self.sched = zuul.scheduler.Scheduler()
941
942 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
943 FakeSwiftClientConnection))
944 self.swift = zuul.lib.swift.Swift(self.config)
945
946 def URLOpenerFactory(*args, **kw):
947 if isinstance(args[0], urllib2.Request):
948 return old_urlopen(*args, **kw)
949 args = [self.fake_gerrit] + list(args)
950 return FakeURLOpener(self.upstream_root, *args, **kw)
951
952 old_urlopen = urllib2.urlopen
953 urllib2.urlopen = URLOpenerFactory
954
955 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
956 self.swift)
957 self.merge_client = zuul.merger.client.MergeClient(
958 self.config, self.sched)
959
960 self.smtp_messages = []
961
962 def FakeSMTPFactory(*args, **kw):
963 args = [self.smtp_messages] + list(args)
964 return FakeSMTP(*args, **kw)
965
Joshua Hesketh642824b2014-07-01 17:54:59 +1000966 # Set a changes database so multiple FakeGerrit's can report back to
967 # a virtual canonical database given by the configured hostname
968 self.gerrit_changes_dbs = {
969 self.config.get('gerrit', 'server'): {}
970 }
971
972 def FakeGerritFactory(*args, **kw):
973 kw['changes_dbs'] = self.gerrit_changes_dbs
974 return FakeGerrit(*args, **kw)
975
976 self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
977 FakeGerritFactory))
978
Clark Boylanb640e052014-04-03 16:41:46 -0700979 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
980
981 self.gerrit = FakeGerritTrigger(
982 self.upstream_root, self.config, self.sched)
983 self.gerrit.replication_timeout = 1.5
984 self.gerrit.replication_retry_interval = 0.5
985 self.fake_gerrit = self.gerrit.gerrit
986 self.fake_gerrit.upstream_root = self.upstream_root
987
988 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
989 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
990
991 self.sched.setLauncher(self.launcher)
992 self.sched.setMerger(self.merge_client)
993 self.sched.registerTrigger(self.gerrit)
994 self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
995 self.sched.registerTrigger(self.timer)
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000996 self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
997 self.sched)
James E. Blairc494d542014-08-06 09:23:52 -0700998 self.sched.registerTrigger(self.zuultrigger)
Clark Boylanb640e052014-04-03 16:41:46 -0700999
1000 self.sched.registerReporter(
1001 zuul.reporter.gerrit.Reporter(self.gerrit))
1002 self.smtp_reporter = zuul.reporter.smtp.Reporter(
1003 self.config.get('smtp', 'default_from'),
1004 self.config.get('smtp', 'default_to'),
1005 self.config.get('smtp', 'server'))
1006 self.sched.registerReporter(self.smtp_reporter)
1007
1008 self.sched.start()
1009 self.sched.reconfigure(self.config)
1010 self.sched.resume()
1011 self.webapp.start()
1012 self.rpc.start()
1013 self.launcher.gearman.waitForServer()
1014 self.registerJobs()
1015 self.builds = self.worker.running_builds
1016 self.history = self.worker.build_history
1017
1018 self.addCleanup(self.assertFinalState)
1019 self.addCleanup(self.shutdown)
1020
1021 def setup_config(self):
1022 """Per test config object. Override to set different config."""
1023 self.config = ConfigParser.ConfigParser()
1024 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
1025
1026 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -07001027 # Make sure that git.Repo objects have been garbage collected.
1028 repos = []
1029 gc.collect()
1030 for obj in gc.get_objects():
1031 if isinstance(obj, git.Repo):
1032 repos.append(obj)
1033 self.assertEqual(len(repos), 0)
1034 self.assertEmptyQueues()
James E. Blair0577cd62015-02-07 11:42:12 -08001035 for pipeline in self.sched.layout.pipelines.values():
1036 if isinstance(pipeline.manager,
1037 zuul.scheduler.IndependentPipelineManager):
1038 self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001039
    def shutdown(self):
        """Stop and reap every component started during setUp.

        Stops the launcher, merge server/client, fake worker, triggers,
        scheduler, statsd fixture, webapp, RPC listener and the fake
        gearman server, then checks that no extra threads were leaked.
        """
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        # Only the main thread should remain; anything else is a leak
        # from this test's components.
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -07001061
1062 def init_repo(self, project):
1063 parts = project.split('/')
1064 path = os.path.join(self.upstream_root, *parts[:-1])
1065 if not os.path.exists(path):
1066 os.makedirs(path)
1067 path = os.path.join(self.upstream_root, project)
1068 repo = git.Repo.init(path)
1069
1070 repo.config_writer().set_value('user', 'email', 'user@example.com')
1071 repo.config_writer().set_value('user', 'name', 'User Name')
1072 repo.config_writer().write()
1073
1074 fn = os.path.join(path, 'README')
1075 f = open(fn, 'w')
1076 f.write("test\n")
1077 f.close()
1078 repo.index.add([fn])
1079 repo.index.commit('initial commit')
1080 master = repo.create_head('master')
1081 repo.create_tag('init')
1082
James E. Blair97d902e2014-08-21 13:25:56 -07001083 repo.head.reference = master
1084 repo.head.reset(index=True, working_tree=True)
1085 repo.git.clean('-x', '-f', '-d')
1086
1087 self.create_branch(project, 'mp')
1088
1089 def create_branch(self, project, branch):
1090 path = os.path.join(self.upstream_root, project)
1091 repo = git.Repo.init(path)
1092 fn = os.path.join(path, 'README')
1093
1094 branch_head = repo.create_head(branch)
1095 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001096 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001097 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001098 f.close()
1099 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001100 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001101
James E. Blair97d902e2014-08-21 13:25:56 -07001102 repo.head.reference = repo.heads['master']
Clark Boylanb640e052014-04-03 16:41:46 -07001103 repo.head.reset(index=True, working_tree=True)
1104 repo.git.clean('-x', '-f', '-d')
1105
1106 def ref_has_change(self, ref, change):
1107 path = os.path.join(self.git_root, change.project)
1108 repo = git.Repo(path)
Mike Heald8225f522014-11-21 09:52:33 +00001109 try:
1110 for commit in repo.iter_commits(ref):
1111 if commit.message.strip() == ('%s-1' % change.subject):
1112 return True
1113 except GitCommandError:
1114 pass
Clark Boylanb640e052014-04-03 16:41:46 -07001115 return False
1116
1117 def job_has_changes(self, *args):
1118 job = args[0]
1119 commits = args[1:]
1120 if isinstance(job, FakeBuild):
1121 parameters = job.parameters
1122 else:
1123 parameters = json.loads(job.arguments)
1124 project = parameters['ZUUL_PROJECT']
1125 path = os.path.join(self.git_root, project)
1126 repo = git.Repo(path)
1127 ref = parameters['ZUUL_REF']
1128 sha = parameters['ZUUL_COMMIT']
1129 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1130 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1131 commit_messages = ['%s-1' % commit.subject for commit in commits]
1132 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1133 " repo_messages %s; sha %s" % (job, commit_messages,
1134 repo_messages, sha))
1135 for msg in commit_messages:
1136 if msg not in repo_messages:
1137 self.log.debug(" messages do not match")
1138 return False
1139 if repo_shas[0] != sha:
1140 self.log.debug(" sha does not match")
1141 return False
1142 self.log.debug(" OK")
1143 return True
1144
1145 def registerJobs(self):
1146 count = 0
1147 for job in self.sched.layout.jobs.keys():
1148 self.worker.registerFunction('build:' + job)
1149 count += 1
1150 self.worker.registerFunction('stop:' + self.worker.worker_id)
1151 count += 1
1152
1153 while len(self.gearman_server.functions) < count:
1154 time.sleep(0)
1155
James E. Blairb8c16472015-05-05 14:55:26 -07001156 def orderedRelease(self):
1157 # Run one build at a time to ensure non-race order:
1158 while len(self.builds):
1159 self.release(self.builds[0])
1160 self.waitUntilSettled()
1161
Clark Boylanb640e052014-04-03 16:41:46 -07001162 def release(self, job):
1163 if isinstance(job, FakeBuild):
1164 job.release()
1165 else:
1166 job.waiting = False
1167 self.log.debug("Queued job %s released" % job.unique)
1168 self.gearman_server.wakeConnections()
1169
1170 def getParameter(self, job, name):
1171 if isinstance(job, FakeBuild):
1172 return job.parameters[name]
1173 else:
1174 parameters = json.loads(job.arguments)
1175 return parameters[name]
1176
1177 def resetGearmanServer(self):
1178 self.worker.setFunctions([])
1179 while True:
1180 done = True
1181 for connection in self.gearman_server.active_connections:
1182 if (connection.functions and
1183 connection.client_id not in ['Zuul RPC Listener',
1184 'Zuul Merger']):
1185 done = False
1186 if done:
1187 break
1188 time.sleep(0)
1189 self.gearman_server.functions = set()
1190 self.rpc.register()
1191 self.merge_server.register()
1192
1193 def haveAllBuildsReported(self):
1194 # See if Zuul is waiting on a meta job to complete
1195 if self.launcher.meta_jobs:
1196 return False
1197 # Find out if every build that the worker has completed has been
1198 # reported back to Zuul. If it hasn't then that means a Gearman
1199 # event is still in transit and the system is not stable.
1200 for build in self.worker.build_history:
1201 zbuild = self.launcher.builds.get(build.uuid)
1202 if not zbuild:
1203 # It has already been reported
1204 continue
1205 # It hasn't been reported yet.
1206 return False
1207 # Make sure that none of the worker connections are in GRAB_WAIT
1208 for connection in self.worker.active_connections:
1209 if connection.state == 'GRAB_WAIT':
1210 return False
1211 return True
1212
    def areAllBuildsWaiting(self):
        """Return True if every outstanding build is parked.

        A build counts as parked when it is fully registered with both
        the gearman client and server and is either still held on the
        server (waiting) or has been picked up by the fake worker and is
        holding.  Any build in an intermediate state returns False.
        """
        builds = self.launcher.builds.values()
        for build in builds:
            # Locate the client-side gearman job for this build.
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                # The server has not acknowledged the submission yet.
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                # 'waiting' is presumably set by the fake server once the
                # job is fully queued -- inferred from this check; confirm
                # against FakeGearmanServer.
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                # Held on the server; this build is parked.
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True
Clark Boylanb640e052014-04-03 16:41:46 -07001253
    def waitUntilSettled(self):
        """Block until Zuul reaches a stable state (or ~10s timeout).

        Stable means: no pending merger work, all event queues empty and
        joined, every build result reported back, and all remaining
        builds parked/waiting.  Raises Exception on timeout.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                # Dump queue states to aid debugging the hang.
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                # Re-check everything while holding the scheduler's run
                # handler lock so no new work can sneak in between checks.
                self.sched.run_handler_lock.acquire()
                if (not self.merge_client.build_sets and
                    self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)
1288
1289 def countJobResults(self, jobs, result):
1290 jobs = filter(lambda x: x.result == result, jobs)
1291 return len(jobs)
1292
1293 def getJobFromHistory(self, name):
1294 history = self.worker.build_history
1295 for job in history:
1296 if job.name == name:
1297 return job
1298 raise Exception("Unable to find job %s in history" % name)
1299
    def assertEmptyQueues(self):
        """Assert that no pipeline still has queued change items."""
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    # Print the leftover contents before failing to help
                    # diagnose which change got stuck.
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001309
1310 def assertReportedStat(self, key, value=None, kind=None):
1311 start = time.time()
1312 while time.time() < (start + 5):
1313 for stat in self.statsd.stats:
1314 pprint.pprint(self.statsd.stats)
1315 k, v = stat.split(':')
1316 if key == k:
1317 if value is None and kind is None:
1318 return
1319 elif value:
1320 if value == v:
1321 return
1322 elif kind:
1323 if v.endswith('|' + kind):
1324 return
1325 time.sleep(0.1)
1326
1327 pprint.pprint(self.statsd.stats)
1328 raise Exception("Key %s not found in reported stats" % key)