#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
50import zuul.merger.server
51import zuul.merger.client
52import zuul.reporter.gerrit
53import zuul.reporter.smtp
54import zuul.trigger.gerrit
55import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070056import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070057
58FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
59 'fixtures')
James E. Blair97d902e2014-08-21 13:25:56 -070060USE_TEMPDIR = True
Clark Boylanb640e052014-04-03 16:41:46 -070061
62logging.basicConfig(level=logging.DEBUG,
63 format='%(asctime)s %(name)-32s '
64 '%(levelname)-8s %(message)s')
65
66
67def repack_repo(path):
68 cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
69 output = subprocess.Popen(cmd, close_fds=True,
70 stdout=subprocess.PIPE,
71 stderr=subprocess.PIPE)
72 out = output.communicate()
73 if output.returncode:
74 raise Exception("git repack returned %d" % output.returncode)
75 return out
76
77
78def random_sha1():
79 return hashlib.sha1(str(random.random())).hexdigest()
80
81
82class ChangeReference(git.Reference):
83 _common_path_default = "refs/changes"
84 _points_to_commits_only = True
85
86
87class FakeChange(object):
88 categories = {'APRV': ('Approved', -1, 1),
89 'CRVW': ('Code-Review', -2, 2),
90 'VRFY': ('Verified', -2, 2)}
91
92 def __init__(self, gerrit, number, project, branch, subject,
93 status='NEW', upstream_root=None):
94 self.gerrit = gerrit
95 self.reported = 0
96 self.queried = 0
97 self.patchsets = []
98 self.number = number
99 self.project = project
100 self.branch = branch
101 self.subject = subject
102 self.latest_patchset = 0
103 self.depends_on_change = None
104 self.needed_by_changes = []
105 self.fail_merge = False
106 self.messages = []
107 self.data = {
108 'branch': branch,
109 'comments': [],
110 'commitMessage': subject,
111 'createdOn': time.time(),
112 'id': 'I' + random_sha1(),
113 'lastUpdated': time.time(),
114 'number': str(number),
115 'open': status == 'NEW',
116 'owner': {'email': 'user@example.com',
117 'name': 'User Name',
118 'username': 'username'},
119 'patchSets': self.patchsets,
120 'project': project,
121 'status': status,
122 'subject': subject,
123 'submitRecords': [],
124 'url': 'https://hostname/%s' % number}
125
126 self.upstream_root = upstream_root
127 self.addPatchset()
128 self.data['submitRecords'] = self.getSubmitRecords()
129 self.open = status == 'NEW'
130
131 def add_fake_change_to_repo(self, msg, fn, large):
132 path = os.path.join(self.upstream_root, self.project)
133 repo = git.Repo(path)
134 ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
135 self.latest_patchset),
136 'refs/tags/init')
137 repo.head.reference = ref
138 repo.head.reset(index=True, working_tree=True)
139 repo.git.clean('-x', '-f', '-d')
140
141 path = os.path.join(self.upstream_root, self.project)
142 if not large:
143 fn = os.path.join(path, fn)
144 f = open(fn, 'w')
145 f.write("test %s %s %s\n" %
146 (self.branch, self.number, self.latest_patchset))
147 f.close()
148 repo.index.add([fn])
149 else:
150 for fni in range(100):
151 fn = os.path.join(path, str(fni))
152 f = open(fn, 'w')
153 for ci in range(4096):
154 f.write(random.choice(string.printable))
155 f.close()
156 repo.index.add([fn])
157
158 r = repo.index.commit(msg)
159 repo.head.reference = 'master'
160 repo.head.reset(index=True, working_tree=True)
161 repo.git.clean('-x', '-f', '-d')
162 repo.heads['master'].checkout()
163 return r
164
165 def addPatchset(self, files=[], large=False):
166 self.latest_patchset += 1
167 if files:
168 fn = files[0]
169 else:
James E. Blair97d902e2014-08-21 13:25:56 -0700170 fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
Clark Boylanb640e052014-04-03 16:41:46 -0700171 msg = self.subject + '-' + str(self.latest_patchset)
172 c = self.add_fake_change_to_repo(msg, fn, large)
173 ps_files = [{'file': '/COMMIT_MSG',
174 'type': 'ADDED'},
175 {'file': 'README',
176 'type': 'MODIFIED'}]
177 for f in files:
178 ps_files.append({'file': f, 'type': 'ADDED'})
179 d = {'approvals': [],
180 'createdOn': time.time(),
181 'files': ps_files,
182 'number': str(self.latest_patchset),
183 'ref': 'refs/changes/1/%s/%s' % (self.number,
184 self.latest_patchset),
185 'revision': c.hexsha,
186 'uploader': {'email': 'user@example.com',
187 'name': 'User name',
188 'username': 'user'}}
189 self.data['currentPatchSet'] = d
190 self.patchsets.append(d)
191 self.data['submitRecords'] = self.getSubmitRecords()
192
193 def getPatchsetCreatedEvent(self, patchset):
194 event = {"type": "patchset-created",
195 "change": {"project": self.project,
196 "branch": self.branch,
197 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
198 "number": str(self.number),
199 "subject": self.subject,
200 "owner": {"name": "User Name"},
201 "url": "https://hostname/3"},
202 "patchSet": self.patchsets[patchset - 1],
203 "uploader": {"name": "User Name"}}
204 return event
205
206 def getChangeRestoredEvent(self):
207 event = {"type": "change-restored",
208 "change": {"project": self.project,
209 "branch": self.branch,
210 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
211 "number": str(self.number),
212 "subject": self.subject,
213 "owner": {"name": "User Name"},
214 "url": "https://hostname/3"},
215 "restorer": {"name": "User Name"},
Antoine Mussobd86a312014-01-08 14:51:33 +0100216 "patchSet": self.patchsets[-1],
217 "reason": ""}
218 return event
219
220 def getChangeAbandonedEvent(self):
221 event = {"type": "change-abandoned",
222 "change": {"project": self.project,
223 "branch": self.branch,
224 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
225 "number": str(self.number),
226 "subject": self.subject,
227 "owner": {"name": "User Name"},
228 "url": "https://hostname/3"},
229 "abandoner": {"name": "User Name"},
230 "patchSet": self.patchsets[-1],
Clark Boylanb640e052014-04-03 16:41:46 -0700231 "reason": ""}
232 return event
233
234 def getChangeCommentEvent(self, patchset):
235 event = {"type": "comment-added",
236 "change": {"project": self.project,
237 "branch": self.branch,
238 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
239 "number": str(self.number),
240 "subject": self.subject,
241 "owner": {"name": "User Name"},
242 "url": "https://hostname/3"},
243 "patchSet": self.patchsets[patchset - 1],
244 "author": {"name": "User Name"},
245 "approvals": [{"type": "Code-Review",
246 "description": "Code-Review",
247 "value": "0"}],
248 "comment": "This is a comment"}
249 return event
250
251 def addApproval(self, category, value, username='jenkins',
252 granted_on=None):
253 if not granted_on:
254 granted_on = time.time()
255 approval = {'description': self.categories[category][0],
256 'type': category,
257 'value': str(value),
258 'by': {
259 'username': username,
260 'email': username + '@example.com',
261 },
262 'grantedOn': int(granted_on)}
263 for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
264 if x['by']['username'] == username and x['type'] == category:
265 del self.patchsets[-1]['approvals'][i]
266 self.patchsets[-1]['approvals'].append(approval)
267 event = {'approvals': [approval],
268 'author': {'email': 'user@example.com',
269 'name': 'User Name',
270 'username': 'username'},
271 'change': {'branch': self.branch,
272 'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
273 'number': str(self.number),
274 'owner': {'email': 'user@example.com',
275 'name': 'User Name',
276 'username': 'username'},
277 'project': self.project,
278 'subject': self.subject,
279 'topic': 'master',
280 'url': 'https://hostname/459'},
281 'comment': '',
282 'patchSet': self.patchsets[-1],
283 'type': 'comment-added'}
284 self.data['submitRecords'] = self.getSubmitRecords()
285 return json.loads(json.dumps(event))
286
287 def getSubmitRecords(self):
288 status = {}
289 for cat in self.categories.keys():
290 status[cat] = 0
291
292 for a in self.patchsets[-1]['approvals']:
293 cur = status[a['type']]
294 cat_min, cat_max = self.categories[a['type']][1:]
295 new = int(a['value'])
296 if new == cat_min:
297 cur = new
298 elif abs(new) > abs(cur):
299 cur = new
300 status[a['type']] = cur
301
302 labels = []
303 ok = True
304 for typ, cat in self.categories.items():
305 cur = status[typ]
306 cat_min, cat_max = cat[1:]
307 if cur == cat_min:
308 value = 'REJECT'
309 ok = False
310 elif cur == cat_max:
311 value = 'OK'
312 else:
313 value = 'NEED'
314 ok = False
315 labels.append({'label': cat[0], 'status': value})
316 if ok:
317 return [{'status': 'OK'}]
318 return [{'status': 'NOT_READY',
319 'labels': labels}]
320
321 def setDependsOn(self, other, patchset):
322 self.depends_on_change = other
323 d = {'id': other.data['id'],
324 'number': other.data['number'],
325 'ref': other.patchsets[patchset - 1]['ref']
326 }
327 self.data['dependsOn'] = [d]
328
329 other.needed_by_changes.append(self)
330 needed = other.data.get('neededBy', [])
331 d = {'id': self.data['id'],
332 'number': self.data['number'],
333 'ref': self.patchsets[patchset - 1]['ref'],
334 'revision': self.patchsets[patchset - 1]['revision']
335 }
336 needed.append(d)
337 other.data['neededBy'] = needed
338
339 def query(self):
340 self.queried += 1
341 d = self.data.get('dependsOn')
342 if d:
343 d = d[0]
344 if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
345 d['isCurrentPatchSet'] = True
346 else:
347 d['isCurrentPatchSet'] = False
348 return json.loads(json.dumps(self.data))
349
350 def setMerged(self):
351 if (self.depends_on_change and
352 self.depends_on_change.data['status'] != 'MERGED'):
353 return
354 if self.fail_merge:
355 return
356 self.data['status'] = 'MERGED'
357 self.open = False
358
359 path = os.path.join(self.upstream_root, self.project)
360 repo = git.Repo(path)
361 repo.heads[self.branch].commit = \
362 repo.commit(self.patchsets[-1]['revision'])
363
364 def setReported(self):
365 self.reported += 1
366
367
368class FakeGerrit(object):
369 def __init__(self, *args, **kw):
370 self.event_queue = Queue.Queue()
371 self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
372 self.change_number = 0
373 self.changes = {}
James E. Blairf8ff9932014-08-15 15:24:24 -0700374 self.queries = []
Clark Boylanb640e052014-04-03 16:41:46 -0700375
376 def addFakeChange(self, project, branch, subject, status='NEW'):
377 self.change_number += 1
378 c = FakeChange(self, self.change_number, project, branch, subject,
379 upstream_root=self.upstream_root,
380 status=status)
381 self.changes[self.change_number] = c
382 return c
383
384 def addEvent(self, data):
385 return self.event_queue.put(data)
386
387 def getEvent(self):
388 return self.event_queue.get()
389
390 def eventDone(self):
391 self.event_queue.task_done()
392
393 def review(self, project, changeid, message, action):
394 number, ps = changeid.split(',')
395 change = self.changes[int(number)]
396 change.messages.append(message)
397 if 'submit' in action:
398 change.setMerged()
399 if message:
400 change.setReported()
401
402 def query(self, number):
403 change = self.changes.get(int(number))
404 if change:
405 return change.query()
406 return {}
407
James E. Blairc494d542014-08-06 09:23:52 -0700408 def simpleQuery(self, query):
409 # This is currently only used to return all open changes for a
410 # project
James E. Blairf8ff9932014-08-15 15:24:24 -0700411 self.queries.append(query)
412 l = [change.query() for change in self.changes.values()]
413 l.append({"type":"stats","rowCount":1,"runTimeMilliseconds":3})
414 return l
James E. Blairc494d542014-08-06 09:23:52 -0700415
Clark Boylanb640e052014-04-03 16:41:46 -0700416 def startWatching(self, *args, **kw):
417 pass
418
419
420class BuildHistory(object):
421 def __init__(self, **kw):
422 self.__dict__.update(kw)
423
424 def __repr__(self):
425 return ("<Completed build, result: %s name: %s #%s changes: %s>" %
426 (self.result, self.name, self.number, self.changes))
427
428
429class FakeURLOpener(object):
430 def __init__(self, upstream_root, fake_gerrit, url):
431 self.upstream_root = upstream_root
432 self.fake_gerrit = fake_gerrit
433 self.url = url
434
435 def read(self):
436 res = urlparse.urlparse(self.url)
437 path = res.path
438 project = '/'.join(path.split('/')[2:-2])
439 ret = '001e# service=git-upload-pack\n'
440 ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
441 'multi_ack thin-pack side-band side-band-64k ofs-delta '
442 'shallow no-progress include-tag multi_ack_detailed no-done\n')
443 path = os.path.join(self.upstream_root, project)
444 repo = git.Repo(path)
445 for ref in repo.refs:
446 r = ref.object.hexsha + ' ' + ref.path + '\n'
447 ret += '%04x%s' % (len(r) + 4, r)
448 ret += '0000'
449 return ret
450
451
452class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
453 name = 'gerrit'
454
455 def __init__(self, upstream_root, *args):
456 super(FakeGerritTrigger, self).__init__(*args)
457 self.upstream_root = upstream_root
458
459 def getGitUrl(self, project):
460 return os.path.join(self.upstream_root, project.name)
461
462
463class FakeStatsd(threading.Thread):
464 def __init__(self):
465 threading.Thread.__init__(self)
466 self.daemon = True
467 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
468 self.sock.bind(('', 0))
469 self.port = self.sock.getsockname()[1]
470 self.wake_read, self.wake_write = os.pipe()
471 self.stats = []
472
473 def run(self):
474 while True:
475 poll = select.poll()
476 poll.register(self.sock, select.POLLIN)
477 poll.register(self.wake_read, select.POLLIN)
478 ret = poll.poll()
479 for (fd, event) in ret:
480 if fd == self.sock.fileno():
481 data = self.sock.recvfrom(1024)
482 if not data:
483 return
484 self.stats.append(data[0])
485 if fd == self.wake_read:
486 return
487
488 def stop(self):
489 os.write(self.wake_write, '1\n')
490
491
492class FakeBuild(threading.Thread):
493 log = logging.getLogger("zuul.test")
494
495 def __init__(self, worker, job, number, node):
496 threading.Thread.__init__(self)
497 self.daemon = True
498 self.worker = worker
499 self.job = job
500 self.name = job.name.split(':')[1]
501 self.number = number
502 self.node = node
503 self.parameters = json.loads(job.arguments)
504 self.unique = self.parameters['ZUUL_UUID']
505 self.wait_condition = threading.Condition()
506 self.waiting = False
507 self.aborted = False
508 self.created = time.time()
509 self.description = ''
510 self.run_error = False
511
512 def release(self):
513 self.wait_condition.acquire()
514 self.wait_condition.notify()
515 self.waiting = False
516 self.log.debug("Build %s released" % self.unique)
517 self.wait_condition.release()
518
519 def isWaiting(self):
520 self.wait_condition.acquire()
521 if self.waiting:
522 ret = True
523 else:
524 ret = False
525 self.wait_condition.release()
526 return ret
527
528 def _wait(self):
529 self.wait_condition.acquire()
530 self.waiting = True
531 self.log.debug("Build %s waiting" % self.unique)
532 self.wait_condition.wait()
533 self.wait_condition.release()
534
535 def run(self):
536 data = {
537 'url': 'https://server/job/%s/%s/' % (self.name, self.number),
538 'name': self.name,
539 'number': self.number,
540 'manager': self.worker.worker_id,
541 'worker_name': 'My Worker',
542 'worker_hostname': 'localhost',
543 'worker_ips': ['127.0.0.1', '192.168.1.1'],
544 'worker_fqdn': 'zuul.example.org',
545 'worker_program': 'FakeBuilder',
546 'worker_version': 'v1.1',
547 'worker_extra': {'something': 'else'}
548 }
549
550 self.log.debug('Running build %s' % self.unique)
551
552 self.job.sendWorkData(json.dumps(data))
553 self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
554 self.job.sendWorkStatus(0, 100)
555
556 if self.worker.hold_jobs_in_build:
557 self.log.debug('Holding build %s' % self.unique)
558 self._wait()
559 self.log.debug("Build %s continuing" % self.unique)
560
561 self.worker.lock.acquire()
562
563 result = 'SUCCESS'
564 if (('ZUUL_REF' in self.parameters) and
565 self.worker.shouldFailTest(self.name,
566 self.parameters['ZUUL_REF'])):
567 result = 'FAILURE'
568 if self.aborted:
569 result = 'ABORTED'
570
571 if self.run_error:
572 work_fail = True
573 result = 'RUN_ERROR'
574 else:
575 data['result'] = result
576 work_fail = False
577
578 changes = None
579 if 'ZUUL_CHANGE_IDS' in self.parameters:
580 changes = self.parameters['ZUUL_CHANGE_IDS']
581
582 self.worker.build_history.append(
583 BuildHistory(name=self.name, number=self.number,
584 result=result, changes=changes, node=self.node,
585 uuid=self.unique, description=self.description,
586 pipeline=self.parameters['ZUUL_PIPELINE'])
587 )
588
589 self.job.sendWorkData(json.dumps(data))
590 if work_fail:
591 self.job.sendWorkFail()
592 else:
593 self.job.sendWorkComplete(json.dumps(data))
594 del self.worker.gearman_jobs[self.job.unique]
595 self.worker.running_builds.remove(self)
596 self.worker.lock.release()
597
598
599class FakeWorker(gear.Worker):
600 def __init__(self, worker_id, test):
601 super(FakeWorker, self).__init__(worker_id)
602 self.gearman_jobs = {}
603 self.build_history = []
604 self.running_builds = []
605 self.build_counter = 0
606 self.fail_tests = {}
607 self.test = test
608
609 self.hold_jobs_in_build = False
610 self.lock = threading.Lock()
611 self.__work_thread = threading.Thread(target=self.work)
612 self.__work_thread.daemon = True
613 self.__work_thread.start()
614
615 def handleJob(self, job):
616 parts = job.name.split(":")
617 cmd = parts[0]
618 name = parts[1]
619 if len(parts) > 2:
620 node = parts[2]
621 else:
622 node = None
623 if cmd == 'build':
624 self.handleBuild(job, name, node)
625 elif cmd == 'stop':
626 self.handleStop(job, name)
627 elif cmd == 'set_description':
628 self.handleSetDescription(job, name)
629
630 def handleBuild(self, job, name, node):
631 build = FakeBuild(self, job, self.build_counter, node)
632 job.build = build
633 self.gearman_jobs[job.unique] = job
634 self.build_counter += 1
635
636 self.running_builds.append(build)
637 build.start()
638
639 def handleStop(self, job, name):
640 self.log.debug("handle stop")
641 parameters = json.loads(job.arguments)
642 name = parameters['name']
643 number = parameters['number']
644 for build in self.running_builds:
645 if build.name == name and build.number == number:
646 build.aborted = True
647 build.release()
648 job.sendWorkComplete()
649 return
650 job.sendWorkFail()
651
652 def handleSetDescription(self, job, name):
653 self.log.debug("handle set description")
654 parameters = json.loads(job.arguments)
655 name = parameters['name']
656 number = parameters['number']
657 descr = parameters['html_description']
658 for build in self.running_builds:
659 if build.name == name and build.number == number:
660 build.description = descr
661 job.sendWorkComplete()
662 return
663 for build in self.build_history:
664 if build.name == name and build.number == number:
665 build.description = descr
666 job.sendWorkComplete()
667 return
668 job.sendWorkFail()
669
670 def work(self):
671 while self.running:
672 try:
673 job = self.getJob()
674 except gear.InterruptedError:
675 continue
676 try:
677 self.handleJob(job)
678 except:
679 self.log.exception("Worker exception:")
680
681 def addFailTest(self, name, change):
682 l = self.fail_tests.get(name, [])
683 l.append(change)
684 self.fail_tests[name] = l
685
686 def shouldFailTest(self, name, ref):
687 l = self.fail_tests.get(name, [])
688 for change in l:
689 if self.test.ref_has_change(ref, change):
690 return True
691 return False
692
693 def release(self, regex=None):
694 builds = self.running_builds[:]
695 self.log.debug("releasing build %s (%s)" % (regex,
696 len(self.running_builds)))
697 for build in builds:
698 if not regex or re.match(regex, build.name):
699 self.log.debug("releasing build %s" %
700 (build.parameters['ZUUL_UUID']))
701 build.release()
702 else:
703 self.log.debug("not releasing build %s" %
704 (build.parameters['ZUUL_UUID']))
705 self.log.debug("done releasing builds %s (%s)" %
706 (regex, len(self.running_builds)))
707
708
709class FakeGearmanServer(gear.Server):
710 def __init__(self):
711 self.hold_jobs_in_queue = False
712 super(FakeGearmanServer, self).__init__(0)
713
714 def getJobForConnection(self, connection, peek=False):
715 for queue in [self.high_queue, self.normal_queue, self.low_queue]:
716 for job in queue:
717 if not hasattr(job, 'waiting'):
718 if job.name.startswith('build:'):
719 job.waiting = self.hold_jobs_in_queue
720 else:
721 job.waiting = False
722 if job.waiting:
723 continue
724 if job.name in connection.functions:
725 if not peek:
726 queue.remove(job)
727 connection.related_jobs[job.handle] = job
728 job.worker_connection = connection
729 job.running = True
730 return job
731 return None
732
733 def release(self, regex=None):
734 released = False
735 qlen = (len(self.high_queue) + len(self.normal_queue) +
736 len(self.low_queue))
737 self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
738 for job in self.getQueue():
739 cmd, name = job.name.split(':')
740 if cmd != 'build':
741 continue
742 if not regex or re.match(regex, name):
743 self.log.debug("releasing queued job %s" %
744 job.unique)
745 job.waiting = False
746 released = True
747 else:
748 self.log.debug("not releasing queued job %s" %
749 job.unique)
750 if released:
751 self.wakeConnections()
752 qlen = (len(self.high_queue) + len(self.normal_queue) +
753 len(self.low_queue))
754 self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
755
756
757class FakeSMTP(object):
758 log = logging.getLogger('zuul.FakeSMTP')
759
760 def __init__(self, messages, server, port):
761 self.server = server
762 self.port = port
763 self.messages = messages
764
765 def sendmail(self, from_email, to_email, msg):
766 self.log.info("Sending email from %s, to %s, with msg %s" % (
767 from_email, to_email, msg))
768
769 headers = msg.split('\n\n', 1)[0]
770 body = msg.split('\n\n', 1)[1]
771
772 self.messages.append(dict(
773 from_email=from_email,
774 to_email=to_email,
775 msg=msg,
776 headers=headers,
777 body=body,
778 ))
779
780 return True
781
782 def quit(self):
783 return True
784
785
786class FakeSwiftClientConnection(swiftclient.client.Connection):
787 def post_account(self, headers):
788 # Do nothing
789 pass
790
791 def get_auth(self):
792 # Returns endpoint and (unused) auth token
793 endpoint = os.path.join('https://storage.example.org', 'V1',
794 'AUTH_account')
795 return endpoint, ''
796
797
798class ZuulTestCase(testtools.TestCase):
799 log = logging.getLogger("zuul.test")
800
801 def setUp(self):
802 super(ZuulTestCase, self).setUp()
803 test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
804 try:
805 test_timeout = int(test_timeout)
806 except ValueError:
807 # If timeout value is invalid do not set a timeout.
808 test_timeout = 0
809 if test_timeout > 0:
810 self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
811
812 if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
813 os.environ.get('OS_STDOUT_CAPTURE') == '1'):
814 stdout = self.useFixture(fixtures.StringStream('stdout')).stream
815 self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
816 if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
817 os.environ.get('OS_STDERR_CAPTURE') == '1'):
818 stderr = self.useFixture(fixtures.StringStream('stderr')).stream
819 self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
820 if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
821 os.environ.get('OS_LOG_CAPTURE') == '1'):
822 self.useFixture(fixtures.FakeLogger(
823 level=logging.DEBUG,
824 format='%(asctime)s %(name)-32s '
825 '%(levelname)-8s %(message)s'))
James E. Blair97d902e2014-08-21 13:25:56 -0700826 if USE_TEMPDIR:
827 tmp_root = self.useFixture(fixtures.TempDir(
828 rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
829 else:
830 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700831 self.test_root = os.path.join(tmp_root, "zuul-test")
832 self.upstream_root = os.path.join(self.test_root, "upstream")
833 self.git_root = os.path.join(self.test_root, "git")
834
835 if os.path.exists(self.test_root):
836 shutil.rmtree(self.test_root)
837 os.makedirs(self.test_root)
838 os.makedirs(self.upstream_root)
839 os.makedirs(self.git_root)
840
841 # Make per test copy of Configuration.
842 self.setup_config()
843 self.config.set('zuul', 'layout_config',
844 os.path.join(FIXTURE_DIR, "layout.yaml"))
845 self.config.set('merger', 'git_dir', self.git_root)
846
847 # For each project in config:
848 self.init_repo("org/project")
849 self.init_repo("org/project1")
850 self.init_repo("org/project2")
851 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700852 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700853 self.init_repo("org/project5")
854 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700855 self.init_repo("org/one-job-project")
856 self.init_repo("org/nonvoting-project")
857 self.init_repo("org/templated-project")
858 self.init_repo("org/layered-project")
859 self.init_repo("org/node-project")
860 self.init_repo("org/conflict-project")
861 self.init_repo("org/noop-project")
862 self.init_repo("org/experimental-project")
863
864 self.statsd = FakeStatsd()
865 os.environ['STATSD_HOST'] = 'localhost'
866 os.environ['STATSD_PORT'] = str(self.statsd.port)
867 self.statsd.start()
868 # the statsd client object is configured in the statsd module import
869 reload(statsd)
870 reload(zuul.scheduler)
871
872 self.gearman_server = FakeGearmanServer()
873
874 self.config.set('gearman', 'port', str(self.gearman_server.port))
875
876 self.worker = FakeWorker('fake_worker', self)
877 self.worker.addServer('127.0.0.1', self.gearman_server.port)
878 self.gearman_server.worker = self.worker
879
880 self.merge_server = zuul.merger.server.MergeServer(self.config)
881 self.merge_server.start()
882
883 self.sched = zuul.scheduler.Scheduler()
884
885 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
886 FakeSwiftClientConnection))
887 self.swift = zuul.lib.swift.Swift(self.config)
888
889 def URLOpenerFactory(*args, **kw):
890 if isinstance(args[0], urllib2.Request):
891 return old_urlopen(*args, **kw)
892 args = [self.fake_gerrit] + list(args)
893 return FakeURLOpener(self.upstream_root, *args, **kw)
894
895 old_urlopen = urllib2.urlopen
896 urllib2.urlopen = URLOpenerFactory
897
898 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
899 self.swift)
900 self.merge_client = zuul.merger.client.MergeClient(
901 self.config, self.sched)
902
903 self.smtp_messages = []
904
905 def FakeSMTPFactory(*args, **kw):
906 args = [self.smtp_messages] + list(args)
907 return FakeSMTP(*args, **kw)
908
909 zuul.lib.gerrit.Gerrit = FakeGerrit
910 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
911
912 self.gerrit = FakeGerritTrigger(
913 self.upstream_root, self.config, self.sched)
914 self.gerrit.replication_timeout = 1.5
915 self.gerrit.replication_retry_interval = 0.5
916 self.fake_gerrit = self.gerrit.gerrit
917 self.fake_gerrit.upstream_root = self.upstream_root
918
919 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
920 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
921
922 self.sched.setLauncher(self.launcher)
923 self.sched.setMerger(self.merge_client)
924 self.sched.registerTrigger(self.gerrit)
925 self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
926 self.sched.registerTrigger(self.timer)
James E. Blairc494d542014-08-06 09:23:52 -0700927 self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config, self.sched)
928 self.sched.registerTrigger(self.zuultrigger)
Clark Boylanb640e052014-04-03 16:41:46 -0700929
930 self.sched.registerReporter(
931 zuul.reporter.gerrit.Reporter(self.gerrit))
932 self.smtp_reporter = zuul.reporter.smtp.Reporter(
933 self.config.get('smtp', 'default_from'),
934 self.config.get('smtp', 'default_to'),
935 self.config.get('smtp', 'server'))
936 self.sched.registerReporter(self.smtp_reporter)
937
938 self.sched.start()
939 self.sched.reconfigure(self.config)
940 self.sched.resume()
941 self.webapp.start()
942 self.rpc.start()
943 self.launcher.gearman.waitForServer()
944 self.registerJobs()
945 self.builds = self.worker.running_builds
946 self.history = self.worker.build_history
947
948 self.addCleanup(self.assertFinalState)
949 self.addCleanup(self.shutdown)
950
951 def setup_config(self):
952 """Per test config object. Override to set different config."""
953 self.config = ConfigParser.ConfigParser()
954 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
955
956 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -0700957 # Make sure that git.Repo objects have been garbage collected.
958 repos = []
959 gc.collect()
960 for obj in gc.get_objects():
961 if isinstance(obj, git.Repo):
962 repos.append(obj)
963 self.assertEqual(len(repos), 0)
964 self.assertEmptyQueues()
965
966 def shutdown(self):
967 self.log.debug("Shutting down after tests")
968 self.launcher.stop()
969 self.merge_server.stop()
970 self.merge_server.join()
971 self.merge_client.stop()
972 self.worker.shutdown()
973 self.gerrit.stop()
974 self.timer.stop()
975 self.sched.stop()
976 self.sched.join()
977 self.statsd.stop()
978 self.statsd.join()
979 self.webapp.stop()
980 self.webapp.join()
981 self.rpc.stop()
982 self.rpc.join()
983 self.gearman_server.shutdown()
984 threads = threading.enumerate()
985 if len(threads) > 1:
986 self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -0700987
988 def init_repo(self, project):
989 parts = project.split('/')
990 path = os.path.join(self.upstream_root, *parts[:-1])
991 if not os.path.exists(path):
992 os.makedirs(path)
993 path = os.path.join(self.upstream_root, project)
994 repo = git.Repo.init(path)
995
996 repo.config_writer().set_value('user', 'email', 'user@example.com')
997 repo.config_writer().set_value('user', 'name', 'User Name')
998 repo.config_writer().write()
999
1000 fn = os.path.join(path, 'README')
1001 f = open(fn, 'w')
1002 f.write("test\n")
1003 f.close()
1004 repo.index.add([fn])
1005 repo.index.commit('initial commit')
1006 master = repo.create_head('master')
1007 repo.create_tag('init')
1008
James E. Blair97d902e2014-08-21 13:25:56 -07001009 repo.head.reference = master
1010 repo.head.reset(index=True, working_tree=True)
1011 repo.git.clean('-x', '-f', '-d')
1012
1013 self.create_branch(project, 'mp')
1014
1015 def create_branch(self, project, branch):
1016 path = os.path.join(self.upstream_root, project)
1017 repo = git.Repo.init(path)
1018 fn = os.path.join(path, 'README')
1019
1020 branch_head = repo.create_head(branch)
1021 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001022 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001023 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001024 f.close()
1025 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001026 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001027
James E. Blair97d902e2014-08-21 13:25:56 -07001028 repo.head.reference = repo.heads['master']
Clark Boylanb640e052014-04-03 16:41:46 -07001029 repo.head.reset(index=True, working_tree=True)
1030 repo.git.clean('-x', '-f', '-d')
1031
1032 def ref_has_change(self, ref, change):
1033 path = os.path.join(self.git_root, change.project)
1034 repo = git.Repo(path)
Mike Heald8225f522014-11-21 09:52:33 +00001035 try:
1036 for commit in repo.iter_commits(ref):
1037 if commit.message.strip() == ('%s-1' % change.subject):
1038 return True
1039 except GitCommandError:
1040 pass
Clark Boylanb640e052014-04-03 16:41:46 -07001041 return False
1042
1043 def job_has_changes(self, *args):
1044 job = args[0]
1045 commits = args[1:]
1046 if isinstance(job, FakeBuild):
1047 parameters = job.parameters
1048 else:
1049 parameters = json.loads(job.arguments)
1050 project = parameters['ZUUL_PROJECT']
1051 path = os.path.join(self.git_root, project)
1052 repo = git.Repo(path)
1053 ref = parameters['ZUUL_REF']
1054 sha = parameters['ZUUL_COMMIT']
1055 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1056 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1057 commit_messages = ['%s-1' % commit.subject for commit in commits]
1058 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1059 " repo_messages %s; sha %s" % (job, commit_messages,
1060 repo_messages, sha))
1061 for msg in commit_messages:
1062 if msg not in repo_messages:
1063 self.log.debug(" messages do not match")
1064 return False
1065 if repo_shas[0] != sha:
1066 self.log.debug(" sha does not match")
1067 return False
1068 self.log.debug(" OK")
1069 return True
1070
1071 def registerJobs(self):
1072 count = 0
1073 for job in self.sched.layout.jobs.keys():
1074 self.worker.registerFunction('build:' + job)
1075 count += 1
1076 self.worker.registerFunction('stop:' + self.worker.worker_id)
1077 count += 1
1078
1079 while len(self.gearman_server.functions) < count:
1080 time.sleep(0)
1081
1082 def release(self, job):
1083 if isinstance(job, FakeBuild):
1084 job.release()
1085 else:
1086 job.waiting = False
1087 self.log.debug("Queued job %s released" % job.unique)
1088 self.gearman_server.wakeConnections()
1089
1090 def getParameter(self, job, name):
1091 if isinstance(job, FakeBuild):
1092 return job.parameters[name]
1093 else:
1094 parameters = json.loads(job.arguments)
1095 return parameters[name]
1096
1097 def resetGearmanServer(self):
1098 self.worker.setFunctions([])
1099 while True:
1100 done = True
1101 for connection in self.gearman_server.active_connections:
1102 if (connection.functions and
1103 connection.client_id not in ['Zuul RPC Listener',
1104 'Zuul Merger']):
1105 done = False
1106 if done:
1107 break
1108 time.sleep(0)
1109 self.gearman_server.functions = set()
1110 self.rpc.register()
1111 self.merge_server.register()
1112
1113 def haveAllBuildsReported(self):
1114 # See if Zuul is waiting on a meta job to complete
1115 if self.launcher.meta_jobs:
1116 return False
1117 # Find out if every build that the worker has completed has been
1118 # reported back to Zuul. If it hasn't then that means a Gearman
1119 # event is still in transit and the system is not stable.
1120 for build in self.worker.build_history:
1121 zbuild = self.launcher.builds.get(build.uuid)
1122 if not zbuild:
1123 # It has already been reported
1124 continue
1125 # It hasn't been reported yet.
1126 return False
1127 # Make sure that none of the worker connections are in GRAB_WAIT
1128 for connection in self.worker.active_connections:
1129 if connection.state == 'GRAB_WAIT':
1130 return False
1131 return True
1132
    def areAllBuildsWaiting(self):
        """Return True if every active build is parked and waiting.

        For each build the launcher knows about, the corresponding job
        must be traceable through the whole chain: known to the gearman
        client, assigned a handle, known to the gearman server, fully
        enqueued, and either explicitly held (``waiting``) or assigned
        to a worker whose fake build is itself waiting.  Any build that
        is still moving through the system makes the result False.
        """
        ret = True

        builds = self.launcher.builds.values()
        for build in builds:
            # Find the client-side gearman job for this build.
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                ret = False
                continue
            if not client_job.handle:
                # No handle yet: the submit has not round-tripped.
                self.log.debug("%s has no handle" % client_job)
                ret = False
                continue
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                ret = False
                continue
            if not hasattr(server_job, 'waiting'):
                # The server has the job but has not finished enqueuing it.
                self.log.debug("%s is being enqueued" % server_job)
                ret = False
                continue
            if server_job.waiting:
                # Held on the server queue; this one is settled.
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    ret = False
            else:
                self.log.debug("%s is unassigned" % server_job)
                ret = False
        return ret
