blob: 4ba88fb23fc324c2465adb1a22d25f31f6c01cac [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
43
44import zuul.scheduler
45import zuul.webapp
46import zuul.rpclistener
47import zuul.launcher.gearman
48import zuul.lib.swift
49import zuul.merger.server
50import zuul.merger.client
51import zuul.reporter.gerrit
52import zuul.reporter.smtp
53import zuul.trigger.gerrit
54import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070055import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070056
# Directory containing test fixtures (zuul.conf, layout files, etc.).
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
# When True, ZuulTestCase.setUp runs each test inside a TempDir fixture;
# when False it uses ZUUL_TEST_ROOT directly.
USE_TEMPDIR = True

# Configure verbose logging for all tests in this module.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                           '%(levelname)-8s %(message)s')
64
65
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) pair captured from the git process and
    raises Exception if git exits non-zero.
    """
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    proc = subprocess.Popen(cmd, close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    captured = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return captured
75
76
def random_sha1():
    """Return a random 40-character hex SHA1 string.

    The input to hashlib must be bytes on Python 3; encoding the ASCII
    repr of the random float is a no-op on Python 2 (where bytes is str)
    and makes this work on both interpreters.
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
79
80
class ChangeReference(git.Reference):
    """Git reference type used to store fake Gerrit change refs.

    Refs created through this class live under refs/changes, mirroring
    Gerrit's refs/changes/<nn>/<change>/<patchset> layout.
    """
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
84
85
class FakeChange(object):
    """A fake Gerrit change backed by a real commit in a fixture repo.

    Maintains the JSON-shaped ``data`` dict that Gerrit's query protocol
    would return, and fabricates the stream events (patchset-created,
    comment-added, etc.) the tests feed into the scheduler.
    """

    # label name -> (display name, min value, max value)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        # Mirrors the structure of a Gerrit query result for this change.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        # Every change starts with one patchset committed to the repo.
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit a new patchset to the upstream fixture repo.

        Creates a refs/changes/1/<number>/<patchset> ref starting from the
        'init' tag, commits either a single file *fn* or (when *large*)
        100 files of random content, then restores master as HEAD.
        Returns the created commit object.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            # 100 files x 4096 random printable chars each.
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Leave the repo checked out on master again.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        """Add a new patchset, committing it to the upstream repo.

        *files* lists filenames to record in the patchset metadata; the
        first one is the file actually written.
        """
        # NOTE(review): mutable default [] is shared across calls; safe
        # here because files is only read, never mutated.
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a fake 'patchset-created' stream event (1-based index)."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a fake 'change-restored' stream event."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a fake 'change-abandoned' stream event."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a fake 'comment-added' stream event (1-based index)."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Record a vote on the latest patchset and return the event.

        Replaces any existing vote by the same user in the same category,
        refreshes submitRecords, and returns a deep-copied (via JSON
        round-trip) 'comment-added' event dict.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {'description': self.categories[category][0],
                    'type': category,
                    'value': str(value),
                    'by': {
                        'username': username,
                        'email': username + '@example.com',
                    },
                    'grantedOn': int(granted_on)}
        # Iterate over a copy so deletion is safe while scanning.
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from current approvals.

        For each category the effective vote is the minimum (veto) if
        present, otherwise the vote with the largest magnitude.  Returns
        [{'status': 'OK'}] when all labels are at max, otherwise a
        NOT_READY record listing each label's state.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                # A minimum vote is a veto and always wins.
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Link this change as depending on *other* at *patchset* (1-based)."""
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        # Record the reverse relationship on the other change.
        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of this change's query data.

        Also marks whether the dependsOn entry points at the dependency's
        current patchset, and counts the query for test assertions.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and advance the branch in the repo.

        Silently refuses if an unmerged dependency exists or fail_merge
        is set, mimicking a failed Gerrit submit.
        """
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one report (review message) posted to this change."""
        self.reported += 1
365
366
class FakeGerrit(object):
    """In-memory stand-in for the Gerrit connection used by the trigger.

    Holds fabricated FakeChange objects, an event queue for stream
    events, and implements the small query/review surface the scheduler
    exercises.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        """Queue a fake stream event for the trigger to consume."""
        return self.event_queue.put(data)

    def getEvent(self):
        """Block until the next queued stream event is available."""
        return self.event_queue.get()

    def eventDone(self):
        """Acknowledge that the last event has been processed."""
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review; merge on 'submit' and count the report."""
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return the query data for change *number*, or {} if unknown."""
        change = self.changes.get(int(number))
        return change.query() if change else {}

    def simpleQuery(self, query):
        """Return query results for every change plus a stats trailer."""
        # This is currently only used to return all open changes for a
        # project.
        self.queries.append(query)
        results = [change.query() for change in self.changes.values()]
        results.append({"type": "stats",
                        "rowCount": 1,
                        "runTimeMilliseconds": 3})
        return results

    def startWatching(self, *args, **kw):
        """No-op: there is no real event stream to watch."""
        pass
