blob: 7cd9de39c29a28d6f12546c96b1722e2952e7b5c [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
Clark Boylanb640e052014-04-03 16:41:46 -070050import zuul.merger.client
James E. Blair879dafb2015-07-17 14:04:49 -070051import zuul.merger.merger
52import zuul.merger.server
Clark Boylanb640e052014-04-03 16:41:46 -070053import zuul.reporter.gerrit
54import zuul.reporter.smtp
Joshua Hesketh850ccb62014-11-27 11:31:02 +110055import zuul.source.gerrit
Clark Boylanb640e052014-04-03 16:41:46 -070056import zuul.trigger.gerrit
57import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070058import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070059
# Directory of fixture files, resolved relative to this module.
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
# When true, ZuulTestCase.setUp builds its test root inside a TempDir fixture;
# otherwise it uses the ZUUL_TEST_ROOT environment variable directly.
USE_TEMPDIR = True

# Emit everything from DEBUG upwards with a fixed-width logger-name column.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(name)-32s %(levelname)-8s %(message)s')
67
68
def repack_repo(path):
    """Run ``git repack -afd`` on the repository checked out at ``path``.

    :param path: working-tree directory; its ``.git`` subdirectory is passed
        to git via ``--git-dir``.
    :returns: the ``(stdout, stderr)`` tuple captured from the git process.
    :raises Exception: if git exits with a non-zero return code.
    """
    git_dir_arg = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(['git', git_dir_arg, 'repack', '-afd'],
                            close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    captured = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return captured
78
79
def random_sha1():
    """Return the hex SHA-1 digest of a random float's string form.

    The string is encoded to bytes before hashing because ``hashlib``
    rejects text strings on Python 3; on Python 2 the encode is a no-op
    for this ASCII-only input.

    :returns: a 40-character lowercase hexadecimal string.
    """
    seed = str(random.random()).encode('utf-8')
    return hashlib.sha1(seed).hexdigest()
82
83
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing attempt counter until ``max_seconds`` elapse.

    Callers loop over this generator and ``break``/``return`` once their
    condition is met. If the loop is still running when the deadline
    passes, the generator raises instead of finishing normally.

    :param max_seconds: how long, in seconds, iteration may continue.
    :param purpose: text appended to the timeout error message.
    :raises Exception: "Timeout waiting for <purpose>" once time is up.
    """
    deadline = time.time() + max_seconds
    attempt = 0
    while time.time() < deadline:
        attempt += 1
        yield attempt
        # Zero-length sleep: gives other threads a chance to run.
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)
92
93
class ChangeReference(git.Reference):
    """Git reference type whose paths live under ``refs/changes``."""

    # Prefix under which references of this type are created.
    _common_path_default = "refs/changes"
    # References of this type may only point at commit objects.
    _points_to_commits_only = True
97
98
class FakeChange(object):
    """In-memory stand-in for a Gerrit change, backed by a real git repo.

    Creating a change (and every call to :meth:`addPatchset`) commits to the
    repository at ``<upstream_root>/<project>`` under a ``refs/changes`` ref,
    and records matching patchset metadata in ``self.data``, the dict that
    :meth:`query` returns a copy of.
    """

    # Category code -> (label name, minimum vote, maximum vote).
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        """Create the change and its first patchset.

        :param gerrit: the owning fake Gerrit object.
        :param number: change number.
        :param project: project name, also the repo path under
            ``upstream_root``.
        :param branch: target branch name.
        :param subject: subject; also used as the commit message base.
        :param status: change status; the change is open iff it is 'NEW'.
        :param upstream_root: directory containing the upstream repos.
        """
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            # Shares the list object with self.patchsets on purpose.
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit new content for the current patchset to the upstream repo.

        A ref ``refs/changes/1/<number>/<patchset>`` is created from
        ``refs/tags/init``, files are written and committed on it, and the
        repository is then switched back to ``master``.

        :param msg: commit message.
        :param fn: file name (relative to the repo) written when not
            ``large``.
        :param large: if true, write 100 files of 4096 random printable
            characters each instead of the single small file.
        :returns: the commit object created.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(
            repo, '1/%s/%s' % (self.number, self.latest_patchset),
            'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        if not large:
            fn = os.path.join(path, fn)
            # "with" guarantees the handle is closed even if write() fails.
            with open(fn, 'w') as f:
                f.write("test %s %s %s\n" %
                        (self.branch, self.number, self.latest_patchset))
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                with open(fn, 'w') as f:
                    for _ in range(4096):
                        f.write(random.choice(string.printable))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Add a new patchset to the change and commit it to the repo.

        :param files: optional list of file names reported as ADDED; the
            first one is also the file written to the repo. Defaults to
            no extra files.
        :param large: passed through to :meth:`add_fake_change_to_repo`.
        """
        if files is None:
            files = []
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        ps_files.extend({'file': f, 'type': 'ADDED'} for f in files)
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def _event_change(self):
        """Return the ``change`` section shared by the simple events."""
        return {"project": self.project,
                "branch": self.branch,
                "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                "number": str(self.number),
                "subject": self.subject,
                "owner": {"name": "User Name"},
                "url": "https://hostname/3"}

    def getPatchsetCreatedEvent(self, patchset):
        """Build a ``patchset-created`` event for 1-based ``patchset``."""
        event = {"type": "patchset-created",
                 "change": self._event_change(),
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Build a ``change-restored`` event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": self._event_change(),
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Build a ``change-abandoned`` event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": self._event_change(),
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Build a ``comment-added`` event with a Code-Review 0 approval."""
        event = {"type": "comment-added",
                 "change": self._event_change(),
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        """Record an approval on the latest patchset and return its event.

        Any earlier approval by the same user in the same category is
        replaced.

        :param category: key of :attr:`categories` ('APRV', 'CRVW', 'VRFY').
        :param value: vote value; stored as a string.
        :param username: approver's username.
        :param granted_on: timestamp; the current time is used when falsy.
        :param message: comment text placed in the event.
        :returns: a ``comment-added`` event as a JSON-round-tripped dict.
        :raises KeyError: if ``category`` is not in :attr:`categories`.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        approvals = self.patchsets[-1]['approvals']
        # Filter in place (keeping the list object) rather than deleting by
        # index while enumerating a copy: index-based deletion removes the
        # wrong entries as soon as more than one approval matches.
        approvals[:] = [x for x in approvals
                        if not (x['by']['username'] == username and
                                x['type'] == category)]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        # Round-trip through JSON so the caller gets an independent copy.
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute submit records from the latest patchset's approvals.

        :returns: ``[{'status': 'OK'}]`` when every category is at its
            maximum vote, otherwise a single ``NOT_READY`` record listing
            each label as ``REJECT``, ``OK`` or ``NEED``.
        """
        status = dict((cat, 0) for cat in self.categories)

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min = self.categories[a['type']][1]
            new = int(a['value'])
            # A minimum (veto) vote always wins; otherwise the vote with
            # the larger magnitude is kept.
            if new == cat_min or abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on ``other`` and link both directions.

        :param other: the :class:`FakeChange` this change depends on.
        :param patchset: 1-based patchset number used to index both
            ``other.patchsets`` and ``self.patchsets``.
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a copy of the change data, counting the query.

        If the change has a dependency, its ``isCurrentPatchSet`` flag is
        refreshed against the dependency's latest patchset first.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            latest_ref = self.depends_on_change.patchsets[-1]['ref']
            d['isCurrentPatchSet'] = (latest_ref == d['ref'])
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change MERGED and move its branch to the last patchset.

        Does nothing if the change it depends on is not merged or if
        ``fail_merge`` is set.
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one report against this change."""
        self.reported += 1
380
381
class FakeGerrit(object):
    """Fake Gerrit connection that keeps changes and events in memory.

    ``addFakeChange`` reads ``self.upstream_root``, which ``__init__`` does
    not set; it must be assigned on the instance before changes are added.
    """

    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, hostname, username, port=29418, keyfile=None,
                 changes_dbs=None, queues_dbs=None):
        """Set up the fake connection.

        :param hostname: Gerrit host; also the key into both ``*_dbs``.
        :param username: user name used for approvals left by ``review``.
        :param port: stored only.
        :param keyfile: stored only.
        :param changes_dbs: mapping hostname -> {change number: change},
            letting several instances share one change database.
        :param queues_dbs: mapping hostname -> event queue.
        """
        # None sentinels instead of shared mutable default dicts.
        if changes_dbs is None:
            changes_dbs = {}
        if queues_dbs is None:
            queues_dbs = {}
        self.hostname = hostname
        self.username = username
        self.port = port
        self.keyfile = keyfile
        # NOTE(review): the {} fallback has no put()/get()/task_done(), so
        # the event methods only work when queues_dbs supplies a queue for
        # this hostname.
        self.event_queue = queues_dbs.get(hostname, {})
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = changes_dbs.get(hostname, {})
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new :class:`FakeChange`."""
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        """Queue ``data`` as an event, stamped with the current time."""
        return self.event_queue.put((time.time(), data))

    def getEvent(self):
        """Return the next queued ``(timestamp, data)`` event."""
        return self.event_queue.get()

    def eventDone(self):
        """Mark the most recently fetched event as processed."""
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Apply a review to a change, as Gerrit would.

        :param project: unused here.
        :param changeid: ``"<number>,<patchset>"`` string.
        :param message: review message; appended to the change's messages
            and, when non-empty, counted as a report.
        :param action: dict of review actions; recognised keys are the
            category codes, ``label`` and ``submit``.
        """
        number, ps = changeid.split(',')
        change = self.changes[int(number)]

        # Add the approval back onto the change (ie simulate what gerrit
        # would do).
        # Usually when zuul leaves a review it'll create a feedback loop
        # where zuul's review enters another gerrit event (which is then
        # picked up by zuul). However, we can't mimic this behaviour (by
        # adding this approval event into the queue) as it stops jobs from
        # checking what happens before this event is triggered. If a job
        # needs to see what happens they can add their own verified event
        # into the queue. Nevertheless, we can update change with the new
        # review in gerrit.

        for cat in ['CRVW', 'VRFY', 'APRV']:
            if cat in action:
                change.addApproval(cat, action[cat], username=self.username)

        if 'label' in action:
            parts = action['label'].split('=')
            # Take the last field as the value: a "name=value" label has
            # only two parts, so indexing parts[2] raised IndexError.
            change.addApproval(parts[0], parts[-1], username=self.username)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return the data for change ``number``, or ``{}`` if unknown."""
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        """Run a textual query and return a list of matching change data.

        ``change:<id>`` matches on the change id, ``message:<text>`` matches
        changes whose commit message contains the text, and any other query
        returns every known change. Each query string is recorded in
        ``self.queries``.
        """
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            matches = [change.query() for change in self.changes.values()
                       if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            matches = [change.query() for change in self.changes.values()
                       if msg in change.data['commitMessage']]
        else:
            # No filtering is applied: every change is returned.
            matches = [change.query() for change in self.changes.values()]
        return matches

    def startWatching(self, *args, **kw):
        """No-op: the fake has no event stream to watch."""
        pass
469
470
class BuildHistory(object):
    """Record of a completed build; fields are whatever keywords were given.

    ``__repr__`` expects ``result``, ``name``, ``number`` and ``changes``
    to have been supplied.
    """

    def __init__(self, **kw):
        # Every keyword argument becomes an instance attribute.
        vars(self).update(kw)

    def __repr__(self):
        fields = (self.result, self.name, self.number, self.changes)
        return "<Completed build, result: %s name: %s #%s changes: %s>" % fields
478
479
class FakeURLOpener(object):
    """File-like object answering a git ref-advertisement URL request.

    ``read`` lists the refs of a local upstream repository in place of a
    real HTTP response.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        """Return the ref advertisement text for the project in the URL.

        The project name is the URL path with its first two and last two
        ``/``-separated components removed.
        """
        url_path = urlparse.urlparse(self.url).path
        project = '/'.join(url_path.split('/')[2:-2])
        pieces = [
            '001e# service=git-upload-pack\n',
            '000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
            'multi_ack thin-pack side-band side-band-64k ofs-delta '
            'shallow no-progress include-tag multi_ack_detailed no-done\n',
        ]
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            entry = ref.object.hexsha + ' ' + ref.path + '\n'
            # Each line is prefixed with its length (including the 4-digit
            # prefix itself) as zero-padded hex.
            pieces.append('%04x%s' % (len(entry) + 4, entry))
        pieces.append('0000')
        return ''.join(pieces)
501
502
class FakeGerritSource(zuul.source.gerrit.Gerrit):
    """Gerrit source whose git URLs point at local upstream repositories."""

    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        """Initialise the real source with ``args``, then apply overrides.

        :param upstream_root: directory containing the upstream repos.
        """
        super(FakeGerritSource, self).__init__(*args)
        self.upstream_root = upstream_root
        # Replication timing values (in seconds, presumably) overridden
        # for the tests.
        self.replication_timeout = 1.5
        self.replication_retry_interval = 0.5

    def getGitUrl(self, project):
        """Return the local path of ``project`` under ``upstream_root``."""
        project_name = project.name
        return os.path.join(self.upstream_root, project_name)
514
515
class FakeStatsd(threading.Thread):
    """Daemon thread that collects UDP datagrams sent to a statsd port.

    The socket is bound to an ephemeral port (available as ``self.port``)
    and every datagram payload received is appended to ``self.stats``.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        # Pipe used by stop() to wake the poll loop in run().
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Collect datagrams until the wake pipe becomes readable."""
        # The registrations never change, so build the poll object once
        # rather than on every loop iteration.
        poll = select.poll()
        poll.register(self.sock, select.POLLIN)
        poll.register(self.wake_read, select.POLLIN)
        while True:
            for (fd, event) in poll.poll():
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    # recvfrom returns (payload, address); keep the payload.
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        """Ask the thread to exit by writing to the wake pipe."""
        # os.write needs bytes: a text string raises TypeError on Python 3.
        os.write(self.wake_write, b'1\n')
543
544
class FakeBuild(threading.Thread):
    """Thread that plays the part of a single build run by a FakeWorker.

    The build reports work data for its gearman job, optionally blocks
    until released, then reports a result and adds itself to the worker's
    build history.
    """

    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        """Prepare the build.

        :param worker: the owning worker; supplies ``hold_jobs_in_build``,
            ``lock``, ``shouldFailTest`` and the build bookkeeping lists.
        :param job: gearman job whose name is ``<cmd>:<name>[:...]`` and
            whose arguments are a JSON object containing ``ZUUL_UUID``.
        :param number: build number.
        :param node: node label, or None.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Wake the build if it is being held in :meth:`_wait`."""
        with self.wait_condition:
            self.wait_condition.notify()
            self.waiting = False
            self.log.debug("Build %s released" % self.unique)

    def isWaiting(self):
        """Return True while the build is held waiting for release."""
        with self.wait_condition:
            return bool(self.waiting)

    def _wait(self):
        """Block until :meth:`release` notifies the condition."""
        with self.wait_condition:
            self.waiting = True
            self.log.debug("Build %s waiting" % self.unique)
            self.wait_condition.wait()

    def run(self):
        """Report progress, optionally hold, then report the result."""
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Hold the worker lock via "with" so it is released even if
        # reporting raises; a manual acquire/release pair would leave the
        # lock held forever in that case.
        with self.worker.lock:
            result = 'SUCCESS'
            if (('ZUUL_REF' in self.parameters) and
                    self.worker.shouldFailTest(self.name,
                                               self.parameters['ZUUL_REF'])):
                result = 'FAILURE'
            if self.aborted:
                result = 'ABORTED'

            if self.run_error:
                work_fail = True
                result = 'RUN_ERROR'
            else:
                data['result'] = result
                data['node_labels'] = ['bare-necessities']
                data['node_name'] = 'foo'
                work_fail = False

            changes = None
            if 'ZUUL_CHANGE_IDS' in self.parameters:
                changes = self.parameters['ZUUL_CHANGE_IDS']

            self.worker.build_history.append(
                BuildHistory(name=self.name, number=self.number,
                             result=result, changes=changes, node=self.node,
                             uuid=self.unique, description=self.description,
                             pipeline=self.parameters['ZUUL_PIPELINE'])
            )

            self.job.sendWorkData(json.dumps(data))
            if work_fail:
                self.job.sendWorkFail()
            else:
                self.job.sendWorkComplete(json.dumps(data))
            del self.worker.gearman_jobs[self.job.unique]
            self.worker.running_builds.remove(self)
652
653
class FakeWorker(gear.Worker):
    """Gearman worker that runs jobs as in-process :class:`FakeBuild`s.

    A daemon thread started by ``__init__`` fetches jobs and dispatches
    them by the command prefix of the job name (``build``, ``stop`` or
    ``set_description``).
    """

    def __init__(self, worker_id, test):
        """Create the worker and start its job-processing thread.

        :param worker_id: passed to ``gear.Worker``.
        :param test: test case object; its ``ref_has_change`` method is used
            by :meth:`shouldFailTest`.
        """
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch ``job`` according to its ``<cmd>:<name>[:<node>]`` name.

        Jobs with an unrecognised command are ignored.
        """
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        node = parts[2] if len(parts) > 2 else None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a :class:`FakeBuild` thread for ``job``."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build named in the job's JSON arguments.

        Completes the job if a matching running build is found, otherwise
        fails it.
        """
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Set the description of a running or completed build.

        Running builds are searched first, then the build history; the job
        fails if neither contains a match.
        """
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for candidates in (self.running_builds, self.build_history):
            for build in candidates:
                if build.name == name and build.number == number:
                    build.description = descr
                    job.sendWorkComplete()
                    return
        job.sendWorkFail()

    def work(self):
        """Fetch and handle jobs for as long as ``self.running`` is true."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Log and keep serving jobs. "except Exception" (rather
                # than a bare except) lets SystemExit and KeyboardInterrupt
                # propagate.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Register that job ``name`` should fail for ``change``."""
        self.fail_tests.setdefault(name, []).append(change)

    def shouldFailTest(self, name, ref):
        """Return True if ``ref`` contains a change registered to fail."""
        return any(self.test.ref_has_change(ref, change)
                   for change in self.fail_tests.get(name, []))

    def release(self, regex=None):
        """Release held builds whose name matches ``regex`` (all if None)."""
        # Iterate over a snapshot: released builds remove themselves from
        # running_builds as they finish.
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                       len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
762
763
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold queued build jobs until released."""

    def __init__(self):
        # Set before the base initialiser runs, as in the original ordering.
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next job ``connection`` can run, skipping held jobs.

        The first time a job is seen its ``waiting`` flag is initialised:
        ``build:`` jobs take the current ``hold_jobs_in_queue`` value and
        all other jobs are never held.

        :param connection: worker connection asking for work.
        :param peek: if true, the job is returned but left in its queue.
        :returns: a job, or None if nothing is runnable.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release held build jobs whose name matches ``regex``.

        :param regex: pattern matched against the job name part of
            ``build:<name>[:<node>]``; every build job is released when it
            is None or empty.
        """
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            # Names may carry a third ":<node>" field (FakeWorker.handleJob
            # parses one), so a strict two-way unpack of split(':') raised
            # ValueError for those jobs.
            cmd, _, remainder = job.name.partition(':')
            if cmd != 'build':
                continue
            name = remainder.split(':')[0]
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
810
811
class FakeSMTP(object):
    """SMTP stand-in that records messages in a caller-supplied list."""

    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        # List owned by the caller; sendmail appends to it.
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record ``msg`` and return True.

        The message is split at the first blank line into headers and body;
        a message without one raises IndexError.
        """
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        sections = msg.split('\n\n', 1)
        record = {
            'from_email': from_email,
            'to_email': to_email,
            'msg': msg,
            'headers': sections[0],
            'body': sections[1],
        }
        self.messages.append(record)
        return True

    def quit(self):
        """Nothing to close; always returns True."""
        return True
839
840
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection that never contacts a real service."""

    def post_account(self, headers):
        """Accept and discard the account headers."""
        # Do nothing
        pass

    def get_auth(self):
        """Return a fixed storage endpoint and an (unused) empty token."""
        # Build the URL with '/' explicitly: os.path.join uses the
        # platform's path separator, which is wrong for URLs on Windows.
        endpoint = '/'.join(['https://storage.example.org', 'V1',
                             'AUTH_account'])
        return endpoint, ''
851
852
class BaseTestCase(testtools.TestCase):
    """Base test case that applies the OS_* environment-driven fixtures."""

    log = logging.getLogger("zuul.test")

    def setUp(self):
        """Install timeout and stdout/stderr/log capture fixtures.

        ``OS_TEST_TIMEOUT`` sets a per-test timeout when it is a positive
        integer. ``OS_STDOUT_CAPTURE``, ``OS_STDERR_CAPTURE`` and
        ``OS_LOG_CAPTURE`` enable their capture when set to ``True`` or
        ``1``.
        """
        super(BaseTestCase, self).setUp()

        try:
            test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        def flag_set(variable):
            # A capture flag counts as enabled only for these two spellings.
            return os.environ.get(variable) in ('True', '1')

        if flag_set('OS_STDOUT_CAPTURE'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if flag_set('OS_STDERR_CAPTURE'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if flag_set('OS_LOG_CAPTURE'):
            log_format = '%(asctime)s %(name)-32s %(levelname)-8s %(message)s'
            self.useFixture(fixtures.FakeLogger(level=logging.DEBUG,
                                                format=log_format))
Maru Newby3fe5f852015-01-13 04:22:14 +0000881
882
883class ZuulTestCase(BaseTestCase):
884
885 def setUp(self):
886 super(ZuulTestCase, self).setUp()
James E. Blair97d902e2014-08-21 13:25:56 -0700887 if USE_TEMPDIR:
888 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000889 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
890 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700891 else:
892 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700893 self.test_root = os.path.join(tmp_root, "zuul-test")
894 self.upstream_root = os.path.join(self.test_root, "upstream")
895 self.git_root = os.path.join(self.test_root, "git")
896
897 if os.path.exists(self.test_root):
898 shutil.rmtree(self.test_root)
899 os.makedirs(self.test_root)
900 os.makedirs(self.upstream_root)
901 os.makedirs(self.git_root)
902
903 # Make per test copy of Configuration.
904 self.setup_config()
905 self.config.set('zuul', 'layout_config',
906 os.path.join(FIXTURE_DIR, "layout.yaml"))
907 self.config.set('merger', 'git_dir', self.git_root)
908
909 # For each project in config:
910 self.init_repo("org/project")
911 self.init_repo("org/project1")
912 self.init_repo("org/project2")
913 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700914 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700915 self.init_repo("org/project5")
916 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700917 self.init_repo("org/one-job-project")
918 self.init_repo("org/nonvoting-project")
919 self.init_repo("org/templated-project")
920 self.init_repo("org/layered-project")
921 self.init_repo("org/node-project")
922 self.init_repo("org/conflict-project")
923 self.init_repo("org/noop-project")
924 self.init_repo("org/experimental-project")
Evgeny Antyshevd6e546c2015-06-11 15:13:57 +0000925 self.init_repo("org/no-jobs-project")
Clark Boylanb640e052014-04-03 16:41:46 -0700926
927 self.statsd = FakeStatsd()
Ian Wienandff977bf2015-09-30 15:38:47 +1000928 # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
929 # see: https://github.com/jsocol/pystatsd/issues/61
930 os.environ['STATSD_HOST'] = '127.0.0.1'
Clark Boylanb640e052014-04-03 16:41:46 -0700931 os.environ['STATSD_PORT'] = str(self.statsd.port)
932 self.statsd.start()
933 # the statsd client object is configured in the statsd module import
934 reload(statsd)
935 reload(zuul.scheduler)
936
937 self.gearman_server = FakeGearmanServer()
938
939 self.config.set('gearman', 'port', str(self.gearman_server.port))
940
941 self.worker = FakeWorker('fake_worker', self)
942 self.worker.addServer('127.0.0.1', self.gearman_server.port)
943 self.gearman_server.worker = self.worker
944
945 self.merge_server = zuul.merger.server.MergeServer(self.config)
946 self.merge_server.start()
947
948 self.sched = zuul.scheduler.Scheduler()
949
950 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
951 FakeSwiftClientConnection))
952 self.swift = zuul.lib.swift.Swift(self.config)
953
954 def URLOpenerFactory(*args, **kw):
955 if isinstance(args[0], urllib2.Request):
956 return old_urlopen(*args, **kw)
957 args = [self.fake_gerrit] + list(args)
958 return FakeURLOpener(self.upstream_root, *args, **kw)
959
960 old_urlopen = urllib2.urlopen
961 urllib2.urlopen = URLOpenerFactory
962
Clark Boylanb640e052014-04-03 16:41:46 -0700963 self.smtp_messages = []
964
965 def FakeSMTPFactory(*args, **kw):
966 args = [self.smtp_messages] + list(args)
967 return FakeSMTP(*args, **kw)
968
Joshua Hesketh642824b2014-07-01 17:54:59 +1000969 # Set a changes database so multiple FakeGerrit's can report back to
970 # a virtual canonical database given by the configured hostname
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100971 self.gerrit_queues_dbs = {
972 self.config.get('gerrit', 'server'): Queue.Queue()
973 }
Joshua Hesketh642824b2014-07-01 17:54:59 +1000974 self.gerrit_changes_dbs = {
975 self.config.get('gerrit', 'server'): {}
976 }
977
978 def FakeGerritFactory(*args, **kw):
979 kw['changes_dbs'] = self.gerrit_changes_dbs
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100980 kw['queues_dbs'] = self.gerrit_queues_dbs
Joshua Hesketh642824b2014-07-01 17:54:59 +1000981 return FakeGerrit(*args, **kw)
982
983 self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
984 FakeGerritFactory))
985
Clark Boylanb640e052014-04-03 16:41:46 -0700986 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
987
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100988 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
989 self.swift)
990 self.merge_client = zuul.merger.client.MergeClient(
991 self.config, self.sched)
Clark Boylanb640e052014-04-03 16:41:46 -0700992
993 self.sched.setLauncher(self.launcher)
994 self.sched.setMerger(self.merge_client)
Clark Boylanb640e052014-04-03 16:41:46 -0700995
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100996 self.register_sources()
997 self.fake_gerrit = self.gerrit_source.gerrit
998 self.fake_gerrit.upstream_root = self.upstream_root
999
1000 self.register_triggers()
1001 self.register_reporters()
1002
1003 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
1004 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
Clark Boylanb640e052014-04-03 16:41:46 -07001005
1006 self.sched.start()
1007 self.sched.reconfigure(self.config)
1008 self.sched.resume()
1009 self.webapp.start()
1010 self.rpc.start()
1011 self.launcher.gearman.waitForServer()
1012 self.registerJobs()
1013 self.builds = self.worker.running_builds
1014 self.history = self.worker.build_history
1015
1016 self.addCleanup(self.assertFinalState)
1017 self.addCleanup(self.shutdown)
1018
def register_sources(self):
    """Create the fake Gerrit source and register it with the scheduler.

    The source is stored on ``self.gerrit_source``; its replication
    timeout and retry interval are set to 1.5 and 0.5 before it is
    handed to the scheduler.
    """
    source = FakeGerritSource(self.upstream_root, self.config, self.sched)
    self.gerrit_source = source
    source.replication_timeout = 1.5
    source.replication_retry_interval = 0.5
    self.sched.registerSource(source)
