blob: 46c708787995e7dc33f0f7b2b98586b410250260 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
43
44import zuul.scheduler
45import zuul.webapp
46import zuul.rpclistener
47import zuul.launcher.gearman
48import zuul.lib.swift
49import zuul.merger.server
50import zuul.merger.client
51import zuul.reporter.gerrit
52import zuul.reporter.smtp
53import zuul.trigger.gerrit
54import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070055import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070056
# Directory holding test fixture files (zuul.conf, layout YAML, etc.).
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
# When True, each test runs inside a private temporary directory;
# when False, ZUUL_TEST_ROOT is used directly (see ZuulTestCase.setUp).
USE_TEMPDIR = True

# Configure verbose logging for the whole test process; individual tests
# may additionally capture logs via the OS_LOG_CAPTURE fixture in setUp.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')
64
65
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) pair from the subprocess; raises an
    Exception if git exits non-zero.
    """
    proc = subprocess.Popen(
        ['git', '--git-dir=%s/.git' % path, 'repack', '-afd'],
        close_fds=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    streams = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return streams
75
76
def random_sha1():
    """Return the SHA1 hex digest of a random float's string form.

    The string is explicitly encoded to bytes before hashing so this
    works under both Python 2 and Python 3 (on Python 3, hashlib
    rejects text input with a TypeError).
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
79
80
class ChangeReference(git.Reference):
    """Git reference type for fake Gerrit patchset refs.

    Mirrors Gerrit's ref layout by rooting references under
    refs/changes/* in the fake upstream repositories.
    """
    _common_path_default = "refs/changes"
    # Change refs always point at commit objects, never other refs.
    _points_to_commits_only = True
84
85
class FakeChange(object):
    """In-memory stand-in for a Gerrit change.

    Tracks patchsets, approvals, dependencies and review state, and
    mirrors each patchset as a real commit in the fake upstream git
    repository so the merger can fetch it.  The ``data`` dict mimics
    the JSON structure returned by Gerrit's query interface.
    """

    # category id -> (label name, minimum (veto) value, maximum value)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        """Create the change and immediately add its first patchset.

        :param gerrit: the owning FakeGerrit instance.
        :param number: integer change number.
        :param project: project name (e.g. 'org/project').
        :param branch: target branch name.
        :param subject: commit subject; patchset commits are named
            '<subject>-<patchset-number>'.
        :param status: Gerrit status string; 'NEW' means open.
        :param upstream_root: directory containing the fake upstream
            git repositories.
        """
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        # Shape mirrors Gerrit's JSON query output for a change.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit this change's content to the fake upstream repo.

        Creates a change ref ('refs/changes/1/<number>/<patchset>')
        based on the 'init' tag, commits either a single small file
        *fn* or (when *large* is true) 100 files of 4096 random
        printable characters, then restores the repo to master.

        :returns: the created git commit object.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            # Context manager guarantees the handle is closed before
            # the file is added to the index.
            with open(fn, 'w') as f:
                f.write("test %s %s %s\n" %
                        (self.branch, self.number, self.latest_patchset))
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                with open(fn, 'w') as f:
                    for ci in range(4096):
                        f.write(random.choice(string.printable))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Add a new patchset to the change.

        :param files: optional list of file names; the first is the
            file committed to the repo and all are listed in the
            patchset metadata.  Defaults to None (avoids the mutable
            default-argument pitfall) which behaves like an empty list.
        :param large: when True, commit 100 files of random content
            instead of one small file.
        """
        files = files or []
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a Gerrit 'patchset-created' event dict for *patchset*
        (1-based index)."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a Gerrit 'change-restored' event dict for the latest
        patchset."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a Gerrit 'change-abandoned' event dict for the latest
        patchset."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a Gerrit 'comment-added' event dict (with a neutral
        Code-Review 0) for *patchset* (1-based index)."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Add (or replace) an approval on the latest patchset.

        Any prior approval by the same user in the same category is
        removed first.  Submit records are refreshed.

        :returns: the corresponding 'comment-added' event dict
            (deep-copied via a JSON round trip).
        """
        if not granted_on:
            granted_on = time.time()
        approval = {'description': self.categories[category][0],
                    'type': category,
                    'value': str(value),
                    'by': {
                        'username': username,
                        'email': username + '@example.com',
                    },
                    'grantedOn': int(granted_on)}
        # Iterate over a copy so the deletion does not disturb the scan.
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from current approvals.

        For each category the effective value is the strongest vote,
        with a minimum (veto) vote always winning.  Returns
        [{'status': 'OK'}] when every label is at its maximum,
        otherwise a NOT_READY record listing each label's state.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                # A veto vote sticks regardless of other votes.
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Record a Depends-On relation to *other* at its given
        patchset (1-based), updating both changes' metadata."""
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of this change's query data, refreshing
        the dependency's isCurrentPatchSet flag."""
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and advance the upstream branch.

        Silently refuses when an unmerged dependency exists or when
        fail_merge is set (simulating a merge failure).
        """
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one report (review message) posted to this change."""
        self.reported += 1
