#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

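"""Test fixtures for the Zuul test suite.

This module provides fake implementations of the external services Zuul
talks to (Gerrit, Gearman server and worker, statsd, SMTP, Swift) plus the
BaseTestCase/ZuulTestCase classes that wire them together for tests.
"""
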
from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
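    """Yield an increasing count until max_seconds have elapsed.

    Raises an Exception mentioning ``purpose`` if the loop is still
    running when the timeout expires.
    """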
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)


class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
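    """A single fake Gerrit change backed by a git repo under upstream_root.

    Each patchset is a real commit on refs/changes/1/<number>/<patchset>,
    and the ``data`` dict mimics the JSON returned by a Gerrit query.
    """
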
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerrit(object):
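    """An in-memory stand-in for a Gerrit server.

    Changes are kept in a dict (optionally shared between instances via
    ``changes_dbs``) and events are delivered to the scheduler through
    ``event_queue``.
    """
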
    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, hostname, username, port=29418, keyfile=None,
                 changes_dbs={}):
        self.hostname = hostname
        self.username = username
        self.port = port
        self.keyfile = keyfile
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = changes_dbs.get(hostname, {})
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        return self.event_queue.put((time.time(), data))

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]

        # Add the approval back onto the change (i.e. simulate what Gerrit
        # would do).
        # Usually when Zuul leaves a review it creates a feedback loop:
        # Zuul's review shows up as another Gerrit event, which Zuul then
        # picks up. We can't mimic that behaviour here (by adding the
        # approval event to the queue) because it would stop tests from
        # checking what happens before the event is triggered. If a test
        # needs to see what happens it can add its own verified event to
        # the queue. Nevertheless, we can update the change with the new
        # review in Gerrit.

        for cat in ['CRVW', 'VRFY', 'APRV']:
            if cat in action:
                change.addApproval(cat, action[cat], username=self.username)

        if 'label' in action:
            parts = action['label'].split('=')
            change.addApproval(parts[0], parts[2], username=self.username)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def startWatching(self, *args, **kw):
        pass


class BuildHistory(object):
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
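    """Fake urllib2 opener that serves a git-upload-pack ref advertisement
    built directly from the local upstream repositories."""
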
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root
        self.gerrit_connector.delay = 0.0

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


class FakeStatsd(threading.Thread):
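    """A UDP listener on an ephemeral port that records every statsd
    packet it receives in ``self.stats``."""
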
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


class FakeBuild(threading.Thread):
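    """One fake build executed in its own thread by FakeWorker.

    The build reports its start over Gearman, can be held and later
    released by the test, and appends its outcome to the worker's
    ``build_history``.
    """
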
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            data['node_labels'] = ['bare-necessities']
            data['node_name'] = 'foo'
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
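    """A Gearman worker that runs FakeBuilds instead of real jobs.

    Tests use ``hold_jobs_in_build``, ``addFailTest`` and ``release`` to
    control how and when builds complete.
    """
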
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


class FakeGearmanServer(gear.Server):
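    """A Gearman server whose build jobs can be held in the queue and
    released selectively by the tests."""
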
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))


class FakeSMTP(object):
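    """Replacement for smtplib.SMTP that records sent messages in a list
    instead of talking to a real mail server."""
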
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
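    """Swift client connection that skips authentication and returns a
    static endpoint."""
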
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


class BaseTestCase(testtools.TestCase):
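    """Base test case that honours the OS_TEST_TIMEOUT, OS_STDOUT_CAPTURE,
    OS_STDERR_CAPTURE and OS_LOG_CAPTURE environment variables."""
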
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
                os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
                os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))


class ZuulTestCase(BaseTestCase):
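    """A test case that starts a full Zuul system against fake services.

    setUp creates per-test git repositories, a fake Gearman server and
    worker, a fake Gerrit, statsd, SMTP and Swift, then starts the
    scheduler, merger, webapp and RPC listener.
    """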

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")
        self.init_repo("org/no-jobs-project")

        self.statsd = FakeStatsd()
        # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
        # see: https://github.com/jsocol/pystatsd/issues/61
        os.environ['STATSD_HOST'] = '127.0.0.1'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        # Set a changes database so multiple FakeGerrits can report back to
        # a virtual canonical database given by the configured hostname
        self.gerrit_changes_dbs = {
            self.config.get('gerrit', 'server'): {}
        }

        def FakeGerritFactory(*args, **kw):
            kw['changes_dbs'] = self.gerrit_changes_dbs
            return FakeGerrit(*args, **kw)

        self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
                                             FakeGerritFactory))

        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                                self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def orderedRelease(self):
        # Run one build at a time to ensure non-race order:
        while len(self.builds):
            self.release(self.builds[0])
            self.waitUntilSettled()

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                        connection.client_id not in ['Zuul RPC Listener',
                                                     'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul. If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True

    def waitUntilSettled(self):
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # Have all build states propagated to Zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                if (not self.merge_client.build_sets and
                        self.sched.trigger_event_queue.empty() and
                        self.sched.result_event_queue.empty() and
                        self.fake_gerrit.event_queue.empty() and
                        self.haveAllBuildsReported() and
                        self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)