417
418
class BuildHistory(object):
    """Record of a completed fake build.

    All keyword arguments passed to the constructor become attributes
    (result, name, number, changes, uuid, ...).
    """

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def __repr__(self):
        summary = (self.result, self.name, self.number, self.changes)
        return ("<Completed build, result: %s name: %s #%s changes: %s>"
                % summary)
426
427
class FakeURLOpener(object):
    """Fake urllib2.urlopen result serving a git upload-pack advert.

    read() fabricates a smart-HTTP ref advertisement for the fixture
    repository named in the URL so git clients can discover its refs.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        """Return the pkt-line formatted ref advertisement as a string."""
        parsed = urlparse.urlparse(self.url)
        # URL path looks like /p/<project...>/info/refs; keep the middle.
        project = '/'.join(parsed.path.split('/')[2:-2])
        chunks = ['001e# service=git-upload-pack\n']
        chunks.append(
            '000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
            'multi_ack thin-pack side-band side-band-64k ofs-delta '
            'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # Each pkt-line is prefixed with its total length in hex.
            chunks.append('%04x%s' % (len(line) + 4, line))
        chunks.append('0000')
        return ''.join(chunks)
449
450
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger wired to the local fixture repos instead of a server."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Point the merger at the on-disk fixture repo for this project.
        return os.path.join(self.upstream_root, project.name)
460
461
class FakeStatsd(threading.Thread):
    """UDP listener thread that records raw statsd packets.

    Binds an ephemeral UDP port (exposed as ``port``); each received
    datagram payload is appended to ``stats``.  A pipe is used to wake
    the poll loop so stop() can terminate the thread promptly.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Poll the socket and the wake pipe until stopped."""
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    # recvfrom returns (payload, address); keep payload.
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        """Wake the poll loop so the thread exits."""
        # os.write requires bytes on Python 3; b'1\n' is equally valid on
        # Python 2 (bytes is str there).  The previous '1\n' literal
        # raised TypeError under Python 3.
        os.write(self.wake_write, b'1\n')
489
490
class FakeBuild(threading.Thread):
    """Thread simulating one Jenkins-style build for a gearman job.

    Reports fake worker data back over the gearman job, optionally
    blocks until released (when the worker holds jobs), then records a
    BuildHistory entry and completes or fails the job.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Job names look like 'build:<jobname>[:node]'.
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Allow a held build to proceed."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return True if the build is currently held in _wait()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        """Block on the condition variable until release() is called."""
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        """Simulate the build: send status, compute result, finish job."""
        # Fake worker metadata reported back through gearman.
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize result bookkeeping across concurrent builds.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate an infrastructure failure: no result is reported.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
596
597
class FakeWorker(gear.Worker):
    """Fake gearman worker that runs FakeBuild threads for the tests.

    Pulls jobs from the fake gearman server in a daemon thread, starts a
    FakeBuild per 'build:' job, and supports stop/set_description jobs.
    Completed builds accumulate in ``build_history``.
    """

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        # When True, FakeBuild threads block until released.
        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch a gearman job named 'cmd:name[:node]'."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a 'build:' job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build identified in the job arguments."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Main loop: fetch and handle jobs until the worker stops."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Previously a bare 'except:', which would also swallow
                # SystemExit/KeyboardInterrupt; narrowed to Exception.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Arrange for job *name* to fail when run against *change*."""
        failures = self.fail_tests.get(name, [])
        failures.append(change)
        self.fail_tests[name] = failures

    def shouldFailTest(self, name, ref):
        """Return True if job *name* was registered to fail for *ref*."""
        failures = self.fail_tests.get(name, [])
        for change in failures:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (or all)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
