#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                           '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)
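# Illustrative use of iterate_timeout (a hypothetical snippet, not taken from
# this file): callers can poll for a condition with a bounded wait, e.g.
#
#     for _ in iterate_timeout(30, 'builds to start'):
#         if len(builds) == 2:
#             break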


class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))
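    # Illustrative caller-side usage (an assumption about how tests drive
    # this fake, shown only as a sketch):
    #
    #     A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    #     A.addApproval('CRVW', 2)
    #     self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
    #
    # addApproval returns a comment-added event dict, so its result can be
    # fed straight back into FakeGerrit.addEvent to simulate the event
    # stream.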

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]
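    # getSubmitRecords mirrors Gerrit's submit-record logic for the three
    # fake label categories above: a vote equal to the category minimum acts
    # as a blocking veto, the category maximum marks the label OK, and
    # anything else leaves the label in NEED state, so the change reports
    # NOT_READY.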

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerrit(object):
    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, hostname, username, port=29418, keyfile=None,
                 changes_dbs={}):
        self.hostname = hostname
        self.username = username
        self.port = port
        self.keyfile = keyfile
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = changes_dbs.get(hostname, {})
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        return self.event_queue.put((time.time(), data))

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]

        # Add the approval back onto the change (i.e. simulate what Gerrit
        # would do).
        # Usually when Zuul leaves a review it creates a feedback loop:
        # Zuul's review enters another Gerrit event, which is then picked up
        # by Zuul again. However, we can't mimic this behaviour (by adding
        # this approval event into the queue) as it stops jobs from checking
        # what happens before this event is triggered. If a job needs to see
        # what happens, it can add its own verified event into the queue.
        # Nevertheless, we can update the change with the new review in
        # Gerrit.

        for cat in ['CRVW', 'VRFY', 'APRV']:
            if cat in action:
                change.addApproval(cat, action[cat], username=self.username)

        if 'label' in action:
            parts = action['label'].split('=')
            change.addApproval(parts[0], parts[2], username=self.username)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l
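    # simpleQuery recognises 'change:<id>' and 'message:<text>' query
    # prefixes; any other query string falls through to the final branch and
    # returns every fake change. Hypothetical examples (not taken from this
    # file):
    #
    #     self.fake_gerrit.simpleQuery('change:%s' % A.data['id'])
    #     self.fake_gerrit.simpleQuery('message:Depends-On')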

    def startWatching(self, *args, **kw):
        pass


class BuildHistory(object):
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root
        self.gerrit_connector.delay = 0.0

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


class FakeStatsd(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')
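    # Each captured datagram is a raw statsd packet of the form
    # 'key:value|type', e.g. 'zuul.pipeline.gate.resident_time:120|ms'
    # (the key and value shown are illustrative assumptions, not a claim
    # about what Zuul emits); assertReportedStat() further below parses the
    # stats list with exactly that split.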


class FakeBuild(threading.Thread):
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
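    # Typical hold/release pattern in a test (an assumption about callers;
    # shown for illustration only, job-name regex included):
    #
    #     self.worker.hold_jobs_in_build = True
    #     ...enqueue events...
    #     self.worker.release('.*-merge')
    #     self.waitUntilSettled()
    #     self.worker.hold_jobs_in_build = False
    #     self.worker.release()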


class FakeGearmanServer(gear.Server):
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

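    # The queue-level analogue of FakeWorker's hold: while hold_jobs_in_queue
    # is set, build jobs stay queued (waiting) until release() below flips
    # them and wakes the worker connections. Illustrative test usage (an
    # assumption, not taken from this file):
    #
    #     self.gearman_server.hold_jobs_in_queue = True
    #     ...enqueue events...
    #     self.gearman_server.hold_jobs_in_queue = False
    #     self.gearman_server.release()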
785 released = False
786 qlen = (len(self.high_queue) + len(self.normal_queue) +
787 len(self.low_queue))
788 self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
789 for job in self.getQueue():
790 cmd, name = job.name.split(':')
791 if cmd != 'build':
792 continue
793 if not regex or re.match(regex, name):
794 self.log.debug("releasing queued job %s" %
795 job.unique)
796 job.waiting = False
797 released = True
798 else:
799 self.log.debug("not releasing queued job %s" %
800 job.unique)
801 if released:
802 self.wakeConnections()
803 qlen = (len(self.high_queue) + len(self.normal_queue) +
804 len(self.low_queue))
805 self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
806
807
808class FakeSMTP(object):
809 log = logging.getLogger('zuul.FakeSMTP')
810
811 def __init__(self, messages, server, port):
812 self.server = server
813 self.port = port
814 self.messages = messages
815
816 def sendmail(self, from_email, to_email, msg):
817 self.log.info("Sending email from %s, to %s, with msg %s" % (
818 from_email, to_email, msg))
819
820 headers = msg.split('\n\n', 1)[0]
821 body = msg.split('\n\n', 1)[1]
822
823 self.messages.append(dict(
824 from_email=from_email,
825 to_email=to_email,
826 msg=msg,
827 headers=headers,
828 body=body,
829 ))
830
831 return True
832
833 def quit(self):
834 return True
835
836
837class FakeSwiftClientConnection(swiftclient.client.Connection):
838 def post_account(self, headers):
839 # Do nothing
840 pass
841
842 def get_auth(self):
843 # Returns endpoint and (unused) auth token
844 endpoint = os.path.join('https://storage.example.org', 'V1',
845 'AUTH_account')
846 return endpoint, ''
847
848
Maru Newby3fe5f852015-01-13 04:22:14 +0000849class BaseTestCase(testtools.TestCase):
Clark Boylanb640e052014-04-03 16:41:46 -0700850 log = logging.getLogger("zuul.test")
851
852 def setUp(self):
Maru Newby3fe5f852015-01-13 04:22:14 +0000853 super(BaseTestCase, self).setUp()
Clark Boylanb640e052014-04-03 16:41:46 -0700854 test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
855 try:
856 test_timeout = int(test_timeout)
857 except ValueError:
858 # If timeout value is invalid do not set a timeout.
859 test_timeout = 0
860 if test_timeout > 0:
861 self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
862
863 if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
864 os.environ.get('OS_STDOUT_CAPTURE') == '1'):
865 stdout = self.useFixture(fixtures.StringStream('stdout')).stream
866 self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
867 if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
868 os.environ.get('OS_STDERR_CAPTURE') == '1'):
869 stderr = self.useFixture(fixtures.StringStream('stderr')).stream
870 self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
871 if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
872 os.environ.get('OS_LOG_CAPTURE') == '1'):
873 self.useFixture(fixtures.FakeLogger(
874 level=logging.DEBUG,
875 format='%(asctime)s %(name)-32s '
876 '%(levelname)-8s %(message)s'))
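    # The OS_TEST_TIMEOUT / OS_STDOUT_CAPTURE / OS_STDERR_CAPTURE /
    # OS_LOG_CAPTURE switches follow the usual OpenStack test-runner
    # convention and are normally exported by the runner configuration
    # (an assumption); e.g. setting OS_LOG_CAPTURE=1 attaches per-test debug
    # logs to failures instead of writing them to the console.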


class ZuulTestCase(BaseTestCase):

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")
        self.init_repo("org/no-jobs-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        # Set a changes database so multiple FakeGerrits can report back to
        # a virtual canonical database given by the configured hostname
        self.gerrit_changes_dbs = {
            self.config.get('gerrit', 'server'): {}
        }

        def FakeGerritFactory(*args, **kw):
            kw['changes_dbs'] = self.gerrit_changes_dbs
            return FakeGerrit(*args, **kw)

        self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
                                             FakeGerritFactory))

        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                                self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
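    # Illustrative subclass override (the class and layout file names below
    # are hypothetical; the pattern, not the names, is the point):
    #
    #     class TestCustomLayout(ZuulTestCase):
    #         def setup_config(self):
    #             super(TestCustomLayout, self).setup_config()
    #             self.config.set('zuul', 'layout_config',
    #                             os.path.join(FIXTURE_DIR,
    #                                          'layout-custom.yaml'))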

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def orderedRelease(self):
        # Run one build at a time to ensure non-race order:
        while len(self.builds):
            self.release(self.builds[0])
            self.waitUntilSettled()

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul. If it hasn't, then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True

    def waitUntilSettled(self):
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
            self.sched.run_handler_lock.acquire()
            if (not self.merge_client.build_sets and
                self.sched.trigger_event_queue.empty() and
                self.sched.result_event_queue.empty() and
                self.fake_gerrit.event_queue.empty() and
                self.haveAllBuildsReported() and
                self.areAllBuildsWaiting()):
                self.sched.run_handler_lock.release()
                self.worker.lock.release()
                self.log.debug("...settled.")
                return
            self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)
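    # A typical test drives the fakes and then settles (a hypothetical sketch
    # of the calling pattern; the job name is an assumption):
    #
    #     A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    #     self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
    #     self.waitUntilSettled()
    #     self.assertEqual(self.getJobFromHistory('project-test1').result,
    #                      'SUCCESS')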

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)
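    # Illustrative assertions (the stat keys and values below are
    # assumptions, not a list of what Zuul actually emits):
    #
    #     self.assertReportedStat('gerrit.event.comment-added', value='1|c')
    #     self.assertReportedStat('zuul.pipeline.gate.current_changes',
    #                             value='1|g')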