blob: 753bc5e600e46dc1635ee1a25f7ee2f8fa74bc3a [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
43
44import zuul.scheduler
45import zuul.webapp
46import zuul.rpclistener
47import zuul.launcher.gearman
48import zuul.lib.swift
49import zuul.merger.server
50import zuul.merger.client
51import zuul.reporter.gerrit
52import zuul.reporter.smtp
53import zuul.trigger.gerrit
54import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070055import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070056
# Directory holding the test fixtures (config files, layouts, repos)
# used by these tests.
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')

# Emit full debug logging for every test run.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')
63
64
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) tuple from the git process.
    Raises Exception if git exits non-zero; the captured stderr is
    included in the message to make failures diagnosable.
    """
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    proc = subprocess.Popen(cmd, close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d (%s)" %
                        (proc.returncode, out[1]))
    return out
74
75
def random_sha1():
    """Return the hex SHA1 digest of a random value.

    Used to fabricate plausible-looking change/commit ids in test data.
    The random string is explicitly encoded before hashing; hashlib
    requires bytes on Python 3, so this works on both Python 2 and 3.
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
78
79
class ChangeReference(git.Reference):
    """A git ref under refs/changes, mimicking how Gerrit stores
    uploaded patchsets (e.g. refs/changes/1/<number>/<patchset>)."""
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
83
84
class FakeChange(object):
    """A fake Gerrit change with patchsets, approvals and events.

    Mirrors the JSON structures Gerrit emits over its event stream and
    query interface so the scheduler under test cannot tell the
    difference.  Each patchset is backed by a real commit in the fake
    upstream git repo under ``upstream_root``.
    """

    # category -> (label name, min value, max value)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0   # number of review messages posted
        self.queried = 0    # number of times query() was called
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        # Shape follows Gerrit's query JSON output.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit this change onto a refs/changes ref in the upstream repo.

        If *large* is set, write 100 files of random content instead of
        the single file *fn*.  Returns the commit object.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Leave the repo back on a clean master checkout.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Add a new patchset to the change.

        *files* is an optional list of file names; the first is used as
        the file to modify.  (Default is None rather than a shared
        mutable [] list.)
        """
        files = files or []
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch, self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a Gerrit 'patchset-created' stream event dict."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a Gerrit 'change-restored' stream event dict."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a Gerrit 'change-abandoned' stream event dict."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a Gerrit 'comment-added' stream event dict."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Add (or replace) an approval and return the comment event.

        A user may hold only one approval per category on a patchset:
        any prior approval by the same user in the same category is
        removed first.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {'description': self.categories[category][0],
                    'type': category,
                    'value': str(value),
                    'by': {
                        'username': username,
                        'email': username + '@example.com',
                    },
                    'grantedOn': int(granted_on)}
        approvals = self.patchsets[-1]['approvals']
        # Filter in place (slice assignment) rather than deleting by
        # index while enumerating a copy, which mis-deletes when more
        # than one entry matches (indices shift after each del).
        approvals[:] = [x for x in approvals
                        if not (x['by']['username'] == username and
                                x['type'] == category)]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        # Round-trip through JSON to return a deep copy with only
        # JSON-safe types, like a real event off the wire.
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from current approvals.

        Returns [{'status': 'OK'}] when every category is at its max
        value, otherwise NOT_READY with per-label statuses.  A category
        at its minimum value is a veto (REJECT); otherwise the approval
        with the largest absolute value wins.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                # A minimum vote is sticky (veto).
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Record a dependency on patchset *patchset* of change *other*."""
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return this change's query JSON (deep copy), counting the call."""
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and advance the upstream branch.

        Silently refuses if an unmerged dependency exists or
        ``fail_merge`` is set, mimicking a Gerrit submit failure.
        """
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        # Count one posted review message.
        self.reported += 1
364
365
class FakeGerrit(object):
    """In-memory stand-in for the Gerrit connection used by the tests.

    Holds fake changes keyed by number, an event queue feeding the
    trigger, and a record of every simpleQuery issued.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create and register a new FakeChange, returning it."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        # Feed an event to whatever is consuming the stream.
        return self.event_queue.put(data)

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review; merge the change when 'submit' is requested."""
        number, _ = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        return change.query() if change else {}

    def simpleQuery(self, query):
        # This is currently only used to return all open changes for a
        # project.  Record the query, then emit every known change
        # followed by the trailing stats row Gerrit appends.
        self.queries.append(query)
        results = [change.query() for change in self.changes.values()]
        results.append({"type": "stats", "rowCount": 1,
                        "runTimeMilliseconds": 3})
        return results

    def startWatching(self, *args, **kw):
        # The real implementation spawns an SSH stream watcher.
        pass
416
417
class BuildHistory(object):
    """Record of a single completed fake build; arbitrary attributes
    are supplied as keyword arguments."""

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def __repr__(self):
        fmt = "<Completed build, result: %s name: %s #%s changes: %s>"
        return fmt % (self.result, self.name, self.number, self.changes)
425
426
class FakeURLOpener(object):
    """Fake urllib opener that serves a git-upload-pack ref
    advertisement for a repo under upstream_root."""

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        """Build the smart-HTTP ref advertisement for the repo named
        in the URL path."""
        parsed = urlparse.urlparse(self.url)
        project = '/'.join(parsed.path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # pkt-line framing: 4 hex digits of total length, then payload.
            ret += '%04x%s' % (len(line) + 4, line)
        ret += '0000'
        return ret
448
449
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger that hands out local filesystem git URLs."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Point the merger at the local fake upstream repo instead of a
        # network URL.
        return os.path.join(self.upstream_root, project.name)
459
460
class FakeStatsd(threading.Thread):
    """Minimal UDP statsd server that records every packet it receives.

    Listens on an ephemeral port (exposed as ``self.port``); received
    packets are appended to ``self.stats``.  ``stop()`` wakes the poll
    loop via a pipe so the thread exits promptly.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    # stop() wrote to the pipe; shut down.
                    return

    def stop(self):
        # Write bytes (not text): os.write requires a bytes-like object
        # on Python 3, and b'1\n' is equally valid on Python 2.
        os.write(self.wake_write, b'1\n')