1027
def register_triggers(self):
    """Create the Gerrit, timer and Zuul triggers and register each one.

    Every trigger is registered with the scheduler immediately after it
    is constructed, in the order Gerrit, timer, Zuul.  The instances are
    kept on ``self.gerrit_trigger``, ``self.timer`` and
    ``self.zuultrigger``.
    """
    config, sched = self.config, self.sched

    gerrit = zuul.trigger.gerrit.Gerrit(
        self.fake_gerrit, config, sched, self.gerrit_source)
    self.gerrit_trigger = gerrit
    # NOTE(review): presumably removes the connector's wait so tests
    # run quickly -- confirm against the trigger implementation.
    gerrit.gerrit_connector.delay = 0.0
    sched.registerTrigger(gerrit)

    self.timer = zuul.trigger.timer.Timer(config, sched)
    sched.registerTrigger(self.timer)

    self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(config, sched)
    sched.registerTrigger(self.zuultrigger)
1040
def register_reporters(self):
    """Register a Gerrit reporter and an SMTP reporter with the scheduler.

    The SMTP reporter is built from the ``default_from``, ``default_to``
    and ``server`` options of the ``smtp`` config section (in that
    order) and is kept on ``self.smtp_reporter``.
    """
    gerrit_reporter = zuul.reporter.gerrit.GerritReporter(self.fake_gerrit)
    self.sched.registerReporter(gerrit_reporter)

    smtp_options = [self.config.get('smtp', option)
                    for option in ('default_from', 'default_to', 'server')]
    self.smtp_reporter = zuul.reporter.smtp.SMTPReporter(*smtp_options)
    self.sched.registerReporter(self.smtp_reporter)