706
707
class FakeGearmanServer(gear.Server):
    """Gearman server on an ephemeral port with job-hold support.

    When ``hold_jobs_in_queue`` is True, newly seen 'build:' jobs are
    marked waiting and not handed to workers until release() is called.
    """

    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0 lets the OS pick a free port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job this connection can handle.

        Skips jobs flagged 'waiting'; first sight of a 'build:' job
        inherits the current hold_jobs_in_queue setting.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        # Actually assign the job to this connection.
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Un-hold queued 'build:' jobs matching *regex* (or all).

        Wakes worker connections if anything was released.
        """
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
754
755
class FakeSMTP(object):
    """Drop-in replacement for smtplib.SMTP that records messages.

    Each sendmail() call appends a dict (from_email, to_email, msg,
    headers, body) to the shared *messages* list supplied at
    construction time.
    """
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record the message instead of sending it; always succeeds."""
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        # Split on the first blank line, RFC 822 style.
        parts = msg.split('\n\n', 1)
        headers = parts[0]
        body = parts[1]

        self.messages.append({
            'from_email': from_email,
            'to_email': to_email,
            'msg': msg,
            'headers': headers,
            'body': body,
        })

        return True

    def quit(self):
        """Pretend to close the connection."""
        return True
783
784
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection stub that avoids any network access."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
795
796
797class ZuulTestCase(testtools.TestCase):
798 log = logging.getLogger("zuul.test")
799
    def setUp(self):
        """Build a complete fake zuul deployment for one test.

        Creates temp dirs and fixture git repos, starts fake statsd,
        gearman, merger, scheduler, webapp and RPC services, and monkey
        patches gerrit/smtp/swift/urllib entry points with the fakes in
        this module.  Ordering here matters: each service is constructed
        before the components that depend on it.
        """
        super(ZuulTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        # Optional stdout/stderr/log capture, controlled by environment
        # variables as in other OpenStack test suites.
        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        # Route non-Request urlopen calls to the fake git ref server.
        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        # Prepend the shared message list to FakeSMTP's constructor args.
        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config, self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        # Convenience aliases used pervasively by the tests.
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
947
    def setup_config(self):
        """Per test config object. Override to set different config.

        Loads the default fixture zuul.conf into self.config.
        """
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
952
953 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -0700954 # Make sure that git.Repo objects have been garbage collected.
955 repos = []
956 gc.collect()
957 for obj in gc.get_objects():
958 if isinstance(obj, git.Repo):
959 repos.append(obj)
960 self.assertEqual(len(repos), 0)
961 self.assertEmptyQueues()
962
    def shutdown(self):
        """Stop and join every service started in setUp.

        Teardown order mirrors dependency order (clients before servers);
        a warning is logged if any thread besides the main one survives.
        """
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -0700984
    def init_repo(self, project):
        """Create an upstream fixture repo for *project*.

        Initializes the repo with a README on master, tags the initial
        commit as 'init' (the base for fake change refs), and creates an
        'mp' branch.
        """
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        # 'init' is the starting point for ChangeReference refs.
        repo.create_tag('init')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')
1011
    def create_branch(self, project, branch):
        """Create *branch* in *project*'s fixture repo with one commit.

        Appends a marker line to README, commits it on the new branch,
        then checks master back out.
        """
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        # Restore master as the checked-out branch.
        repo.head.reference = repo.heads['master']
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
1028
1029 def ref_has_change(self, ref, change):
1030 path = os.path.join(self.git_root, change.project)
1031 repo = git.Repo(path)
1032 for commit in repo.iter_commits(ref):
1033 if commit.message.strip() == ('%s-1' % change.subject):
1034 return True
1035 return False
1036
1037 def job_has_changes(self, *args):
1038 job = args[0]
1039 commits = args[1:]
1040 if isinstance(job, FakeBuild):
1041 parameters = job.parameters
1042 else:
1043 parameters = json.loads(job.arguments)
1044 project = parameters['ZUUL_PROJECT']
1045 path = os.path.join(self.git_root, project)
1046 repo = git.Repo(path)
1047 ref = parameters['ZUUL_REF']
1048 sha = parameters['ZUUL_COMMIT']
1049 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1050 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1051 commit_messages = ['%s-1' % commit.subject for commit in commits]
1052 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1053 " repo_messages %s; sha %s" % (job, commit_messages,
1054 repo_messages, sha))
1055 for msg in commit_messages:
1056 if msg not in repo_messages:
1057 self.log.debug(" messages do not match")
1058 return False
1059 if repo_shas[0] != sha:
1060 self.log.debug(" sha does not match")
1061 return False
1062 self.log.debug(" OK")
1063 return True
1064
1065 def registerJobs(self):
1066 count = 0
1067 for job in self.sched.layout.jobs.keys():
1068 self.worker.registerFunction('build:' + job)
1069 count += 1
1070 self.worker.registerFunction('stop:' + self.worker.worker_id)
1071 count += 1
1072
1073 while len(self.gearman_server.functions) < count:
1074 time.sleep(0)
1075
1076 def release(self, job):
1077 if isinstance(job, FakeBuild):
1078 job.release()
1079 else:
1080 job.waiting = False
1081 self.log.debug("Queued job %s released" % job.unique)
1082 self.gearman_server.wakeConnections()
1083
1084 def getParameter(self, job, name):
1085 if isinstance(job, FakeBuild):
1086 return job.parameters[name]
1087 else:
1088 parameters = json.loads(job.arguments)
1089 return parameters[name]
1090
    def resetGearmanServer(self):
        """Deregister the worker's functions and reset server state.

        Clears the worker's function set, then waits for the
        deregistration to propagate to every active server connection
        (the RPC listener and merger keep their registrations), and
        finally re-registers the RPC and merge-server functions.
        """
        self.worker.setFunctions([])
        # Poll until no build connection advertises any functions.
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            # Yield the CPU between polls.
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()
1106
1107 def haveAllBuildsReported(self):
1108 # See if Zuul is waiting on a meta job to complete
1109 if self.launcher.meta_jobs:
1110 return False
1111 # Find out if every build that the worker has completed has been
1112 # reported back to Zuul. If it hasn't then that means a Gearman
1113 # event is still in transit and the system is not stable.
1114 for build in self.worker.build_history:
1115 zbuild = self.launcher.builds.get(build.uuid)
1116 if not zbuild:
1117 # It has already been reported
1118 continue
1119 # It hasn't been reported yet.
1120 return False
1121 # Make sure that none of the worker connections are in GRAB_WAIT
1122 for connection in self.worker.active_connections:
1123 if connection.state == 'GRAB_WAIT':
1124 return False
1125 return True
1126
    def areAllBuildsWaiting(self):
        """Return True when every launched build is parked and waiting.

        Cross-checks each build known to the launcher against the gearman
        client, the gearman server, and the worker; any build that is
        unknown, in transit, or actually running makes this False.  All
        builds are examined (no early return) so each problem is logged.
        """
        ret = True

        builds = self.launcher.builds.values()
        for build in builds:
            # Find the client-side gearman job matching this build.
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                ret = False
                continue
            if not client_job.handle:
                # The job has been submitted but no handle assigned yet.
                self.log.debug("%s has no handle" % client_job)
                ret = False
                continue
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                ret = False
                continue
            if not hasattr(server_job, 'waiting'):
                # The 'waiting' attribute is set once enqueueing finishes.
                self.log.debug("%s is being enqueued" % server_job)
                ret = False
                continue
            if server_job.waiting:
                # Held on the server queue: this build is settled.
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    ret = False
            else:
                self.log.debug("%s is unassigned" % server_job)
                ret = False
        return ret