488
489
class FakeBuild(threading.Thread):
    """One fake job execution, run in its own thread.

    Sends gearman work-data/status packets like a real launcher worker,
    can be held until released (see worker.hold_jobs_in_build), and
    records its outcome in the worker's build history.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Gearman job names are 'build:<jobname>[:<node>]'.
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        # Wake a build blocked in _wait().
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        # Report (under the condition lock) whether the build is held.
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        # Block until release() notifies the condition.
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        # Worker metadata reported back with every work-data packet.
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize result bookkeeping across concurrent builds.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate a worker-side failure: no result in the data,
            # and a WORK_FAIL packet instead of WORK_COMPLETE.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
595
596
class FakeWorker(gear.Worker):
    """Fake gearman worker that runs FakeBuilds for 'build:*' jobs.

    Also services 'stop:*' and 'set_description:*' control jobs, keeps
    the list of running builds and the history of completed ones, and
    can be told which (job, change) pairs should fail.
    """

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        # job name -> list of changes whose builds should fail
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        # Job names are '<cmd>:<name>[:<node>]'.
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for the given gearman job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build identified in the job arguments."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Main loop of the worker thread: fetch and dispatch jobs."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Catch Exception rather than a bare except so that
                # SystemExit/KeyboardInterrupt are not swallowed.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Arrange for future builds of job *name* for *change* to fail."""
        changes = self.fail_tests.get(name, [])
        changes.append(change)
        self.fail_tests[name] = changes

    def shouldFailTest(self, name, ref):
        """Return True if a registered failing change is present on ref."""
        changes = self.fail_tests.get(name, [])
        for change in changes:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (all if None)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
705
706
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold queued build jobs until released.

    While ``hold_jobs_in_queue`` is True, newly queued 'build:*' jobs
    are marked waiting and withheld from workers until release() is
    called for them.
    """

    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0: bind to an ephemeral port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job this connection can serve.

        Honors the hold flag by skipping jobs marked waiting; returns
        None when nothing is available.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First time we see this job: decide whether to
                    # hold it based on the current hold flag.
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release queued 'build:' jobs matching *regex* (all if None)."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            # Wake workers so they pick up the newly released jobs.
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
753
754
class FakeSMTP(object):
    """Stand-in for smtplib.SMTP that records messages in a shared list
    instead of sending them."""
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record the message, split into headers and body at the first
        blank line, and report success."""
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        parts = msg.split('\n\n', 1)
        headers = parts[0]
        body = parts[1]

        record = dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        )
        self.messages.append(record)

        return True

    def quit(self):
        return True
