blob: 6045bed9d4ab776a9505dbaa0f8c89e5fb5e6125 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
50import zuul.merger.server
51import zuul.merger.client
52import zuul.reporter.gerrit
53import zuul.reporter.smtp
54import zuul.trigger.gerrit
55import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070056import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070057
# Directory containing the test fixture files (zuul.conf, layouts, etc.).
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
# When True, each test runs in a private temporary directory; otherwise
# ZUUL_TEST_ROOT is used directly as the working root.
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                           '%(levelname)-8s %(message)s')
65
66
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Raises Exception if git exits non-zero; returns the (stdout, stderr)
    pair from the subprocess.
    """
    git_dir = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(['git', git_dir, 'repack', '-afd'],
                            close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    streams = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return streams
76
77
def random_sha1():
    """Return the hex SHA1 digest of a random value.

    The random float's string form is encoded to bytes before hashing:
    hashlib on Python 3 rejects ``str`` input, and encoding an ASCII
    string is a no-op on Python 2, so this is safe on both.
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
80
81
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing attempt counter until *max_seconds* elapse.

    Intended for polling loops; raises Exception mentioning *purpose*
    when the deadline passes.
    """
    deadline = time.time() + max_seconds
    attempt = 0
    while time.time() < deadline:
        attempt += 1
        yield attempt
        # Give other threads a chance to run between attempts.
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)
90
91
class ChangeReference(git.Reference):
    """Git reference under refs/changes, mimicking Gerrit's change refs."""
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
95
96
class FakeChange(object):
    """An in-memory fake of a Gerrit change backed by a real git repo.

    The change's JSON-ish state lives in self.data (mirroring the
    structure Gerrit's query interface returns) and each patchset is a
    real commit created under refs/changes in the fake upstream repo.
    """
    # label name -> (description, min score, max score)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        # Shape mirrors Gerrit's JSON query output for a change.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        # Every change starts with one patchset.
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Create a commit for this change under refs/changes.

        Writes one small file (or, if large, 100 files of 4096 random
        printable characters each), commits on a change ref based on
        the 'init' tag, then restores the repo to master.  Returns the
        new commit object.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Leave the repo checked out on master so later operations see
        # a clean tree.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        """Add a new patchset touching *files* (or a default file).

        NOTE(review): the mutable default for 'files' is benign here --
        it is only read, never mutated.
        """
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a Gerrit 'patchset-created' event dict (1-based index)."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a Gerrit 'change-restored' event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a Gerrit 'change-abandoned' event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a Gerrit 'comment-added' event dict (1-based index)."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Record a vote on the latest patchset and return the event.

        An existing approval by the same user in the same category is
        replaced.  NOTE(review): 'if not granted_on' also treats an
        explicit 0 timestamp as missing.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {'description': self.categories[category][0],
                    'type': category,
                    'value': str(value),
                    'by': {
                        'username': username,
                        'email': username + '@example.com',
                    },
                    'grantedOn': int(granted_on)}
        # Iterate over a copy so the delete does not upset enumeration.
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        # Round-trip through JSON to return a detached deep copy.
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from current approvals.

        A minimum (blocking) vote wins outright; otherwise the vote with
        the largest absolute value wins per category.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on patchset N of *other* (1-based)."""
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        # Maintain the reverse link on the other change as well.
        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of the change data, as a Gerrit query would."""
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            # Mark whether the dependency points at its current patchset.
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and fast-forward the branch in the repo.

        Silently refuses if an unmerged dependency exists or fail_merge
        is set (to simulate merge failures).
        """
        if (self.depends_on_change and
            self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        # Count how many times a report (review message) was posted.
        self.reported += 1
376
377
class FakeGerrit(object):
    """In-memory fake of the Gerrit server interface used by the tests.

    Tracks FakeChange objects by number, feeds events to the scheduler
    through an internal queue, and records queries for assertions.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        return self.event_queue.put(data)

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review message; merge and/or mark reported as asked."""
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return one change's data, or an empty dict if unknown."""
        change = self.changes.get(int(number))
        if change is None:
            return {}
        return change.query()

    def simpleQuery(self, query):
        """Return every open change plus a trailing stats row.

        The query string is recorded so tests can assert on it.
        """
        self.queries.append(query)
        results = [change.query() for change in self.changes.values()]
        results.append({"type": "stats", "rowCount": 1,
                        "runTimeMilliseconds": 3})
        return results

    def startWatching(self, *args, **kw):
        pass
428
429
class BuildHistory(object):
    """Record of one completed FakeBuild; attributes come from kwargs."""

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def __repr__(self):
        details = (self.result, self.name, self.number, self.changes)
        return ("<Completed build, result: %s name: %s #%s changes: %s>"
                % details)
437
438
class FakeURLOpener(object):
    """Stand-in for a urllib2.urlopen result serving git ref advertisements.

    read() produces a git smart-HTTP upload-pack advertisement built
    from the refs of the corresponding repo under upstream_root.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        """Return the pkt-line encoded ref advertisement as a string."""
        parsed = urlparse.urlparse(self.url)
        # URL path looks like /p/<project.../...>/info/refs.
        project = '/'.join(parsed.path.split('/')[2:-2])
        out = '001e# service=git-upload-pack\n'
        out += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # Each pkt-line is prefixed with its total length in hex.
            out += '%04x%s' % (len(line) + 4, line)
        out += '0000'
        return out
460
461
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger that serves git URLs from the local upstream root."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        # Directory holding the fake upstream git repositories.
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Point the merger at the local on-disk repo instead of a
        # network URL.
        return os.path.join(self.upstream_root, project.name)
471
472
class FakeStatsd(threading.Thread):
    """Minimal UDP statsd server that captures packets into self.stats.

    A pipe is polled alongside the socket so stop() can wake the
    listening thread and make it exit promptly.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        # Kernel-assigned port; tests export it via STATSD_PORT.
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Collect datagrams until stop() writes to the wake pipe."""
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        """Wake the listener thread so it returns from run()."""
        # Must be a bytes literal: os.write() rejects str on Python 3
        # (b'1\n' is identical to '1\n' on Python 2).
        os.write(self.wake_write, b'1\n')
500
501
class FakeBuild(threading.Thread):
    """A fake job execution run by FakeWorker in its own thread.

    Reports synthetic worker metadata over gearman, optionally blocks
    until released (when the worker holds jobs in build), then records
    its outcome in the worker's build history.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Gearman job names look like 'build:<name>[:<node>]'.
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Allow a held build to continue running."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return whether the build is currently blocked in _wait()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        # Block on the condition variable until release() notifies.
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        """Simulate running the build and report results via gearman."""
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize result reporting and bookkeeping with the worker.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate a worker-side failure: no result data is sent.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
607
608
class FakeWorker(gear.Worker):
    """A fake gearman worker that executes jobs as FakeBuild threads.

    Supports holding builds (hold_jobs_in_build), forcing failures for
    specific changes (addFailTest), and releasing held builds by name
    regex (release).
    """

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        # job name -> list of changes whose builds should fail
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch a gearman job named 'cmd:name[:node]'."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a 'build:' job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort and release the running build identified in the job args."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Main loop of the worker thread: fetch and handle jobs."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Narrowed from a bare 'except:' so SystemExit and
                # KeyboardInterrupt are not silently swallowed.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Arrange for builds of job *name* on *change* to fail."""
        changes = self.fail_tests.get(name, [])
        changes.append(change)
        self.fail_tests[name] = changes

    def shouldFailTest(self, name, ref):
        """Return True if a build of *name* at *ref* was marked to fail."""
        changes = self.fail_tests.get(name, [])
        for change in changes:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (or all)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
717
718
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold 'build:' jobs in queue until released.

    When hold_jobs_in_queue is set, newly queued build jobs are flagged
    as waiting and are not handed out to workers until release().
    """

    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0 lets the kernel pick a free port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job for this worker connection.

        Jobs flagged as waiting are skipped.  Unless *peek* is set, the
        returned job is dequeued and marked running.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First time we see this job: decide whether to hold it.
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                        job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Stop holding queued build jobs whose name matches *regex* (or all)."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            # Wake workers so they pick up the newly runnable jobs.
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
765
766
class FakeSMTP(object):
    """Test double for smtplib.SMTP that records messages in a list."""
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record the message instead of sending it; always succeeds."""
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        # Split RFC-822 style headers from the body at the first blank line.
        parts = msg.split('\n\n', 1)

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=parts[0],
            body=parts[1],
        ))

        return True

    def quit(self):
        """Pretend to close the connection."""
        return True
