#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
import Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import threading
import time
import urllib2

import fixtures
import gear
import git
import six.moves.urllib.parse as urlparse
import statsd
import swiftclient
import testtools

import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.gerrit
import zuul.lib.swift
import zuul.merger.server
import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
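    """Repack all objects in the git repository at path into a single
    pack, removing redundant packs and loose objects.
    """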
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


class ChangeReference(git.Reference):
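    """A git reference under refs/changes/*, mirroring the per-patchset
    refs that Gerrit creates for uploaded changes.
    """
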
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
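    """A fake Gerrit change, tracking its patchsets, approvals, events
    and merge state.

    categories maps an approval category ID to a tuple of
    (display name, minimum value, maximum value).
    """
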
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
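        """Commit a fake patchset to the upstream repo on a
        refs/changes/1/<number>/<patchset> ref and return the commit.

        With large=False a single small test file is written; with
        large=True one hundred files of 4096 random printable characters
        each are written instead.
        """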
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch, self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        if not granted_on:
            granted_on = time.time()
        approval = {'description': self.categories[category][0],
                    'type': category,
                    'value': str(value),
                    'by': {
                        'username': username,
                        'email': username + '@example.com',
                    },
                    'grantedOn': int(granted_on)}
        # Rebuild the list rather than deleting by index while iterating,
        # which would skew the indices if more than one approval matched.
        self.patchsets[-1]['approvals'] = [
            x for x in self.patchsets[-1]['approvals']
            if not (x['by']['username'] == username and
                    x['type'] == category)]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
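        """Compute Gerrit-style submit records from the latest patchset's
        approvals: a category at its minimum value is REJECT, at its
        maximum OK, and anything else NEED.  Returns [{'status': 'OK'}]
        only when every category is at its maximum.
        """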
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerrit(object):
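    """An in-memory stand-in for a Gerrit server: changes are stored in
    a dict keyed by number and events are delivered through a Queue.
    """
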
    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        return self.event_queue.put(data)

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def startWatching(self, *args, **kw):
        pass


class BuildHistory(object):
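    """A record of a completed build; arbitrary keyword arguments become
    attributes.
    """
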
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
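    """Pretends to be Gerrit's git-over-HTTP endpoint by generating a
    smart-HTTP ref advertisement, in pkt-line format, from the upstream
    repo named in the URL.
    """
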
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
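    """A Gerrit trigger whose git URLs point at the local upstream
    repositories instead of a real server.
    """
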
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


class FakeStatsd(threading.Thread):
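    """A thread that binds a UDP socket on an ephemeral port and records
    every statsd packet it receives in self.stats.
    """
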
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


class FakeBuild(threading.Thread):
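    """A single running build: sends worker data and status back over
    gearman, optionally blocks until released, and records its result in
    the worker's build history.
    """
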
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        ret = self.waiting
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
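    """A fake Jenkins-like gearman worker that runs FakeBuilds for
    'build:*' jobs and handles 'stop:*' and 'set_description:*' jobs.
    """
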
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


class FakeGearmanServer(gear.Server):
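    """A gearman server whose queued build jobs can be held and then
    released selectively by job name regex.
    """
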
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))


class FakeSMTP(object):
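    """Captures outgoing mail in a shared list instead of talking to a
    real SMTP server.
    """
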
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
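    """A swiftclient connection that skips authentication and returns a
    canned storage endpoint.
    """
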
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


class ZuulTestCase(testtools.TestCase):
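    """A test case that wires a real scheduler, merger and launcher to
    fake Gerrit, gearman, statsd, SMTP and swift services, with one git
    repository per test project under a temporary directory.
    """
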
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
        tmp_root = self.useFixture(fixtures.TempDir(
            rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make a per-test copy of the configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))

    def assertFinalState(self):
        # Make sure that the change cache is cleared
        self.assertEqual(len(self.gerrit._change_cache.keys()), 0)
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
        super(ZuulTestCase, self).tearDown()

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        mp = repo.create_head('mp')
        repo.head.reference = mp
        f = open(fn, 'a')
        f.write("test mp\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('mp commit')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        for commit in repo.iter_commits(ref):
            if commit.message.strip() == ('%s-1' % change.subject):
                return True
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul.  If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
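        """Return True only if every outstanding build is either held in
        the gearman queue or blocked in FakeBuild._wait(); anything still
        running or in transit means the system has not settled.
        """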
        ret = True

        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                ret = False
                continue
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                ret = False
                continue
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                ret = False
                continue
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                ret = False
                continue
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    ret = False
            else:
                self.log.debug("%s is unassigned" % server_job)
                ret = False
        return ret

    def waitUntilSettled(self):
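        """Block until Zuul is quiescent: all event queues empty, no
        outstanding merge jobs, every completed build reported, and
        every remaining build waiting.  Raises an Exception after ten
        seconds.
        """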
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # Have all build states propagated to Zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0)

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)