1176
    def waitUntilSettled(self):
        """Block until Zuul reaches a stable state (or raise after 10s).

        A settled state means: all event queues are empty and joined, no
        merge jobs are outstanding, every build result has been reported,
        and every remaining build is parked waiting.  The worker lock and
        the scheduler's run_handler lock are held while checking so that
        no new events can arrive mid-inspection; note the release order
        on each exit path mirrors the acquire order.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                # Dump queue state to aid debugging the timeout.
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                if (self.sched.trigger_event_queue.empty() and
                        self.sched.result_event_queue.empty() and
                        self.fake_gerrit.event_queue.empty() and
                        not self.merge_client.build_sets and
                        self.haveAllBuildsReported() and
                        self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            # Wait for the scheduler to signal activity before rechecking.
            self.sched.wake_event.wait(0.1)
1211
1212 def countJobResults(self, jobs, result):
1213 jobs = filter(lambda x: x.result == result, jobs)
1214 return len(jobs)
1215
1216 def getJobFromHistory(self, name):
1217 history = self.worker.build_history
1218 for job in history:
1219 if job.name == name:
1220 return job
1221 raise Exception("Unable to find job %s in history" % name)
1222
    def assertEmptyQueues(self):
        """Assert that every pipeline queue has drained.

        Prints the offending queue's contents before failing so the
        leftover items are visible in the test output.
        """
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001232
1233 def assertReportedStat(self, key, value=None, kind=None):
1234 start = time.time()
1235 while time.time() < (start + 5):
1236 for stat in self.statsd.stats:
1237 pprint.pprint(self.statsd.stats)
1238 k, v = stat.split(':')
1239 if key == k:
1240 if value is None and kind is None:
1241 return
1242 elif value:
1243 if value == v:
1244 return
1245 elif kind:
1246 if v.endswith('|' + kind):
1247 return
1248 time.sleep(0.1)
1249
1250 pprint.pprint(self.statsd.stats)
1251 raise Exception("Key %s not found in reported stats" % key)