1050
def setup_config(self):
    """Build the per-test configuration object.

    A fresh parser is stored on ``self.config`` and loaded from
    ``zuul.conf`` in the fixture directory.  Override this method to
    supply a different configuration.
    """
    conf_path = os.path.join(FIXTURE_DIR, "zuul.conf")
    self.config = parser = ConfigParser.ConfigParser()
    parser.read(conf_path)
1055
def assertFinalState(self):
    """Assert that the test left nothing behind.

    Checks, in order, that no ``git.Repo`` object survives a garbage
    collection, that every pipeline queue is empty, and that pipelines
    driven by an ``IndependentPipelineManager`` have no queues at all.
    """
    # Force a collection first so only genuinely referenced repos remain.
    gc.collect()
    leaked_repos = [candidate for candidate in gc.get_objects()
                    if isinstance(candidate, git.Repo)]
    self.assertEqual(len(leaked_repos), 0)
    self.assertEmptyQueues()
    independent = zuul.scheduler.IndependentPipelineManager
    for pipeline in self.sched.layout.pipelines.values():
        if not isinstance(pipeline.manager, independent):
            continue
        self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001069
def shutdown(self):
    """Stop every component started for the test, in a fixed order.

    Components exposing both ``stop`` and ``join`` are stopped and then
    joined before the next component is touched.  After everything is
    down, an error is logged if more than one thread is still alive.
    """
    def halt(service):
        # Stop the service, then wait for it to finish.
        service.stop()
        service.join()

    self.log.debug("Shutting down after tests")
    self.launcher.stop()
    halt(self.merge_server)
    self.merge_client.stop()
    self.worker.shutdown()
    self.gerrit_trigger.stop()
    self.timer.stop()
    halt(self.sched)
    halt(self.statsd)
    halt(self.webapp)
    halt(self.rpc)
    self.gearman_server.shutdown()
    alive = threading.enumerate()
    if len(alive) > 1:
        self.log.error("More than one thread is running: %s" % alive)
