blob: 1b8294486337fbc6d9168834c1ded11b6746632a [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
43
44import zuul.scheduler
45import zuul.webapp
46import zuul.rpclistener
47import zuul.launcher.gearman
48import zuul.lib.swift
49import zuul.merger.server
50import zuul.merger.client
51import zuul.reporter.gerrit
52import zuul.reporter.smtp
53import zuul.trigger.gerrit
54import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070055import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070056
# Directory containing the test fixtures (layout files, zuul.conf, etc.).
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')

# Emit full debug logging for every test run; individual tests may
# redirect it via the OS_LOG_CAPTURE fixture set up in ZuulTestCase.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')
63
64
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) tuple from the git process.
    Raises Exception (including git's stderr output, so failures are
    diagnosable from the traceback alone) if repack exits non-zero.
    """
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    proc = subprocess.Popen(cmd, close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d (%s)" %
                        (proc.returncode, out[1]))
    return out
74
75
def random_sha1():
    """Return the hex SHA1 digest of a random value.

    Used to fabricate plausible commit/change ids for fake Gerrit data.
    The encode() keeps this working on both Python 2 and Python 3
    (hashlib requires bytes on Python 3).
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
78
79
class ChangeReference(git.Reference):
    """A git reference under refs/changes, mimicking a Gerrit change ref."""
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
83
84
class FakeChange(object):
    """An in-memory stand-in for a Gerrit change.

    Maintains the JSON-like ``data`` dict that the real Gerrit query
    interface would return, creates real commits in the fake upstream
    git repository for each patchset, and fabricates the stream events
    (patchset-created, comment-added, ...) that drive the scheduler.
    """

    # Review label id -> (display name, min value, max value).
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        # Counters the tests assert on.
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        # When True, setMerged() silently refuses to merge.
        self.fail_merge = False
        self.messages = []
        # Mirrors the structure of a Gerrit query result.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        # Every change starts with one patchset already uploaded.
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit a fake patchset into the upstream repo.

        Branches from the 'init' tag onto a Gerrit-style change ref,
        writes either one small file (*fn*) or 100 files of 4096 random
        printable characters (*large*), commits with message *msg*, and
        returns the commit object.  Leaves the repo back on master.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Restore a clean master checkout for the next caller.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        """Upload a new patchset touching *files* (or a default file).

        NOTE: the mutable default is safe here — ``files`` is only read,
        never mutated.
        """
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch, self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a Gerrit-stream 'patchset-created' event dict."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a Gerrit-stream 'change-restored' event dict."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a Gerrit-stream 'change-abandoned' event dict."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a Gerrit-stream 'comment-added' event dict."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Record a vote on the latest patchset and return the event.

        Replaces any prior vote by the same user in the same category,
        then returns a JSON-round-tripped 'comment-added' event dict.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {'description': self.categories[category][0],
                    'type': category,
                    'value': str(value),
                    'by': {
                        'username': username,
                        'email': username + '@example.com',
                    },
                    'grantedOn': int(granted_on)}
        # Iterate over a copy so deletion during iteration is safe.
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        # Round-trip through JSON so the caller gets an independent copy.
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from the current votes.

        A category's effective value is its minimum if anyone voted the
        minimum (a veto), otherwise the vote with the largest absolute
        value.  Returns [{'status': 'OK'}] when every category is at its
        maximum, otherwise NOT_READY with per-label status.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                # A minimum vote is a veto and always wins.
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on patchset N of *other* (both ways:
        records dependsOn here and neededBy on the other change)."""
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of the change data (counts the query and
        refreshes the isCurrentPatchSet flag of the dependency)."""
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Merge the change unless blocked.

        Refuses silently if the dependency is unmerged or fail_merge is
        set; otherwise marks MERGED and fast-forwards the upstream
        branch to the latest patchset's commit.
        """
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        # Count how many times a reporter posted back to this change.
        self.reported += 1
364
365
class FakeGerrit(object):
    """Test double for the Gerrit connection.

    Stores FakeChange objects keyed by change number and feeds stream
    events to the scheduler through an in-process queue.  The test case
    assigns ``upstream_root`` after construction.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        """Queue a stream event for the scheduler to consume."""
        return self.event_queue.put(data)

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review on "<number>,<patchset>"; merge on submit."""
        number, _ = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        return change.query() if change else {}

    def simpleQuery(self, query):
        # Currently only used to return all open changes for a project.
        return [change.query() for change in self.changes.values()]

    def startWatching(self, *args, **kw):
        # The real implementation spawns an SSH event watcher.
        pass
412
413
class BuildHistory(object):
    """Record of one completed fake build.

    Arbitrary keyword arguments become attributes (name, number,
    result, changes, ...), which the tests assert against.
    """

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def __repr__(self):
        summary = ("<Completed build, result: %s name: %s #%s changes: %s>" %
                   (self.result, self.name, self.number, self.changes))
        return summary
421
422
class FakeURLOpener(object):
    """Stand-in for a urllib2.urlopen result on git-upload-pack URLs.

    read() answers with a pkt-line ref advertisement built from the
    refs of the matching repository under ``upstream_root``.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        parsed = urlparse.urlparse(self.url)
        # URL path looks like /p/<project...>/info/refs; keep the middle.
        project = '/'.join(parsed.path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # pkt-line framing: 4-hex-digit length prefix includes itself.
            ret += '%04x%s' % (len(line) + 4, line)
        ret += '0000'
        return ret
444
445
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger whose git URLs point at the local upstream repos."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Serve repositories straight from the filesystem instead of a
        # remote git server.
        return os.path.join(self.upstream_root, project.name)
455
456
class FakeStatsd(threading.Thread):
    """A minimal UDP statsd server that records every packet it receives.

    Binds an ephemeral localhost port; tests point the statsd client at
    ``self.port`` and inspect the raw packets collected in ``self.stats``.
    stop() wakes the poll loop via a pipe so the thread exits promptly.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        # Ephemeral port chosen by the OS.
        self.port = self.sock.getsockname()[1]
        # Pipe used solely to wake the poll loop for shutdown.
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    # recvfrom() returns a (payload, address) tuple; the
                    # tuple itself is always truthy, so test the payload
                    # (the original checked the tuple, which never fired).
                    data, addr = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data)
                if fd == self.wake_read:
                    return

    def stop(self):
        # Bytes literal so this works on both Python 2 and Python 3
        # (os.write requires bytes on Python 3).
        os.write(self.wake_write, b'1\n')
484
485
class FakeBuild(threading.Thread):
    """One fake build executing a gearman 'build:*' job in a thread.

    Reports worker metadata, optionally parks on a condition variable
    while the worker holds jobs, then computes a result and records a
    BuildHistory entry before completing the gearman job.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Job names look like "build:<name>"; keep the bare name.
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        # Condition used by _wait()/release() to hold builds in tests.
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Wake a build parked in _wait()."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return True if this build is parked awaiting release()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        # Park until release() notifies the condition.
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        # Fabricated worker metadata reported back through gearman.
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize result bookkeeping with the worker's other builds.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate a worker-side failure: no result in the data.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
591
592
class FakeWorker(gear.Worker):
    """A gearman worker that runs FakeBuilds instead of real jobs.

    Handles build/stop/set_description functions and tracks the list of
    running builds plus a history of completed ones for the tests.
    """

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        # job name -> list of changes whose builds should fail.
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch a gearman job named "cmd:name[:node]"."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a build request."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build identified in the job arguments."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Worker thread main loop: fetch and dispatch jobs until shut down."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Catch only Exception (not the old bare except) so
                # KeyboardInterrupt/SystemExit still propagate; log and
                # keep serving jobs.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Mark builds of job *name* for *change* to report FAILURE."""
        changes = self.fail_tests.get(name, [])
        changes.append(change)
        self.fail_tests[name] = changes

    def shouldFailTest(self, name, ref):
        """Return True if a change registered via addFailTest is in *ref*."""
        changes = self.fail_tests.get(name, [])
        for change in changes:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (all if None)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
701
702
class FakeGearmanServer(gear.Server):
    """A gearman server that can hold queued build jobs for the tests.

    When ``hold_jobs_in_queue`` is set, new 'build:*' jobs are marked
    waiting and are not handed to workers until release() clears them.
    """

    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0: let the OS pick an ephemeral port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job for *connection*, or None.

        Lazily stamps each job with a ``waiting`` flag on first sight
        (held only for build jobs while hold_jobs_in_queue is set), and
        skips jobs that are waiting or that the connection cannot run.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release queued build jobs whose name matches *regex* (or all),
        then wake worker connections so they pick the jobs up."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
749
750
class FakeSMTP(object):
    """Drop-in replacement for smtplib.SMTP that records messages.

    Messages are appended to the shared *messages* list supplied by the
    test case; nothing is actually sent.
    """
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        # Shared list the test case inspects after a run.
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record the message and report success.

        Splits headers from body on the first blank line.  The split is
        done once (the original split the string twice) and a message
        without a body yields body='' instead of raising IndexError.
        """
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        parts = msg.split('\n\n', 1)
        headers = parts[0]
        body = parts[1] if len(parts) > 1 else ''

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True
778
779
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection stub: no network, canned endpoint, no-op POSTs."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
790
791
792class ZuulTestCase(testtools.TestCase):
793 log = logging.getLogger("zuul.test")
794
    def setUp(self):
        """Build a complete in-process Zuul: fake gerrit, gearman server,
        worker, merger, scheduler, webapp and RPC listener, all wired
        together with test doubles and monkeypatches."""
        super(ZuulTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        # Optional stdout/stderr/log capture, controlled by environment
        # variables as in other OpenStack test suites.
        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                '%(levelname)-8s %(message)s'))

        # Fresh per-test directory tree: upstream repos and merger git dir.
        tmp_root = self.useFixture(fixtures.TempDir(
            rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        # Route urllib2.urlopen calls for plain URLs to the fake opener;
        # Request objects still use the real implementation.
        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        # Replace the real Gerrit connection and SMTP client with fakes.
        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        # Register launcher, merger, triggers and reporters with the
        # scheduler before starting it.
        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config, self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        # Convenience aliases used throughout the tests.
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
938
    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
943
    def assertFinalState(self):
        """Cleanup-time sanity checks: no cached changes, no leaked
        git.Repo objects, and all queues empty."""
        # Make sure that the change cache is cleared
        self.assertEqual(len(self.gerrit._change_cache.keys()), 0,
                         "Change cache should have been cleared")
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
956
    def shutdown(self):
        """Stop every component started by setUp, join their threads,
        and complain if anything is still running afterwards."""
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        # Only the main thread should remain after a clean shutdown.
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
        super(ZuulTestCase, self).tearDown()
979
    def init_repo(self, project):
        """Create the upstream repository for *project*.

        Produces a master branch with an initial commit tagged 'init'
        (which patchset refs branch from) and an 'mp' branch with one
        extra commit, leaving master checked out and clean.
        """
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        # Patchset refs are created from this tag (see FakeChange).
        repo.create_tag('init')

        mp = repo.create_head('mp')
        repo.head.reference = mp
        f = open(fn, 'a')
        f.write("test mp\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('mp commit')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
1012
    def ref_has_change(self, ref, change):
        """Return True if *ref* in the merger's repo contains the commit
        for patchset 1 of *change* (matched by commit message)."""
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        for commit in repo.iter_commits(ref):
            if commit.message.strip() == ('%s-1' % change.subject):
                return True
        return False
1020
    def job_has_changes(self, *args):
        """Return True if the given job's ZUUL_REF contains exactly the
        given changes, with the last one at the ref's head.

        args[0] is a FakeBuild or a raw gearman job; the rest are
        FakeChange objects, matched by their patchset-1 commit messages.
        """
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        # The newest commit on the ref must be ZUUL_COMMIT itself.
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True
1048
1049 def registerJobs(self):
1050 count = 0
1051 for job in self.sched.layout.jobs.keys():
1052 self.worker.registerFunction('build:' + job)
1053 count += 1
1054 self.worker.registerFunction('stop:' + self.worker.worker_id)
1055 count += 1
1056
1057 while len(self.gearman_server.functions) < count:
1058 time.sleep(0)
1059
1060 def release(self, job):
1061 if isinstance(job, FakeBuild):
1062 job.release()
1063 else:
1064 job.waiting = False
1065 self.log.debug("Queued job %s released" % job.unique)
1066 self.gearman_server.wakeConnections()
1067
1068 def getParameter(self, job, name):
1069 if isinstance(job, FakeBuild):
1070 return job.parameters[name]
1071 else:
1072 parameters = json.loads(job.arguments)
1073 return parameters[name]
1074
1075 def resetGearmanServer(self):
1076 self.worker.setFunctions([])
1077 while True:
1078 done = True
1079 for connection in self.gearman_server.active_connections:
1080 if (connection.functions and
1081 connection.client_id not in ['Zuul RPC Listener',
1082 'Zuul Merger']):
1083 done = False
1084 if done:
1085 break
1086 time.sleep(0)
1087 self.gearman_server.functions = set()
1088 self.rpc.register()
1089 self.merge_server.register()
1090
1091 def haveAllBuildsReported(self):
1092 # See if Zuul is waiting on a meta job to complete
1093 if self.launcher.meta_jobs:
1094 return False
1095 # Find out if every build that the worker has completed has been
1096 # reported back to Zuul. If it hasn't then that means a Gearman
1097 # event is still in transit and the system is not stable.
1098 for build in self.worker.build_history:
1099 zbuild = self.launcher.builds.get(build.uuid)
1100 if not zbuild:
1101 # It has already been reported
1102 continue
1103 # It hasn't been reported yet.
1104 return False
1105 # Make sure that none of the worker connections are in GRAB_WAIT
1106 for connection in self.worker.active_connections:
1107 if connection.state == 'GRAB_WAIT':
1108 return False
1109 return True
1110
1111 def areAllBuildsWaiting(self):
1112 ret = True
1113
1114 builds = self.launcher.builds.values()
1115 for build in builds:
1116 client_job = None
1117 for conn in self.launcher.gearman.active_connections:
1118 for j in conn.related_jobs.values():
1119 if j.unique == build.uuid:
1120 client_job = j
1121 break
1122 if not client_job:
1123 self.log.debug("%s is not known to the gearman client" %
1124 build)
1125 ret = False
1126 continue
1127 if not client_job.handle:
1128 self.log.debug("%s has no handle" % client_job)
1129 ret = False
1130 continue
1131 server_job = self.gearman_server.jobs.get(client_job.handle)
1132 if not server_job:
1133 self.log.debug("%s is not known to the gearman server" %
1134 client_job)
1135 ret = False
1136 continue
1137 if not hasattr(server_job, 'waiting'):
1138 self.log.debug("%s is being enqueued" % server_job)
1139 ret = False
1140 continue
1141 if server_job.waiting:
1142 continue
1143 worker_job = self.worker.gearman_jobs.get(server_job.unique)
1144 if worker_job:
1145 if worker_job.build.isWaiting():
1146 continue
1147 else:
1148 self.log.debug("%s is running" % worker_job)
1149 ret = False
1150 else:
1151 self.log.debug("%s is unassigned" % server_job)
1152 ret = False
1153 return ret
1154
    def waitUntilSettled(self):
        """Block until the whole system is quiescent.

        Settled means: all event queues are empty, no merges are
        outstanding, every completed build has been reported back to
        the scheduler, and every remaining build is parked waiting.
        Raises an Exception after ten seconds of polling.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                # Dump queue state to aid debugging the timeout.
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                # Hold the run-handler lock so the scheduler is not
                # mid-iteration while its state is inspected below.
                self.sched.run_handler_lock.acquire()
                if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)
1189
1190 def countJobResults(self, jobs, result):
1191 jobs = filter(lambda x: x.result == result, jobs)
1192 return len(jobs)
1193
1194 def getJobFromHistory(self, name):
1195 history = self.worker.build_history
1196 for job in history:
1197 if job.name == name:
1198 return job
1199 raise Exception("Unable to find job %s in history" % name)
1200
    def assertEmptyQueues(self):
        """Assert that no scheduler pipeline queue still holds items.

        Used at teardown to catch changes orphaned in a pipeline; dumps
        the offending queue contents before failing.
        """
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001210
1211 def assertReportedStat(self, key, value=None, kind=None):
1212 start = time.time()
1213 while time.time() < (start + 5):
1214 for stat in self.statsd.stats:
1215 pprint.pprint(self.statsd.stats)
1216 k, v = stat.split(':')
1217 if key == k:
1218 if value is None and kind is None:
1219 return
1220 elif value:
1221 if value == v:
1222 return
1223 elif kind:
1224 if v.endswith('|' + kind):
1225 return
1226 time.sleep(0.1)
1227
1228 pprint.pprint(self.statsd.stats)
1229 raise Exception("Key %s not found in reported stats" % key)