blob: a86de82d73a0981d830358a7037a2936b970e03b [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
43
44import zuul.scheduler
45import zuul.webapp
46import zuul.rpclistener
47import zuul.launcher.gearman
48import zuul.lib.swift
49import zuul.merger.server
50import zuul.merger.client
51import zuul.reporter.gerrit
52import zuul.reporter.smtp
53import zuul.trigger.gerrit
54import zuul.trigger.timer
55
# Directory containing the test fixture files (zuul.conf, layout.yaml,
# fake gerrit data, etc.) used throughout these tests.
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')

# Default to verbose logging for test runs; tests may additionally capture
# log output via the OS_LOG_CAPTURE environment convention.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')
62
63
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) pair from the git invocation.  Raises
    an exception if repack fails; the exception message includes git's
    stderr so the failure is debuggable from the traceback alone.
    """
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    proc = subprocess.Popen(cmd, close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out = proc.communicate()
    if proc.returncode:
        # Include stderr: "returned N" alone gives no hint why.
        raise Exception("git repack returned %d (%s)" %
                        (proc.returncode, out[1]))
    return out
73
74
def random_sha1():
    """Return the hex SHA1 digest of a random value.

    The value is explicitly encoded to bytes before hashing: hashlib
    requires bytes on Python 3, and encoding an ASCII str is a no-op on
    Python 2, so this works under both interpreters.
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
77
78
class ChangeReference(git.Reference):
    """A git ref under refs/changes/*, mirroring how Gerrit publishes
    patchsets for review."""
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
82
83
class FakeChange(object):
    """An in-memory fake of a Gerrit change.

    Creates real commits in the per-test upstream git repository and
    synthesizes the JSON structures (query results, stream events,
    submit records) that the Gerrit trigger expects to see.
    """

    # label -> (display name, minimum vote, maximum vote)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit this change to the upstream repo and return the commit.

        The commit is created on a Gerrit-style refs/changes ref.  When
        *large* is true, 100 files of random content are written instead
        of the single small marker file *fn*.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            # Use a context manager so the handle is closed before add.
            with open(fn, 'w') as f:
                f.write("test %s %s %s\n" %
                        (self.branch, self.number, self.latest_patchset))
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                with open(fn, 'w') as f:
                    for ci in range(4096):
                        f.write(random.choice(string.printable))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Leave the repo checked out on a clean master branch.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Add a new patchset, optionally touching the given *files*."""
        # Avoid a shared mutable default argument; None means "no files".
        files = files or []
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch, self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a patchset-created stream event for *patchset* (1-based)."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a change-restored stream event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a change-abandoned stream event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a comment-added stream event for *patchset* (1-based)."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Add (or replace) a vote and return a comment-added event."""
        if not granted_on:
            granted_on = time.time()
        approval = {'description': self.categories[category][0],
                    'type': category,
                    'value': str(value),
                    'by': {
                        'username': username,
                        'email': username + '@example.com',
                    },
                    'grantedOn': int(granted_on)}
        # Replace any previous vote by this user in the same category.
        # Rebuilding in place (slice assignment keeps the list object
        # shared with self.data) avoids deleting by a stale index while
        # iterating, which would misbehave with multiple matches.
        approvals = self.patchsets[-1]['approvals']
        approvals[:] = [x for x in approvals
                        if not (x['by']['username'] == username and
                                x['type'] == category)]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        # Deep-copy via JSON so callers cannot mutate our state.
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from the current approvals."""
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                # A blocking (minimum) vote always sticks.
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Record a dependency on *other*'s given patchset (1-based)."""
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of this change's query data."""
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            # Flag whether the dependency still points at its latest
            # patchset, as real Gerrit queries do.
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Merge the change into its branch unless blocked.

        Refuses to merge while an unmerged dependency exists or when
        fail_merge is set (used by tests to simulate merge failures).
        """
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        # Advance the upstream branch to the change's latest revision.
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1
363
364
class FakeGerrit(object):
    """Fake of the Gerrit client used by the scheduler's trigger.

    Changes are kept in a dict keyed by change number; events are
    delivered through an internal queue, mimicking the event stream.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        return self.event_queue.put(data)

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review message; merge the change on a submit action."""
        number, _ = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return the query data for a change, or {} if unknown."""
        change = self.changes.get(int(number))
        return change.query() if change else {}

    def startWatching(self, *args, **kw):
        pass
406
407
class BuildHistory(object):
    """Record of a completed build; every keyword becomes an attribute."""

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        summary = "<Completed build, result: %s name: %s #%s changes: %s>"
        return summary % (self.result, self.name, self.number, self.changes)