Clark Boylanb640e052014-04-03 16:41:46 -07001091
def init_repo(self, project):
    """Create the upstream git repository for *project*.

    The repository is created under ``self.upstream_root`` (parent
    directories are made as needed), given a test user identity, and
    seeded with a ``README`` committed as 'initial commit'.  A
    ``master`` head and an ``init`` tag are created on that commit, the
    working tree is reset and cleaned, and finally an ``mp`` branch is
    added via ``create_branch``.

    :param project: project name, possibly containing ``/`` separators
        (for example ``org/project``).
    """
    parts = project.split('/')
    path = os.path.join(self.upstream_root, *parts[:-1])
    if not os.path.exists(path):
        os.makedirs(path)
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)

    repo.config_writer().set_value('user', 'email', 'user@example.com')
    repo.config_writer().set_value('user', 'name', 'User Name')
    repo.config_writer().write()

    fn = os.path.join(path, 'README')
    # Use a context manager so the handle is closed even if the write
    # fails.
    with open(fn, 'w') as f:
        f.write("test\n")
    repo.index.add([fn])
    repo.index.commit('initial commit')
    master = repo.create_head('master')
    repo.create_tag('init')

    # Point HEAD at master and bring the working tree in line with it.
    repo.head.reference = master
    zuul.merger.merger.reset_repo_to_head(repo)
    repo.git.clean('-x', '-f', '-d')

    self.create_branch(project, 'mp')