782
783
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection stub: no network calls, canned auth endpoint."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
794
795
796class ZuulTestCase(testtools.TestCase):
797 log = logging.getLogger("zuul.test")
798
    def setUp(self):
        """Build a complete fake Zuul deployment for one test.

        Starts (in order) a fake statsd, a fake gearman server and
        worker, a real merge server/client, the scheduler, fake
        gerrit/timer/zuul triggers, gerrit and smtp reporters, the
        webapp and the RPC listener.  Also monkey-patches urllib2,
        smtplib and swiftclient so no network access occurs.
        """
        super(ZuulTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        # Optional stdout/stderr/log capture, controlled by the
        # standard OS_* environment variables.
        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
                os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
                os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))

        # Per-test scratch tree: upstream (fake gerrit) repos and the
        # merger's git workspace.
        tmp_root = self.useFixture(fixtures.TempDir(
            rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        # Route git http requests to FakeURLOpener; genuine
        # urllib2.Request objects still go through the real opener.
        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        # Replace the gerrit library and smtplib before the triggers
        # and reporters are instantiated below.
        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config, self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        # Convenience aliases used throughout the tests.
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        # Cleanups run LIFO: shutdown first, then the final-state check.
        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
942
943 def setup_config(self):
944 """Per test config object. Override to set different config."""
945 self.config = ConfigParser.ConfigParser()
946 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
947
948 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -0700949 # Make sure that git.Repo objects have been garbage collected.
950 repos = []
951 gc.collect()
952 for obj in gc.get_objects():
953 if isinstance(obj, git.Repo):
954 repos.append(obj)
955 self.assertEqual(len(repos), 0)
956 self.assertEmptyQueues()
957
    def shutdown(self):
        """Stop every component started in setUp, in dependency order."""
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        # Only the main thread should remain; anything else indicates a
        # component that failed to stop cleanly.
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -0700979
    def init_repo(self, project):
        """Create the fake upstream repo for *project*.

        Produces a 'master' branch with a README, an 'init' tag (used
        as the base for change refs), and an 'mp' branch with one extra
        commit; leaves 'master' checked out and clean.
        """
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        mp = repo.create_head('mp')
        repo.head.reference = mp
        f = open(fn, 'a')
        f.write("test mp\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('mp commit')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
1012
1013 def ref_has_change(self, ref, change):
1014 path = os.path.join(self.git_root, change.project)
1015 repo = git.Repo(path)
1016 for commit in repo.iter_commits(ref):
1017 if commit.message.strip() == ('%s-1' % change.subject):
1018 return True
1019 return False
1020
    def job_has_changes(self, *args):
        """Return True if the job's ZUUL_REF contains all given changes.

        args is (job, change1, change2, ...).  Checks that each
        change's first-patchset commit message appears on the ref and
        that the ref's tip commit matches ZUUL_COMMIT.
        """
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True
1048
1049 def registerJobs(self):
1050 count = 0
1051 for job in self.sched.layout.jobs.keys():
1052 self.worker.registerFunction('build:' + job)
1053 count += 1
1054 self.worker.registerFunction('stop:' + self.worker.worker_id)
1055 count += 1
1056
1057 while len(self.gearman_server.functions) < count:
1058 time.sleep(0)
1059
1060 def release(self, job):
1061 if isinstance(job, FakeBuild):
1062 job.release()
1063 else:
1064 job.waiting = False
1065 self.log.debug("Queued job %s released" % job.unique)
1066 self.gearman_server.wakeConnections()
1067
1068 def getParameter(self, job, name):
1069 if isinstance(job, FakeBuild):
1070 return job.parameters[name]
1071 else:
1072 parameters = json.loads(job.arguments)
1073 return parameters[name]
1074
1075 def resetGearmanServer(self):
1076 self.worker.setFunctions([])
1077 while True:
1078 done = True
1079 for connection in self.gearman_server.active_connections:
1080 if (connection.functions and
1081 connection.client_id not in ['Zuul RPC Listener',
1082 'Zuul Merger']):
1083 done = False
1084 if done:
1085 break
1086 time.sleep(0)
1087 self.gearman_server.functions = set()
1088 self.rpc.register()
1089 self.merge_server.register()
1090
1091 def haveAllBuildsReported(self):
1092 # See if Zuul is waiting on a meta job to complete
1093 if self.launcher.meta_jobs:
1094 return False
1095 # Find out if every build that the worker has completed has been
1096 # reported back to Zuul. If it hasn't then that means a Gearman
1097 # event is still in transit and the system is not stable.
1098 for build in self.worker.build_history:
1099 zbuild = self.launcher.builds.get(build.uuid)
1100 if not zbuild:
1101 # It has already been reported
1102 continue
1103 # It hasn't been reported yet.
1104 return False
1105 # Make sure that none of the worker connections are in GRAB_WAIT
1106 for connection in self.worker.active_connections:
1107 if connection.state == 'GRAB_WAIT':
1108 return False
1109 return True
1110
1111 def areAllBuildsWaiting(self):
1112 ret = True
1113
1114 builds = self.launcher.builds.values()
1115 for build in builds:
1116 client_job = None
1117 for conn in self.launcher.gearman.active_connections:
1118 for j in conn.related_jobs.values():
1119 if j.unique == build.uuid:
1120 client_job = j
1121 break
1122 if not client_job:
1123 self.log.debug("%s is not known to the gearman client" %
1124 build)
1125 ret = False
1126 continue
1127 if not client_job.handle:
1128 self.log.debug("%s has no handle" % client_job)
1129 ret = False
1130 continue
1131 server_job = self.gearman_server.jobs.get(client_job.handle)
1132 if not server_job:
1133 self.log.debug("%s is not known to the gearman server" %
1134 client_job)
1135 ret = False
1136 continue
1137 if not hasattr(server_job, 'waiting'):
1138 self.log.debug("%s is being enqueued" % server_job)
1139 ret = False
1140 continue
1141 if server_job.waiting:
1142 continue
1143 worker_job = self.worker.gearman_jobs.get(server_job.unique)
1144 if worker_job:
1145 if worker_job.build.isWaiting():
1146 continue
1147 else:
1148 self.log.debug("%s is running" % worker_job)
1149 ret = False
1150 else:
1151 self.log.debug("%s is unassigned" % server_job)
1152 ret = False
1153 return ret
1154
    def waitUntilSettled(self):
        """Block until the scheduler is quiescent.

        Spins until all event queues are empty, every completed build
        has been reported back to Zuul, and every remaining build is
        parked waiting.  Raises Exception after ten seconds if the
        system never settles.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                # Dump queue state to aid debugging before failing.
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                # Hold the scheduler's run handler lock so its state
                # cannot change while we inspect it.
                self.sched.run_handler_lock.acquire()
                if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                # Not settled yet; release in acquisition-reverse order
                # and wait for the scheduler to do more work.
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)
1189
1190 def countJobResults(self, jobs, result):
1191 jobs = filter(lambda x: x.result == result, jobs)
1192 return len(jobs)
1193
1194 def getJobFromHistory(self, name):
1195 history = self.worker.build_history
1196 for job in history:
1197 if job.name == name:
1198 return job
1199 raise Exception("Unable to find job %s in history" % name)
1200
    def assertEmptyQueues(self):
        """Assert that every pipeline queue in the scheduler is empty.

        Called at the end of tests to catch orphaned changes or jobs
        left behind in any pipeline.
        """
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    # Print the offending contents first; the assertion
                    # message below does not include them.
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001210
1211 def assertReportedStat(self, key, value=None, kind=None):
1212 start = time.time()
1213 while time.time() < (start + 5):
1214 for stat in self.statsd.stats:
1215 pprint.pprint(self.statsd.stats)
1216 k, v = stat.split(':')
1217 if key == k:
1218 if value is None and kind is None:
1219 return
1220 elif value:
1221 if value == v:
1222 return
1223 elif kind:
1224 if v.endswith('|' + kind):
1225 return
1226 time.sleep(0.1)
1227
1228 pprint.pprint(self.statsd.stats)
1229 raise Exception("Key %s not found in reported stats" % key)