#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
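    """Yield an increasing iteration count until max_seconds have elapsed.

    Raises an Exception mentioning ``purpose`` once the timeout is reached.
    """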
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)


class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
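    """An in-memory stand-in for a single Gerrit change.

    Patchsets are committed to the fake upstream repo, while the change's
    Gerrit-style data, approvals and submit records are kept in ``self.data``
    so that queries and event dicts can be generated from it.
    """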
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerrit(object):
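    """A fake Gerrit server that keeps changes and events in memory.

    Changes are created with addFakeChange(); review(), query() and
    simpleQuery() operate on the in-memory changes dict instead of a real
    Gerrit instance.
    """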
    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, hostname, username, port=29418, keyfile=None,
                 changes_dbs={}):
        self.hostname = hostname
        self.username = username
        self.port = port
        self.keyfile = keyfile
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = changes_dbs.get(hostname, {})
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        return self.event_queue.put((time.time(), data))

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]

        # Add the approval back onto the change (ie simulate what gerrit would
        # do).
        # Usually when zuul leaves a review it'll create a feedback loop where
        # zuul's review enters another gerrit event (which is then picked up by
        # zuul). However, we can't mimic this behaviour (by adding this
        # approval event into the queue) as it stops jobs from checking what
        # happens before this event is triggered. If a job needs to see what
        # happens they can add their own verified event into the queue.
        # Nevertheless, we can update change with the new review in gerrit.

        for cat in ['CRVW', 'VRFY', 'APRV']:
            if cat in action:
                change.addApproval(cat, action[cat], username=self.username)

        if 'label' in action:
            parts = action['label'].split('=')
            change.addApproval(parts[0], parts[2], username=self.username)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def startWatching(self, *args, **kw):
        pass


class BuildHistory(object):
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
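    """A fake replacement for urllib2.urlopen.

    read() returns a pkt-line style ref advertisement built from the refs of
    the corresponding project repo under the fake upstream root.
    """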
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root
        self.gerrit_connector.delay = 0.0

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


class FakeStatsd(threading.Thread):
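    """A minimal UDP statsd server used to capture emitted metrics.

    It binds to an ephemeral port (exposed as ``self.port``) and appends
    every received packet to ``self.stats`` until stop() is called.
    """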
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


class FakeBuild(threading.Thread):
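    """A fake build executed by the FakeWorker in its own thread.

    Builds can be held (via the worker's hold_jobs_in_build flag) and later
    released; the outcome is appended to the worker's build history.
    """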
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            data['node_labels'] = ['bare-necessities']
            data['node_name'] = 'foo'
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
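    """A fake Gearman worker that executes FakeBuild jobs.

    It handles the build:, stop: and set_description: functions and keeps
    the running_builds and build_history lists used by the tests.
    """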
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


class FakeGearmanServer(gear.Server):
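    """A Gearman server whose build: jobs can be held in the queue.

    When hold_jobs_in_queue is set, queued build jobs are marked as waiting
    and are only handed to workers once release() is called.
    """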
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))


class FakeSMTP(object):
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


class BaseTestCase(testtools.TestCase):
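    """Base test case applying the standard test environment fixtures.

    OS_TEST_TIMEOUT, OS_STDOUT_CAPTURE, OS_STDERR_CAPTURE and OS_LOG_CAPTURE
    control the timeout and output/log capture fixtures.
    """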
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
                os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
                os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))


class ZuulTestCase(BaseTestCase):
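    """Test case that wires up a complete fake Zuul environment.

    setUp() starts a scheduler, merge server/client, Gearman launcher, fake
    Gearman server and worker, and a fake Gerrit, and provides helpers such
    as waitUntilSettled() and assertEmptyQueues() for the tests.
    """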

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")
        self.init_repo("org/no-jobs-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        # Set a changes database so multiple FakeGerrit's can report back to
        # a virtual canonical database given by the configured hostname
        self.gerrit_changes_dbs = {
            self.config.get('gerrit', 'server'): {}
        }

        def FakeGerritFactory(*args, **kw):
            kw['changes_dbs'] = self.gerrit_changes_dbs
            return FakeGerrit(*args, **kw)

        self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
                                             FakeGerritFactory))

        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                                self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug(" messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug(" sha does not match")
            return False
        self.log.debug(" OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def orderedRelease(self):
        # Run one build at a time to ensure non-race order:
        while len(self.builds):
            self.release(self.builds[0])
            self.waitUntilSettled()

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul. If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True

    def waitUntilSettled(self):
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                if (not self.merge_client.build_sets and
                        self.sched.trigger_event_queue.empty() and
                        self.sched.result_event_queue.empty() and
                        self.fake_gerrit.event_queue.empty() and
                        self.haveAllBuildsReported() and
                        self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipeline queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)