1118
def create_branch(self, project, branch):
    """Add *branch* to the upstream repository of *project*.

    The new head is created from the current HEAD, a line naming the
    branch is appended to ``README`` and committed as '<branch> commit'
    on it.  HEAD is then pointed back at ``master`` and the working
    tree is reset and cleaned.

    :param project: project name relative to ``self.upstream_root``.
    :param branch: name of the branch to create.
    """
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)
    fn = os.path.join(path, 'README')

    branch_head = repo.create_head(branch)
    repo.head.reference = branch_head
    # Use a context manager so the handle is closed even if the write
    # fails.
    with open(fn, 'a') as f:
        f.write("test %s\n" % branch)
    repo.index.add([fn])
    repo.index.commit('%s commit' % branch)

    # Leave the repository checked out on master again.
    repo.head.reference = repo.heads['master']
    zuul.merger.merger.reset_repo_to_head(repo)
    repo.git.clean('-x', '-f', '-d')
1135
def ref_has_change(self, ref, change):
    """Tell whether *ref* contains the first patchset commit of *change*.

    The project's repository under ``self.git_root`` is searched for a
    commit reachable from *ref* whose stripped message equals
    ``'<change.subject>-1'``.

    :returns: True if such a commit exists; False if none is found or
        if git raises ``GitCommandError`` while listing commits.
    """
    repo = git.Repo(os.path.join(self.git_root, change.project))
    try:
        return any(
            commit.message.strip() == ('%s-1' % change.subject)
            for commit in repo.iter_commits(ref))
    except GitCommandError:
        # A failing git command is treated the same as "not found".
        return False