415
416
class FakeURLOpener(object):
    """Serve a git smart-HTTP ref advertisement from a local repo.

    Substituted for urllib2.urlopen so replication checks read ref data
    from the fake upstream repositories instead of the network.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        parsed = urlparse.urlparse(self.url)
        project = '/'.join(parsed.path.split('/')[2:-2])
        # Pkt-line header advertising the upload-pack service.
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = '%s %s\n' % (ref.object.hexsha, ref.path)
            # Each pkt-line is prefixed with its total length in hex.
            ret += '%04x%s' % (len(line) + 4, line)
        ret += '0000'
        return ret
438
439
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger whose git URLs point at the local fake repos."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Return a local filesystem path instead of a network URL so the
        # merger clones directly from the per-test upstream repository.
        return os.path.join(self.upstream_root, project.name)
449
450
class FakeStatsd(threading.Thread):
    """UDP listener that records raw statsd packets for assertions.

    Binds an ephemeral port (exposed as ``self.port``); received
    datagrams are appended to ``self.stats``.  A pipe is used to wake
    the poll loop so ``stop()`` terminates the thread promptly.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    # recvfrom returns (data, address); record the payload.
                    data, addr = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data)
                if fd == self.wake_read:
                    return

    def stop(self):
        # Write bytes (not str) so this also works on Python 3.
        os.write(self.wake_write, b'1\n')
478
479
class FakeBuild(threading.Thread):
    """A fake build job executing in its own thread.

    Behaves like a launcher slave: sends worker data/status packets,
    optionally blocks until released (when the worker is holding jobs),
    computes a result, records itself in the worker's build history and
    reports completion (or failure) back through the gearman job.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Gearman job names look like 'build:<name>[:<node>]'.
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Allow a held build to continue running."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return whether the build is currently blocked in _wait()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        # Block until release() notifies; 'waiting' is read by
        # isWaiting() under the same condition lock.
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
            self.log.debug("Build %s continuing" % self.unique)

        # Serialize result bookkeeping with the worker's other builds.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate a worker-side error: WORK_FAIL with no result data.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
585
586
class FakeWorker(gear.Worker):
    """Gearman worker that executes FakeBuild jobs for the tests."""

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        # Background thread that pulls and dispatches gearman jobs.
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch a gearman job based on its 'cmd:name[:node]' name."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a 'build:' job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build identified in the job arguments."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        # No matching running build is an error for the caller.
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Main loop: fetch and handle jobs until the worker stops."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Use 'except Exception' (not a bare except) so that
                # SystemExit/KeyboardInterrupt still propagate; log and
                # keep the worker thread alive for everything else.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Mark job *name* to fail when run against *change*."""
        changes = self.fail_tests.get(name, [])
        changes.append(change)
        self.fail_tests[name] = changes

    def shouldFailTest(self, name, ref):
        """Return True if job *name* on *ref* was set up to fail."""
        for change in self.fail_tests.get(name, []):
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose names match *regex* (or all)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
695
696
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold 'build:' jobs in queue for tests."""

    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0 lets the OS pick a free port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job for *connection*, honoring holds."""
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First sighting of this job: decide whether to hold it.
                    job.waiting = (self.hold_jobs_in_queue
                                   if job.name.startswith('build:')
                                   else False)
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Un-hold queued build jobs matching *regex* (or all of them)."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if regex and not re.match(regex, name):
                self.log.debug("not releasing queued job %s" %
                               job.unique)
                continue
            self.log.debug("releasing queued job %s" %
                           job.unique)
            job.waiting = False
            released = True
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
743
744
class FakeSMTP(object):
    """Minimal smtplib.SMTP replacement that records sent messages."""

    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record an outgoing message; always reports success."""
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        # Split the RFC-2822 payload at the first blank line.
        parts = msg.split('\n\n', 1)
        headers = parts[0]
        body = parts[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True
772
773
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection that skips auth and account operations."""

    def post_account(self, headers):
        # Deliberately a no-op; tests never touch a real Swift account.
        pass

    def get_auth(self):
        """Return a canned storage endpoint and an empty auth token."""
        parts = ('https://storage.example.org', 'V1', 'AUTH_account')
        return os.path.join(*parts), ''