1170
    def waitUntilSettled(self):
        """Block until the whole system is quiescent (or 10s timeout).

        Settled means: all event queues empty and processed, no merger
        work outstanding, every build reported, and every build waiting.
        Raises Exception on timeout after printing queue state for
        debugging.  The worker lock and the scheduler's run_handler_lock
        are held while checking so no new events can slip in between the
        individual checks; the acquire/release ordering here is
        deliberate and must not be rearranged.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                # Re-check everything under both locks before declaring
                # the system settled.
                if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)
1205
1206 def countJobResults(self, jobs, result):
1207 jobs = filter(lambda x: x.result == result, jobs)
1208 return len(jobs)
1209
1210 def getJobFromHistory(self, name):
1211 history = self.worker.build_history
1212 for job in history:
1213 if job.name == name:
1214 return job
1215 raise Exception("Unable to find job %s in history" % name)
1216
    def assertEmptyQueues(self):
        """Assert that no pipeline queue still holds items.

        A non-empty queue after a test means an orphaned change/job;
        the queue contents are printed before failing to aid debugging.
        """
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001226
    def assertReportedStat(self, key, value=None, kind=None):
        """Assert that statsd received a stat matching *key*.

        Polls the fake statsd for up to 5 seconds.  Stats are
        'key:value' strings; *value* must match exactly when given,
        otherwise *kind* (the '|'-suffixed statsd type, e.g. 'c' or
        'ms') is checked.  Note *value* takes precedence over *kind*
        when both are supplied.  Raises Exception if no match appears.
        """
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                # Debug output on every pass; intentionally noisy.
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        # Any stat with this key is enough.
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)