1146
def job_has_changes(self, *args):
    """Tell whether a job's ref contains exactly the expected changes.

    The first positional argument is the job: either a ``FakeBuild``
    (its ``parameters`` are used) or an object whose ``arguments``
    attribute is a JSON document with the same keys.  Every remaining
    argument is a change whose ``subject`` is expected in the history.

    :returns: True when each ``'<subject>-1'`` message appears among
        the commits reachable from ``ZUUL_REF`` and the newest of those
        commits is ``ZUUL_COMMIT``; False otherwise.
    """
    job = args[0]
    commits = args[1:]
    if isinstance(job, FakeBuild):
        parameters = job.parameters
    else:
        parameters = json.loads(job.arguments)
    project = parameters['ZUUL_PROJECT']
    path = os.path.join(self.git_root, project)
    repo = git.Repo(path)
    ref = parameters['ZUUL_REF']
    sha = parameters['ZUUL_COMMIT']
    # Walk the history once and derive both views from the same
    # snapshot instead of running the git traversal twice.
    history = list(repo.iter_commits(ref))
    repo_messages = [c.message.strip() for c in history]
    repo_shas = [c.hexsha for c in history]
    commit_messages = ['%s-1' % commit.subject for commit in commits]
    self.log.debug("Checking if job %s has changes; commit_messages %s;"
                   " repo_messages %s; sha %s" % (job, commit_messages,
                                                  repo_messages, sha))
    for msg in commit_messages:
        if msg not in repo_messages:
            self.log.debug("  messages do not match")
            return False
    # The first commit yielded for the ref must be the advertised one.
    if repo_shas[0] != sha:
        self.log.debug("  sha does not match")
        return False
    self.log.debug("  OK")
    return True