794
795
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection that avoids all network access in tests."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
806
807
808class ZuulTestCase(testtools.TestCase):
809 log = logging.getLogger("zuul.test")
810
    def setUp(self):
        """Build a complete fake Zuul environment for one test.

        Creates fake upstream repos, a fake gearman server and worker,
        a fake Gerrit and statsd, monkey-patches network entry points,
        then starts and configures a real scheduler against them.
        """
        super(ZuulTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        # Optionally capture stdout/stderr/logging, controlled by the
        # usual OS_* environment variables.
        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        # NOTE(review): reload() is a builtin only on Python 2; this
        # would need importlib.reload on Python 3.
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        # Route non-Request urlopen calls to the fake git ref server;
        # real urllib2.Request objects still use the original urlopen.
        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        # Wire launcher, merger, triggers and reporters into the scheduler.
        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config, self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        # Convenience aliases used throughout the tests.
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
960
961 def setup_config(self):
962 """Per test config object. Override to set different config."""
963 self.config = ConfigParser.ConfigParser()
964 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
965
966 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -0700967 # Make sure that git.Repo objects have been garbage collected.
968 repos = []
969 gc.collect()
970 for obj in gc.get_objects():
971 if isinstance(obj, git.Repo):
972 repos.append(obj)
973 self.assertEqual(len(repos), 0)
974 self.assertEmptyQueues()
975
    def shutdown(self):
        """Stop every component started in setUp, in dependency order."""
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        # Only the main thread should remain; anything else is a leak.
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -0700997
    def init_repo(self, project):
        """Create a fake upstream repo for *project* with an initial commit.

        The initial commit is tagged 'init' (patchset refs branch from
        it), master is checked out clean, and an 'mp' branch is added.
        """
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        # 'init' marks the base commit that change refs are created from.
        repo.create_tag('init')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')
1024
1025 def create_branch(self, project, branch):
1026 path = os.path.join(self.upstream_root, project)
1027 repo = git.Repo.init(path)
1028 fn = os.path.join(path, 'README')
1029
1030 branch_head = repo.create_head(branch)
1031 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001032 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001033 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001034 f.close()
1035 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001036 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001037
James E. Blair97d902e2014-08-21 13:25:56 -07001038 repo.head.reference = repo.heads['master']
Clark Boylanb640e052014-04-03 16:41:46 -07001039 repo.head.reset(index=True, working_tree=True)
1040 repo.git.clean('-x', '-f', '-d')
1041
1042 def ref_has_change(self, ref, change):
1043 path = os.path.join(self.git_root, change.project)
1044 repo = git.Repo(path)
Mike Heald8225f522014-11-21 09:52:33 +00001045 try:
1046 for commit in repo.iter_commits(ref):
1047 if commit.message.strip() == ('%s-1' % change.subject):
1048 return True
1049 except GitCommandError:
1050 pass
Clark Boylanb640e052014-04-03 16:41:46 -07001051 return False
1052
1053 def job_has_changes(self, *args):
1054 job = args[0]
1055 commits = args[1:]
1056 if isinstance(job, FakeBuild):
1057 parameters = job.parameters
1058 else:
1059 parameters = json.loads(job.arguments)
1060 project = parameters['ZUUL_PROJECT']
1061 path = os.path.join(self.git_root, project)
1062 repo = git.Repo(path)
1063 ref = parameters['ZUUL_REF']
1064 sha = parameters['ZUUL_COMMIT']
1065 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1066 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1067 commit_messages = ['%s-1' % commit.subject for commit in commits]
1068 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1069 " repo_messages %s; sha %s" % (job, commit_messages,
1070 repo_messages, sha))
1071 for msg in commit_messages:
1072 if msg not in repo_messages:
1073 self.log.debug(" messages do not match")
1074 return False
1075 if repo_shas[0] != sha:
1076 self.log.debug(" sha does not match")
1077 return False
1078 self.log.debug(" OK")
1079 return True
1080
1081 def registerJobs(self):
1082 count = 0
1083 for job in self.sched.layout.jobs.keys():
1084 self.worker.registerFunction('build:' + job)
1085 count += 1
1086 self.worker.registerFunction('stop:' + self.worker.worker_id)
1087 count += 1
1088
1089 while len(self.gearman_server.functions) < count:
1090 time.sleep(0)
1091
1092 def release(self, job):
1093 if isinstance(job, FakeBuild):
1094 job.release()
1095 else:
1096 job.waiting = False
1097 self.log.debug("Queued job %s released" % job.unique)
1098 self.gearman_server.wakeConnections()
1099
1100 def getParameter(self, job, name):
1101 if isinstance(job, FakeBuild):
1102 return job.parameters[name]
1103 else:
1104 parameters = json.loads(job.arguments)
1105 return parameters[name]
1106
1107 def resetGearmanServer(self):
1108 self.worker.setFunctions([])
1109 while True:
1110 done = True
1111 for connection in self.gearman_server.active_connections:
1112 if (connection.functions and
1113 connection.client_id not in ['Zuul RPC Listener',
1114 'Zuul Merger']):
1115 done = False
1116 if done:
1117 break
1118 time.sleep(0)
1119 self.gearman_server.functions = set()
1120 self.rpc.register()
1121 self.merge_server.register()
1122
1123 def haveAllBuildsReported(self):
1124 # See if Zuul is waiting on a meta job to complete
1125 if self.launcher.meta_jobs:
1126 return False
1127 # Find out if every build that the worker has completed has been
1128 # reported back to Zuul. If it hasn't then that means a Gearman
1129 # event is still in transit and the system is not stable.
1130 for build in self.worker.build_history:
1131 zbuild = self.launcher.builds.get(build.uuid)
1132 if not zbuild:
1133 # It has already been reported
1134 continue
1135 # It hasn't been reported yet.
1136 return False
1137 # Make sure that none of the worker connections are in GRAB_WAIT
1138 for connection in self.worker.active_connections:
1139 if connection.state == 'GRAB_WAIT':
1140 return False
1141 return True
1142
1143 def areAllBuildsWaiting(self):
1144 ret = True
1145
1146 builds = self.launcher.builds.values()
1147 for build in builds:
1148 client_job = None
1149 for conn in self.launcher.gearman.active_connections:
1150 for j in conn.related_jobs.values():
1151 if j.unique == build.uuid:
1152 client_job = j
1153 break
1154 if not client_job:
1155 self.log.debug("%s is not known to the gearman client" %
1156 build)
1157 ret = False
1158 continue
1159 if not client_job.handle:
1160 self.log.debug("%s has no handle" % client_job)
1161 ret = False
1162 continue
1163 server_job = self.gearman_server.jobs.get(client_job.handle)
1164 if not server_job:
1165 self.log.debug("%s is not known to the gearman server" %
1166 client_job)
1167 ret = False
1168 continue
1169 if not hasattr(server_job, 'waiting'):
1170 self.log.debug("%s is being enqueued" % server_job)
1171 ret = False
1172 continue
1173 if server_job.waiting:
1174 continue
1175 worker_job = self.worker.gearman_jobs.get(server_job.unique)
1176 if worker_job:
1177 if worker_job.build.isWaiting():
1178 continue
1179 else:
1180 self.log.debug("%s is running" % worker_job)
1181 ret = False
1182 else:
1183 self.log.debug("%s is unassigned" % server_job)
1184 ret = False
1185 return ret
1186
    def waitUntilSettled(self):
        """Block until Zuul has no pending work left.

        Polls until all event queues are empty, all builds have been
        reported, and every outstanding build is parked waiting.
        Raises an exception if the system has not settled within 10
        seconds.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                # Dump queue state to aid debugging the timeout.
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # Have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                # Hold the scheduler's run-handler lock so its state
                # cannot change while we inspect it; released on every
                # exit path below.
                self.sched.run_handler_lock.acquire()
                if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            # Sleep briefly (or until the scheduler wakes) and retry.
            self.sched.wake_event.wait(0.1)
1221
1222 def countJobResults(self, jobs, result):
1223 jobs = filter(lambda x: x.result == result, jobs)
1224 return len(jobs)
1225
1226 def getJobFromHistory(self, name):
1227 history = self.worker.build_history
1228 for job in history:
1229 if job.name == name:
1230 return job
1231 raise Exception("Unable to find job %s in history" % name)
1232
1233 def assertEmptyQueues(self):
1234 # Make sure there are no orphaned jobs
1235 for pipeline in self.sched.layout.pipelines.values():
1236 for queue in pipeline.queues:
1237 if len(queue.queue) != 0:
1238 print 'pipeline %s queue %s contents %s' % (
1239 pipeline.name, queue.name, queue.queue)
Antoine Mussobd86a312014-01-08 14:51:33 +01001240 self.assertEqual(len(queue.queue), 0,
1241 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001242
1243 def assertReportedStat(self, key, value=None, kind=None):
1244 start = time.time()
1245 while time.time() < (start + 5):
1246 for stat in self.statsd.stats:
1247 pprint.pprint(self.statsd.stats)
1248 k, v = stat.split(':')
1249 if key == k:
1250 if value is None and kind is None:
1251 return
1252 elif value:
1253 if value == v:
1254 return
1255 elif kind:
1256 if v.endswith('|' + kind):
1257 return
1258 time.sleep(0.1)
1259
1260 pprint.pprint(self.statsd.stats)
1261 raise Exception("Key %s not found in reported stats" % key)