784
785
786class ZuulTestCase(testtools.TestCase):
787 log = logging.getLogger("zuul.test")
788
    def setUp(self):
        """Build a complete fake Zuul environment for one test.

        Starts, in order: capture fixtures, per-test temp/git dirs and
        upstream repos, the statsd listener, a fake gearman server and
        worker, the merge server/client, the scheduler with fake gerrit
        and timer triggers, reporters, the webapp and the RPC listener.
        Cleanups registered at the end assert final state and shut
        everything down again.
        """
        super(ZuulTestCase, self).setUp()
        # Honor the OS_TEST_TIMEOUT / OS_*_CAPTURE environment variable
        # conventions used by the OpenStack test infrastructure.
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
        # Per-test scratch tree: upstream repos and the merger's git dir.
        tmp_root = self.useFixture(fixtures.TempDir(
            rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            # Real urllib2.Request objects pass through to the original
            # urlopen; everything else is served by FakeURLOpener from
            # the fake upstream repositories.
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            # Prepend the shared message list so tests can inspect mail.
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        # Convenient aliases used by individual tests.
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
930
931 def setup_config(self):
932 """Per test config object. Override to set different config."""
933 self.config = ConfigParser.ConfigParser()
934 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
935
936 def assertFinalState(self):
937 # Make sure that the change cache is cleared
Antoine Mussobd86a312014-01-08 14:51:33 +0100938 self.assertEqual(len(self.gerrit._change_cache.keys()), 0,
939 "Change cache should have been cleared")
Clark Boylanb640e052014-04-03 16:41:46 -0700940 # Make sure that git.Repo objects have been garbage collected.
941 repos = []
942 gc.collect()
943 for obj in gc.get_objects():
944 if isinstance(obj, git.Repo):
945 repos.append(obj)
946 self.assertEqual(len(repos), 0)
947 self.assertEmptyQueues()
948
    def shutdown(self):
        """Stop every service started by setUp, in dependency order."""
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        # Anything still running here would leak into subsequent tests.
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
        super(ZuulTestCase, self).tearDown()
971
972 def init_repo(self, project):
973 parts = project.split('/')
974 path = os.path.join(self.upstream_root, *parts[:-1])
975 if not os.path.exists(path):
976 os.makedirs(path)
977 path = os.path.join(self.upstream_root, project)
978 repo = git.Repo.init(path)
979
980 repo.config_writer().set_value('user', 'email', 'user@example.com')
981 repo.config_writer().set_value('user', 'name', 'User Name')
982 repo.config_writer().write()
983
984 fn = os.path.join(path, 'README')
985 f = open(fn, 'w')
986 f.write("test\n")
987 f.close()
988 repo.index.add([fn])
989 repo.index.commit('initial commit')
990 master = repo.create_head('master')
991 repo.create_tag('init')
992
993 mp = repo.create_head('mp')
994 repo.head.reference = mp
995 f = open(fn, 'a')
996 f.write("test mp\n")
997 f.close()
998 repo.index.add([fn])
999 repo.index.commit('mp commit')
1000
1001 repo.head.reference = master
1002 repo.head.reset(index=True, working_tree=True)
1003 repo.git.clean('-x', '-f', '-d')
1004
1005 def ref_has_change(self, ref, change):
1006 path = os.path.join(self.git_root, change.project)
1007 repo = git.Repo(path)
1008 for commit in repo.iter_commits(ref):
1009 if commit.message.strip() == ('%s-1' % change.subject):
1010 return True
1011 return False
1012
1013 def job_has_changes(self, *args):
1014 job = args[0]
1015 commits = args[1:]
1016 if isinstance(job, FakeBuild):
1017 parameters = job.parameters
1018 else:
1019 parameters = json.loads(job.arguments)
1020 project = parameters['ZUUL_PROJECT']
1021 path = os.path.join(self.git_root, project)
1022 repo = git.Repo(path)
1023 ref = parameters['ZUUL_REF']
1024 sha = parameters['ZUUL_COMMIT']
1025 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1026 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1027 commit_messages = ['%s-1' % commit.subject for commit in commits]
1028 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1029 " repo_messages %s; sha %s" % (job, commit_messages,
1030 repo_messages, sha))
1031 for msg in commit_messages:
1032 if msg not in repo_messages:
1033 self.log.debug(" messages do not match")
1034 return False
1035 if repo_shas[0] != sha:
1036 self.log.debug(" sha does not match")
1037 return False
1038 self.log.debug(" OK")
1039 return True
1040
1041 def registerJobs(self):
1042 count = 0
1043 for job in self.sched.layout.jobs.keys():
1044 self.worker.registerFunction('build:' + job)
1045 count += 1
1046 self.worker.registerFunction('stop:' + self.worker.worker_id)
1047 count += 1
1048
1049 while len(self.gearman_server.functions) < count:
1050 time.sleep(0)
1051
1052 def release(self, job):
1053 if isinstance(job, FakeBuild):
1054 job.release()
1055 else:
1056 job.waiting = False
1057 self.log.debug("Queued job %s released" % job.unique)
1058 self.gearman_server.wakeConnections()
1059
1060 def getParameter(self, job, name):
1061 if isinstance(job, FakeBuild):
1062 return job.parameters[name]
1063 else:
1064 parameters = json.loads(job.arguments)
1065 return parameters[name]
1066
1067 def resetGearmanServer(self):
1068 self.worker.setFunctions([])
1069 while True:
1070 done = True
1071 for connection in self.gearman_server.active_connections:
1072 if (connection.functions and
1073 connection.client_id not in ['Zuul RPC Listener',
1074 'Zuul Merger']):
1075 done = False
1076 if done:
1077 break
1078 time.sleep(0)
1079 self.gearman_server.functions = set()
1080 self.rpc.register()
1081 self.merge_server.register()
1082
1083 def haveAllBuildsReported(self):
1084 # See if Zuul is waiting on a meta job to complete
1085 if self.launcher.meta_jobs:
1086 return False
1087 # Find out if every build that the worker has completed has been
1088 # reported back to Zuul. If it hasn't then that means a Gearman
1089 # event is still in transit and the system is not stable.
1090 for build in self.worker.build_history:
1091 zbuild = self.launcher.builds.get(build.uuid)
1092 if not zbuild:
1093 # It has already been reported
1094 continue
1095 # It hasn't been reported yet.
1096 return False
1097 # Make sure that none of the worker connections are in GRAB_WAIT
1098 for connection in self.worker.active_connections:
1099 if connection.state == 'GRAB_WAIT':
1100 return False
1101 return True
1102
1103 def areAllBuildsWaiting(self):
1104 ret = True
1105
1106 builds = self.launcher.builds.values()
1107 for build in builds:
1108 client_job = None
1109 for conn in self.launcher.gearman.active_connections:
1110 for j in conn.related_jobs.values():
1111 if j.unique == build.uuid:
1112 client_job = j
1113 break
1114 if not client_job:
1115 self.log.debug("%s is not known to the gearman client" %
1116 build)
1117 ret = False
1118 continue
1119 if not client_job.handle:
1120 self.log.debug("%s has no handle" % client_job)
1121 ret = False
1122 continue
1123 server_job = self.gearman_server.jobs.get(client_job.handle)
1124 if not server_job:
1125 self.log.debug("%s is not known to the gearman server" %
1126 client_job)
1127 ret = False
1128 continue
1129 if not hasattr(server_job, 'waiting'):
1130 self.log.debug("%s is being enqueued" % server_job)
1131 ret = False
1132 continue
1133 if server_job.waiting:
1134 continue
1135 worker_job = self.worker.gearman_jobs.get(server_job.unique)
1136 if worker_job:
1137 if worker_job.build.isWaiting():
1138 continue
1139 else:
1140 self.log.debug("%s is running" % worker_job)
1141 ret = False
1142 else:
1143 self.log.debug("%s is unassigned" % server_job)
1144 ret = False
1145 return ret
1146
    def waitUntilSettled(self):
        """Block until Zuul has fully processed all pending events.

        "Settled" means: every event queue is empty and joined, the merge
        client has no outstanding build sets, all completed builds have
        been reported back, and every remaining build is parked waiting.
        Raises Exception after ten seconds without settling.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                # Dump queue state to help debug the timeout.
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                # Hold the scheduler's run-handler lock so its state cannot
                # change while we re-verify all settle conditions at once.
                self.sched.run_handler_lock.acquire()
                if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            # Sleep briefly (or until the scheduler wakes) before retrying.
            self.sched.wake_event.wait(0.1)
1181
1182 def countJobResults(self, jobs, result):
1183 jobs = filter(lambda x: x.result == result, jobs)
1184 return len(jobs)
1185
1186 def getJobFromHistory(self, name):
1187 history = self.worker.build_history
1188 for job in history:
1189 if job.name == name:
1190 return job
1191 raise Exception("Unable to find job %s in history" % name)
1192
    def assertEmptyQueues(self):
        """Assert that every pipeline queue has fully drained."""
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    # Dump the stuck items before failing the assertion.
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001202
1203 def assertReportedStat(self, key, value=None, kind=None):
1204 start = time.time()
1205 while time.time() < (start + 5):
1206 for stat in self.statsd.stats:
1207 pprint.pprint(self.statsd.stats)
1208 k, v = stat.split(':')
1209 if key == k:
1210 if value is None and kind is None:
1211 return
1212 elif value:
1213 if value == v:
1214 return
1215 elif kind:
1216 if v.endswith('|' + kind):
1217 return
1218 time.sleep(0.1)
1219
1220 pprint.pprint(self.statsd.stats)
1221 raise Exception("Key %s not found in reported stats" % key)