1174
def registerJobs(self):
    """Register the worker's functions and wait for the server to see them.

    One ``build:<job>`` function is registered per job in the
    scheduler layout, followed by ``stop:<worker_id>``.  The method
    then spins until the gearman server lists at least that many
    functions.
    """
    function_names = ['build:' + job
                      for job in self.sched.layout.jobs.keys()]
    function_names.append('stop:' + self.worker.worker_id)
    for function_name in function_names:
        self.worker.registerFunction(function_name)

    expected = len(function_names)
    while len(self.gearman_server.functions) < expected:
        # Yield to other threads while the registration propagates.
        time.sleep(0)
1185
def orderedRelease(self):
    """Release the running builds one at a time, oldest entry first.

    After each release the method waits for the system to settle, so
    builds complete in a deterministic, race-free order.
    """
    while True:
        if len(self.builds) == 0:
            return
        self.release(self.builds[0])
        self.waitUntilSettled()
1191
def release(self, job):
    """Release a held job.

    A ``FakeBuild`` is released through its own ``release`` method.
    Any other job object has its ``waiting`` flag cleared, after which
    the gearman server's connections are woken.
    """
    if isinstance(job, FakeBuild):
        job.release()
        return

    job.waiting = False
    self.log.debug("Queued job %s released" % job.unique)
    self.gearman_server.wakeConnections()
1199
def getParameter(self, job, name):
    """Return the build parameter *name* of *job*.

    A ``FakeBuild`` exposes its parameters directly; any other job is
    expected to carry them as a JSON document in ``job.arguments``.

    :raises KeyError: if the parameter is not present.
    """
    params = (job.parameters if isinstance(job, FakeBuild)
              else json.loads(job.arguments))
    return params[name]
1206
def resetGearmanServer(self):
    """Clear all worker functions from the gearman server.

    The worker's function list is emptied, then the method spins until
    no connection other than the RPC listener and the merger still
    advertises functions.  Finally the server's function set is reset
    and the RPC listener and merge server register again.
    """
    self.worker.setFunctions([])
    # Connections that are allowed to keep their functions registered.
    exempt_clients = ['Zuul RPC Listener', 'Zuul Merger']
    while any(connection.functions and
              connection.client_id not in exempt_clients
              for connection in self.gearman_server.active_connections):
        time.sleep(0)
    self.gearman_server.functions = set()
    self.rpc.register()
    self.merge_server.register()
1222
def haveAllBuildsReported(self):
    """Tell whether every finished build has been reported back to Zuul.

    :returns: False if the launcher still has meta jobs, if any build
        in the worker's history is still present in the launcher's
        build table, or if a worker connection is in ``GRAB_WAIT``;
        True otherwise.
    """
    # Zuul is still waiting on a meta job to complete.
    if self.launcher.meta_jobs:
        return False
    # A completed build the launcher still tracks has not been reported
    # yet, which means a Gearman event is still in transit.
    if any(self.launcher.builds.get(finished.uuid)
           for finished in self.worker.build_history):
        return False
    # None of the worker connections may be in GRAB_WAIT.
    if any(connection.state == 'GRAB_WAIT'
           for connection in self.worker.active_connections):
        return False
    return True