365
366
class FakeGerrit(object):
    """Minimal fake of the Gerrit service.

    Holds FakeChange objects keyed by change number and delivers
    events to consumers through an internal queue.  Queries issued via
    simpleQuery are recorded for later test assertions.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        """Queue an event for delivery via getEvent()."""
        return self.event_queue.put(data)

    def getEvent(self):
        """Block until the next event is available and return it."""
        return self.event_queue.get()

    def eventDone(self):
        """Mark the most recently fetched event as processed."""
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review on '<number>,<patchset>'; merge and/or mark
        reported according to *action* and *message*."""
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return query data for a single change, or {} if unknown."""
        change = self.changes.get(int(number))
        return change.query() if change else {}

    def simpleQuery(self, query):
        """Return query data for every change plus a trailing stats row.

        Currently only used to list all open changes for a project;
        the query string is recorded in self.queries.
        """
        self.queries.append(query)
        results = [change.query() for change in self.changes.values()]
        results.append({"type": "stats", "rowCount": 1,
                        "runTimeMilliseconds": 3})
        return results

    def startWatching(self, *args, **kw):
        """No-op: the fake has no event stream to watch."""
        pass
417
418
class BuildHistory(object):
    """Record of one completed fake build.

    Every keyword argument becomes an attribute (result, name, number,
    changes, uuid, ...).
    """

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>"
                % (self.result, self.name, self.number, self.changes))
426
427
class FakeURLOpener(object):
    """Fake urllib2 opener serving a git smart-HTTP ref advertisement.

    Answers as if it were a git-upload-pack endpoint for a project in
    the fake upstream root, using pkt-line framing.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        """Return the pkt-line encoded ref advertisement."""
        parsed = urlparse.urlparse(self.url)
        # URL path looks like /p/<project>/info/refs; strip the prefix
        # and the trailing two components to recover the project name.
        project = '/'.join(parsed.path.split('/')[2:-2])
        chunks = ['001e# service=git-upload-pack\n']
        chunks.append(
            '000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
            'multi_ack thin-pack side-band side-band-64k ofs-delta '
            'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # Each pkt-line is prefixed with its total length in hex.
            chunks.append('%04x%s' % (len(line) + 4, line))
        chunks.append('0000')
        return ''.join(chunks)
449
450
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger whose git URLs point at the local fake repos."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        # upstream_root: directory holding the fake upstream git repos;
        # remaining args are passed through to the real Gerrit trigger.
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Serve fetches directly from the on-disk fake repository
        # instead of a real Gerrit URL.
        return os.path.join(self.upstream_root, project.name)
460
461
class FakeStatsd(threading.Thread):
    """UDP listener that records raw statsd packets for assertions.

    Binds an ephemeral UDP port (exposed as ``self.port``); each
    received datagram is appended to ``self.stats``.  ``stop()`` wakes
    the poll loop through a self-pipe so the thread exits promptly.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        # Ephemeral port chosen by the OS.
        self.port = self.sock.getsockname()[1]
        # Self-pipe used to interrupt poll() on shutdown.
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Poll the socket and the wake pipe until stopped."""
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        # Write bytes, not text: os.write() rejects str on Python 3,
        # and b'1\n' is equally valid on Python 2.
        os.write(self.wake_write, b'1\n')
489
490
class FakeBuild(threading.Thread):
    """One fake job execution driven by a gearman job.

    Runs in its own thread; can be held (paused) by the worker until a
    test releases it, and reports status/results back over the gearman
    job object.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        # worker: the FakeWorker that accepted the job.
        # job: the gear job; its name is 'build:<name>[:node]' and its
        #   arguments are the JSON-encoded Zuul parameters.
        # number: build number assigned by the worker.
        # node: node label parsed from the job name (may be None).
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        # Condition variable implementing the hold/release handshake.
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Allow a held build to proceed past _wait()."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return True if the build is currently held in _wait()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        # Block until release() is called; used when the worker has
        # hold_jobs_in_build set.
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        """Simulate the build: report worker data, optionally hold,
        compute the result, record history and complete the job."""
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize result computation and bookkeeping across builds.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate an infrastructure failure: the job fails at the
            # gearman level rather than returning a result.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
596
597
class FakeWorker(gear.Worker):
    """Fake gearman worker that executes FakeBuilds for test jobs.

    Dispatches 'build:', 'stop:' and 'set_description:' jobs from a
    background thread started at construction time.
    """

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        # job name -> list of changes whose builds should fail.
        self.fail_tests = {}
        self.test = test

        # When True, builds block in FakeBuild._wait() until released.
        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch a gearman job named '<cmd>:<name>[:<node>]'."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a 'build:' job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort and release the running build named in the job
        arguments; fail the job if no such build is running."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build;
        fail the job if the build cannot be found."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Main loop: fetch and dispatch jobs until shutdown."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Catch Exception (not a bare except) so SystemExit and
                # KeyboardInterrupt still propagate; log and keep going.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Arrange for job *name* to fail when run against *change*."""
        fail_changes = self.fail_tests.get(name, [])
        fail_changes.append(change)
        self.fail_tests[name] = fail_changes

    def shouldFailTest(self, name, ref):
        """Return True if job *name* was registered to fail for any
        change contained in *ref*."""
        fail_changes = self.fail_tests.get(name, [])
        for change in fail_changes:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds; only those whose name matches *regex*
        when one is given."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
706
707
class FakeGearmanServer(gear.Server):
    """Gearman server with the ability to hold queued build jobs.

    When hold_jobs_in_queue is set, 'build:*' jobs stay queued (marked
    'waiting') until release() is called by a test.
    """
    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0: let the OS pick an ephemeral port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        # Scan queues in priority order, skipping jobs held by
        # hold_jobs_in_queue, and hand the first runnable job that the
        # connection has registered a function for.
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First time we see this job: decide whether it is
                    # held (only build jobs are ever held).
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                        job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Un-hold queued build jobs (matching *regex* if given) and
        wake worker connections so they get picked up."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
754
755
class FakeSMTP(object):
    """Fake smtplib.SMTP that records messages instead of sending."""
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        # Shared list the test inspects; every sendmail appends here.
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record the message (split into headers and body at the
        first blank line); always report success."""
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        split_msg = msg.split('\n\n', 1)

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=split_msg[0],
            body=split_msg[1],
        ))

        return True

    def quit(self):
        """No-op; the fake holds no real connection."""
        return True
783
784
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection that skips the network entirely."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
795
796
797class ZuulTestCase(testtools.TestCase):
798 log = logging.getLogger("zuul.test")
799
    def setUp(self):
        """Build a fully wired fake Zuul environment for one test.

        Creates temp/upstream/git directories and fake project repos,
        starts a fake statsd listener and gearman server, monkey-patches
        urllib2, smtplib, swiftclient and the gerrit library with the
        fakes above, then starts the scheduler, merger, launcher, webapp
        and RPC listener.  Cleanup (shutdown and final-state assertions)
        is registered via addCleanup.  The ordering of these steps is
        significant: patches must be installed before the components
        that use them are constructed.
        """
        super(ZuulTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        # Optional stdout/stderr/log capture, controlled by environment
        # variables (as used by OpenStack's test runners).
        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            # Pass genuine urllib2.Request objects through to the real
            # opener; everything else gets the fake git-http opener.
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            # Prepend the shared message list so tests can inspect it.
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config, self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        # Convenience aliases used heavily by tests.
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
949
    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
954
955 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -0700956 # Make sure that git.Repo objects have been garbage collected.
957 repos = []
958 gc.collect()
959 for obj in gc.get_objects():
960 if isinstance(obj, git.Repo):
961 repos.append(obj)
962 self.assertEqual(len(repos), 0)
963 self.assertEmptyQueues()
964
    def shutdown(self):
        """Stop every component started by setUp and join its threads.

        The order mirrors the dependency chain (launcher and mergers
        before the scheduler, then auxiliary services); a warning is
        logged if any extra thread survives shutdown.
        """
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        # Only the main thread should remain at this point.
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -0700986
987 def init_repo(self, project):
988 parts = project.split('/')
989 path = os.path.join(self.upstream_root, *parts[:-1])
990 if not os.path.exists(path):
991 os.makedirs(path)
992 path = os.path.join(self.upstream_root, project)
993 repo = git.Repo.init(path)
994
995 repo.config_writer().set_value('user', 'email', 'user@example.com')
996 repo.config_writer().set_value('user', 'name', 'User Name')
997 repo.config_writer().write()
998
999 fn = os.path.join(path, 'README')
1000 f = open(fn, 'w')
1001 f.write("test\n")
1002 f.close()
1003 repo.index.add([fn])
1004 repo.index.commit('initial commit')
1005 master = repo.create_head('master')
1006 repo.create_tag('init')
1007
James E. Blair97d902e2014-08-21 13:25:56 -07001008 repo.head.reference = master
1009 repo.head.reset(index=True, working_tree=True)
1010 repo.git.clean('-x', '-f', '-d')
1011
1012 self.create_branch(project, 'mp')
1013
1014 def create_branch(self, project, branch):
1015 path = os.path.join(self.upstream_root, project)
1016 repo = git.Repo.init(path)
1017 fn = os.path.join(path, 'README')
1018
1019 branch_head = repo.create_head(branch)
1020 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001021 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001022 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001023 f.close()
1024 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001025 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001026
James E. Blair97d902e2014-08-21 13:25:56 -07001027 repo.head.reference = repo.heads['master']
Clark Boylanb640e052014-04-03 16:41:46 -07001028 repo.head.reset(index=True, working_tree=True)
1029 repo.git.clean('-x', '-f', '-d')
1030
1031 def ref_has_change(self, ref, change):
1032 path = os.path.join(self.git_root, change.project)
1033 repo = git.Repo(path)
1034 for commit in repo.iter_commits(ref):
1035 if commit.message.strip() == ('%s-1' % change.subject):
1036 return True
1037 return False
1038
1039 def job_has_changes(self, *args):
1040 job = args[0]
1041 commits = args[1:]
1042 if isinstance(job, FakeBuild):
1043 parameters = job.parameters
1044 else:
1045 parameters = json.loads(job.arguments)
1046 project = parameters['ZUUL_PROJECT']
1047 path = os.path.join(self.git_root, project)
1048 repo = git.Repo(path)
1049 ref = parameters['ZUUL_REF']
1050 sha = parameters['ZUUL_COMMIT']
1051 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1052 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1053 commit_messages = ['%s-1' % commit.subject for commit in commits]
1054 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1055 " repo_messages %s; sha %s" % (job, commit_messages,
1056 repo_messages, sha))
1057 for msg in commit_messages:
1058 if msg not in repo_messages:
1059 self.log.debug(" messages do not match")
1060 return False
1061 if repo_shas[0] != sha:
1062 self.log.debug(" sha does not match")
1063 return False
1064 self.log.debug(" OK")
1065 return True
1066
1067 def registerJobs(self):
1068 count = 0
1069 for job in self.sched.layout.jobs.keys():
1070 self.worker.registerFunction('build:' + job)
1071 count += 1
1072 self.worker.registerFunction('stop:' + self.worker.worker_id)
1073 count += 1
1074
1075 while len(self.gearman_server.functions) < count:
1076 time.sleep(0)
1077
1078 def release(self, job):
1079 if isinstance(job, FakeBuild):
1080 job.release()
1081 else:
1082 job.waiting = False
1083 self.log.debug("Queued job %s released" % job.unique)
1084 self.gearman_server.wakeConnections()
1085
1086 def getParameter(self, job, name):
1087 if isinstance(job, FakeBuild):
1088 return job.parameters[name]
1089 else:
1090 parameters = json.loads(job.arguments)
1091 return parameters[name]
1092
1093 def resetGearmanServer(self):
1094 self.worker.setFunctions([])
1095 while True:
1096 done = True
1097 for connection in self.gearman_server.active_connections:
1098 if (connection.functions and
1099 connection.client_id not in ['Zuul RPC Listener',
1100 'Zuul Merger']):
1101 done = False
1102 if done:
1103 break
1104 time.sleep(0)
1105 self.gearman_server.functions = set()
1106 self.rpc.register()
1107 self.merge_server.register()
1108
1109 def haveAllBuildsReported(self):
1110 # See if Zuul is waiting on a meta job to complete
1111 if self.launcher.meta_jobs:
1112 return False
1113 # Find out if every build that the worker has completed has been
1114 # reported back to Zuul. If it hasn't then that means a Gearman
1115 # event is still in transit and the system is not stable.
1116 for build in self.worker.build_history:
1117 zbuild = self.launcher.builds.get(build.uuid)
1118 if not zbuild:
1119 # It has already been reported
1120 continue
1121 # It hasn't been reported yet.
1122 return False
1123 # Make sure that none of the worker connections are in GRAB_WAIT
1124 for connection in self.worker.active_connections:
1125 if connection.state == 'GRAB_WAIT':
1126 return False
1127 return True
1128
1129 def areAllBuildsWaiting(self):
1130 ret = True
1131
1132 builds = self.launcher.builds.values()
1133 for build in builds:
1134 client_job = None
1135 for conn in self.launcher.gearman.active_connections:
1136 for j in conn.related_jobs.values():
1137 if j.unique == build.uuid:
1138 client_job = j
1139 break
1140 if not client_job:
1141 self.log.debug("%s is not known to the gearman client" %
1142 build)
1143 ret = False
1144 continue
1145 if not client_job.handle:
1146 self.log.debug("%s has no handle" % client_job)
1147 ret = False
1148 continue
1149 server_job = self.gearman_server.jobs.get(client_job.handle)
1150 if not server_job:
1151 self.log.debug("%s is not known to the gearman server" %
1152 client_job)
1153 ret = False
1154 continue
1155 if not hasattr(server_job, 'waiting'):
1156 self.log.debug("%s is being enqueued" % server_job)
1157 ret = False
1158 continue
1159 if server_job.waiting:
1160 continue
1161 worker_job = self.worker.gearman_jobs.get(server_job.unique)
1162 if worker_job:
1163 if worker_job.build.isWaiting():
1164 continue
1165 else:
1166 self.log.debug("%s is running" % worker_job)
1167 ret = False
1168 else:
1169 self.log.debug("%s is unassigned" % server_job)
1170 ret = False
1171 return ret
1172
1173 def waitUntilSettled(self):
1174 self.log.debug("Waiting until settled...")
1175 start = time.time()
1176 while True:
1177 if time.time() - start > 10:
1178 print 'queue status:',
1179 print self.sched.trigger_event_queue.empty(),
1180 print self.sched.result_event_queue.empty(),
1181 print self.fake_gerrit.event_queue.empty(),
1182 print self.areAllBuildsWaiting()
1183 raise Exception("Timeout waiting for Zuul to settle")
1184 # Make sure no new events show up while we're checking
1185 self.worker.lock.acquire()
1186 # have all build states propogated to zuul?
1187 if self.haveAllBuildsReported():
1188 # Join ensures that the queue is empty _and_ events have been
1189 # processed
1190 self.fake_gerrit.event_queue.join()
1191 self.sched.trigger_event_queue.join()
1192 self.sched.result_event_queue.join()
1193 self.sched.run_handler_lock.acquire()
1194 if (self.sched.trigger_event_queue.empty() and
1195 self.sched.result_event_queue.empty() and
1196 self.fake_gerrit.event_queue.empty() and
1197 not self.merge_client.build_sets and
1198 self.haveAllBuildsReported() and
1199 self.areAllBuildsWaiting()):
1200 self.sched.run_handler_lock.release()
1201 self.worker.lock.release()
1202 self.log.debug("...settled.")
1203 return
1204 self.sched.run_handler_lock.release()
1205 self.worker.lock.release()
1206 self.sched.wake_event.wait(0.1)
1207
1208 def countJobResults(self, jobs, result):
1209 jobs = filter(lambda x: x.result == result, jobs)
1210 return len(jobs)
1211
1212 def getJobFromHistory(self, name):
1213 history = self.worker.build_history
1214 for job in history:
1215 if job.name == name:
1216 return job
1217 raise Exception("Unable to find job %s in history" % name)
1218
1219 def assertEmptyQueues(self):
1220 # Make sure there are no orphaned jobs
1221 for pipeline in self.sched.layout.pipelines.values():
1222 for queue in pipeline.queues:
1223 if len(queue.queue) != 0:
1224 print 'pipeline %s queue %s contents %s' % (
1225 pipeline.name, queue.name, queue.queue)
Antoine Mussobd86a312014-01-08 14:51:33 +01001226 self.assertEqual(len(queue.queue), 0,
1227 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001228
1229 def assertReportedStat(self, key, value=None, kind=None):
1230 start = time.time()
1231 while time.time() < (start + 5):
1232 for stat in self.statsd.stats:
1233 pprint.pprint(self.statsd.stats)
1234 k, v = stat.split(':')
1235 if key == k:
1236 if value is None and kind is None:
1237 return
1238 elif value:
1239 if value == v:
1240 return
1241 elif kind:
1242 if v.endswith('|' + kind):
1243 return
1244 time.sleep(0.1)
1245
1246 pprint.pprint(self.statsd.stats)
1247 raise Exception("Key %s not found in reported stats" % key)