1242
def areAllBuildsWaiting(self):
    """Tell whether every build known to the launcher is held.

    A build counts as held when its gearman job is queued on the
    server with ``waiting`` set, or when the worker has picked it up,
    the build has reported a number, and the worker-side build reports
    ``isWaiting()``.  The first build in any other state makes the
    method log the reason and return False.
    """
    for zuul_build in self.launcher.builds.values():
        # Find the client-side gearman job for this build.  Every
        # connection is scanned, so a match on a later connection
        # replaces one found earlier.
        gear_job = None
        for connection in self.launcher.gearman.active_connections:
            for candidate in connection.related_jobs.values():
                if candidate.unique == zuul_build.uuid:
                    gear_job = candidate
                    break
        if not gear_job:
            self.log.debug("%s is not known to the gearman client" %
                           zuul_build)
            return False
        if not gear_job.handle:
            self.log.debug("%s has no handle" % gear_job)
            return False

        queued_job = self.gearman_server.jobs.get(gear_job.handle)
        if not queued_job:
            self.log.debug("%s is not known to the gearman server" %
                           gear_job)
            return False
        if not hasattr(queued_job, 'waiting'):
            self.log.debug("%s is being enqueued" % queued_job)
            return False
        if queued_job.waiting:
            # Still held in the server queue.
            continue

        running_job = self.worker.gearman_jobs.get(queued_job.unique)
        if not running_job:
            self.log.debug("%s is unassigned" % queued_job)
            return False
        if zuul_build.number is None:
            self.log.debug("%s has not reported start" % running_job)
            return False
        if not running_job.build.isWaiting():
            self.log.debug("%s is running" % running_job)
            return False
    return True
Clark Boylanb640e052014-04-03 16:41:46 -07001283
def waitUntilSettled(self):
    """Block until Zuul has no outstanding work.

    The system counts as settled when, with the worker lock and the
    scheduler's run-handler lock held, there are no pending merger
    build sets, the trigger, result and Gerrit event queues are empty,
    every finished build has been reported, and every remaining build
    is waiting.  Between checks the method waits up to 0.1 on the
    scheduler's wake event.

    Both locks are released through ``finally`` so that a failing
    check cannot leave them held.

    :raises Exception: if the system has not settled after 10 seconds;
        the state of the three queues and of the builds is printed
        first.
    """
    self.log.debug("Waiting until settled...")
    start = time.time()
    while True:
        if time.time() - start > 10:
            status = (self.sched.trigger_event_queue.empty(),
                      self.sched.result_event_queue.empty(),
                      self.fake_gerrit.event_queue.empty(),
                      self.areAllBuildsWaiting())
            # A single parenthesised expression prints identically on
            # Python 2 and Python 3.
            print('queue status: %s %s %s %s' % status)
            raise Exception("Timeout waiting for Zuul to settle")
        settled = False
        # Make sure no new events show up while we're checking
        self.worker.lock.acquire()
        try:
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have
                # been processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                try:
                    settled = bool(
                        not self.merge_client.build_sets and
                        self.sched.trigger_event_queue.empty() and
                        self.sched.result_event_queue.empty() and
                        self.fake_gerrit.event_queue.empty() and
                        self.haveAllBuildsReported() and
                        self.areAllBuildsWaiting())
                finally:
                    self.sched.run_handler_lock.release()
        finally:
            self.worker.lock.release()
        if settled:
            self.log.debug("...settled.")
            return
        self.sched.wake_event.wait(0.1)
1318
def countJobResults(self, jobs, result):
    """Count the jobs whose ``result`` attribute equals *result*.

    Counting is done by iteration rather than ``len(filter(...))`` so
    the method also works where ``filter`` returns an iterator.

    :param jobs: iterable of objects with a ``result`` attribute.
    :param result: the result value to look for.
    :returns: the number of matching jobs.
    """
    return sum(1 for job in jobs if job.result == result)
1322
def getJobFromHistory(self, name):
    """Return the first build named *name* in the worker's history.

    :raises Exception: if no build in the history has that name.
    """
    not_found = object()
    matching = (build for build in self.worker.build_history
                if build.name == name)
    build = next(matching, not_found)
    if build is not_found:
        raise Exception("Unable to find job %s in history" % name)
    return build
1329
def assertEmptyQueues(self):
    """Assert that no pipeline queue still holds items.

    Every queue of every pipeline in the scheduler layout is checked.
    The contents of a non-empty queue are printed before the assertion
    so the orphaned items show up in the test output.
    """
    # Make sure there are no orphaned jobs
    for pipeline in self.sched.layout.pipelines.values():
        for queue in pipeline.queues:
            if len(queue.queue) != 0:
                # A single parenthesised expression prints identically
                # on Python 2 and Python 3.
                print('pipeline %s queue %s contents %s' % (
                    pipeline.name, queue.name, queue.queue))
            self.assertEqual(len(queue.queue), 0,
                             "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001339
def assertReportedStat(self, key, value=None, kind=None):
    """Wait for a stat named *key* to show up in the fake statsd.

    The collected stats are polled every 0.1 seconds for up to 5
    seconds.  Each stat is split at its first ``:`` into a name and a
    payload, so payloads that themselves contain ``:`` no longer abort
    the search.

    :param key: stat name to look for.
    :param value: if given (and truthy), the payload must equal it
        exactly.
    :param kind: if given and *value* is not, the payload must end
        with ``'|' + kind``.
    :raises Exception: if no matching stat is seen in time; the
        collected stats are pretty-printed first.
    """
    deadline = time.time() + 5
    while time.time() < deadline:
        for stat in self.statsd.stats:
            k, _, v = stat.partition(':')
            if key != k:
                continue
            if value is None and kind is None:
                return
            elif value:
                if value == v:
                    return
            elif kind:
                if v.endswith('|' + kind):
                    return
        time.sleep(0.1)

    # Dump what was collected once, on failure only.
    pprint.pprint(self.statsd.stats)
    raise Exception("Key %s not found in reported stats" % key)