blob: 18d5f5a84f71e9081de8126f7860775ac566f169 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
50import zuul.merger.server
51import zuul.merger.client
52import zuul.reporter.gerrit
53import zuul.reporter.smtp
54import zuul.trigger.gerrit
55import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070056import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070057
58FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
59 'fixtures')
James E. Blair97d902e2014-08-21 13:25:56 -070060USE_TEMPDIR = True
Clark Boylanb640e052014-04-03 16:41:46 -070061
62logging.basicConfig(level=logging.DEBUG,
63 format='%(asctime)s %(name)-32s '
64 '%(levelname)-8s %(message)s')
65
66
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) tuple captured from the git process and
    raises an Exception if git exits with a non-zero status.
    """
    proc = subprocess.Popen(
        ['git', '--git-dir=%s/.git' % path, 'repack', '-afd'],
        close_fds=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    captured = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return captured
76
77
def random_sha1():
    """Return a random 40-character hex SHA1 digest string.

    The random float's string form is encoded to bytes before hashing so
    this works on both Python 2 and Python 3 (hashlib digests require
    bytes input on Python 3); the file is otherwise being kept py2/py3
    compatible via six.
    """
    return hashlib.sha1(str(random.random()).encode('ascii')).hexdigest()
80
81
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing counter until *max_seconds* have elapsed.

    Intended for polling loops: iterate over this generator and break
    when the awaited condition holds; if the deadline passes first an
    Exception naming *purpose* is raised.
    """
    deadline = time.time() + max_seconds
    iterations = 0
    while time.time() < deadline:
        iterations += 1
        yield iterations
        time.sleep(0)  # yield the CPU so other threads can make progress
    raise Exception("Timeout waiting for %s" % purpose)
90
91
class ChangeReference(git.Reference):
    """Git reference type modeling Gerrit's refs/changes/* namespace.

    Used to create change refs in the fake upstream repositories.
    """
    # All change refs live under refs/changes, as in a real Gerrit.
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
95
96
class FakeChange(object):
    """A fake Gerrit change backed by real commits in a fake upstream repo.

    Mimics the JSON structures Gerrit emits over its ssh query/event
    interface (patchsets, approvals, events) closely enough for the
    scheduler tests, while creating genuine git commits under
    ``upstream_root`` so the merger can operate on them.
    """

    # category id -> (label name, min score, max score)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        # This dict mirrors the JSON a real Gerrit query would return.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        # Every change starts with one patchset already uploaded.
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit this patchset's content to the fake upstream repo.

        Creates a refs/changes/1/<number>/<patchset> ref starting from
        the 'init' tag, commits either a single file *fn* or (if *large*)
        100 files of random content, and returns the commit object.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            # "large" changes get 100 files of 4k random text each so
            # that repacked repos have some real bulk.
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Leave the working tree back on a clean master checkout.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        """Upload a new patchset, committing it to the upstream repo.

        *files* lists extra filenames to record in the patchset metadata;
        the first one (if any) is the file actually written.
        NOTE(review): the mutable default is only read, never mutated,
        so it is safe here.
        """
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a Gerrit 'patchset-created' event dict for *patchset*."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a Gerrit 'change-restored' event dict."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a Gerrit 'change-abandoned' event dict."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a Gerrit 'comment-added' event dict for *patchset*."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Record an approval vote and return a 'comment-added' event.

        Replaces any earlier vote by the same user in the same category
        on the latest patchset, as Gerrit does.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        # Iterate over a copy so deletion from the live list is safe.
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        # Round-trip through JSON to hand back plain, detached data.
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from the current approvals.

        A minimum score anywhere is a veto (REJECT); otherwise the vote
        with the largest absolute value wins per category.  Returns
        [{'status': 'OK'}] only when every category is at its maximum.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                # A minimum vote is sticky (veto), regardless of others.
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on patchset *patchset* of *other*.

        Updates both sides: our 'dependsOn' and the other change's
        'neededBy' metadata.
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a detached copy of this change's query data.

        Also annotates the dependency (if any) with whether it points at
        the dependency's current patchset, and counts the query.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change MERGED and fast-forward its branch upstream.

        Silently refuses if an unmerged dependency exists or if
        ``fail_merge`` is set (used by tests to simulate merge failures).
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        # Count how many times a report (review message) was posted.
        self.reported += 1
379
class FakeGerrit(object):
    """In-process stand-in for the Gerrit ssh interface.

    Holds FakeChange objects, feeds events to the scheduler through a
    queue, and answers review/query calls the way zuul.lib.gerrit.Gerrit
    would.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        return self.event_queue.put(data)

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review message and merge the change if requested."""
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return query data for one change, or {} if unknown."""
        change = self.changes.get(int(number))
        return change.query() if change else {}

    def simpleQuery(self, query):
        """Return query data for every change matching *query*."""
        self.queries.append(query)
        if query.startswith('change:'):
            # Restrict the result to the single matching change-id.
            wanted = query[len('change:'):]
            results = [c.query() for c in self.changes.values()
                       if c.data['id'] == wanted]
        else:
            # Anything else is treated as a query for all changes.
            results = [c.query() for c in self.changes.values()]
        return results

    def startWatching(self, *args, **kw):
        pass
434
435
class BuildHistory(object):
    """Record of a completed FakeBuild.

    Every keyword argument passed to the constructor becomes an
    instance attribute, so tests can assert on result, name, number,
    changes, etc.
    """

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        template = "<Completed build, result: %s name: %s #%s changes: %s>"
        return template % (self.result, self.name, self.number,
                           self.changes)
443
444
class FakeURLOpener(object):
    """Fake urllib2 opener serving git smart-HTTP ref advertisements.

    read() produces a git-upload-pack advertisement (pkt-line format)
    listing the refs of the fake upstream repository named in the URL.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        url_path = urlparse.urlparse(self.url).path
        project = '/'.join(url_path.split('/')[2:-2])
        chunks = ['001e# service=git-upload-pack\n']
        chunks.append(
            '000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
            'multi_ack thin-pack side-band side-band-64k ofs-delta '
            'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # Each pkt-line starts with its total length as 4 hex digits.
            chunks.append('%04x%s' % (len(line) + 4, line))
        chunks.append('0000')
        return ''.join(chunks)
466
467
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger that hands out local-filesystem git URLs."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Point the merger at the fake upstream repo on local disk
        # instead of a network URL.
        return os.path.join(self.upstream_root, project.name)
477
478
class FakeStatsd(threading.Thread):
    """Thread that receives statsd UDP packets and records them.

    Received payloads are appended to ``self.stats`` so tests can assert
    on the metrics Zuul emitted.  ``stop()`` wakes the poll loop via a
    pipe, letting the thread exit.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Bind to an ephemeral port; tests read it from self.port.
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        # Pipe used solely to wake the poll loop on shutdown.
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    # NOTE(review): recvfrom returns a (data, addr)
                    # tuple, which is always truthy, so this check never
                    # fires; kept for behavioral parity.
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        # Write bytes (not str) so this also works on Python 3, where
        # os.write() rejects str arguments.
        os.write(self.wake_write, b'1\n')
506
507
class FakeBuild(threading.Thread):
    """Thread simulating a single Jenkins-style build for a gearman job.

    Can be held (``hold_jobs_in_build``) until released, can be aborted,
    and reports its result back over the gearman job when done.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Job names look like "build:<name>[:<node>]".
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Allow a held build to continue."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return whether the build is currently held in _wait()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        # Block until release() notifies the condition.
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        """Simulate the build: send work data, optionally hold, report."""
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize result reporting and bookkeeping across builds.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate a worker-side failure: no result in the data.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
613
614
class FakeWorker(gear.Worker):
    """Fake gearman worker running FakeBuild threads for build jobs.

    Handles build/stop/set_description jobs and records completed builds
    in ``build_history`` for the tests to inspect.
    """

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch a gearman job named "<cmd>:<name>[:<node>]"."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a build job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build named in the job arguments."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Main loop: fetch gearman jobs and handle them until stopped."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Catch Exception rather than a bare except so that
                # SystemExit/KeyboardInterrupt are not swallowed here.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Arrange for job *name* to fail when run against *change*."""
        failures = self.fail_tests.get(name, [])
        failures.append(change)
        self.fail_tests[name] = failures

    def shouldFailTest(self, name, ref):
        """Return True if job *name* was registered to fail for *ref*."""
        failures = self.fail_tests.get(name, [])
        for change in failures:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose job name matches *regex* (or all)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
723
724
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold build jobs in queue for tests."""

    def __init__(self):
        # When True, newly seen build jobs are marked waiting and are
        # not handed to workers until release() is called.
        self.hold_jobs_in_queue = False
        # Port 0: bind to an ephemeral port chosen by the OS.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job for *connection*, honoring holds.

        Scans high/normal/low priority queues in order; with peek=True
        the job is returned without being dequeued.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First time this job is seen: decide whether to
                    # hold it (only build jobs are ever held).
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release held build jobs whose name matches *regex* (or all)."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            # Wake workers so they pick up the newly runnable jobs.
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
771
772
class FakeSMTP(object):
    """Fake smtplib.SMTP which records messages instead of sending them."""

    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        # Shared list the test case inspects after the run.
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record the message, splitting headers from body.

        The split happens at the first blank line, as in RFC 822.
        Always returns True.
        """
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        split_message = msg.split('\n\n', 1)
        headers = split_message[0]
        body = split_message[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True
800
801
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection that performs no network access in tests."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
812
813
class BaseTestCase(testtools.TestCase):
    """Base test case applying the common OS_* environment fixtures.

    Honors OS_TEST_TIMEOUT, OS_STDOUT_CAPTURE, OS_STDERR_CAPTURE and
    OS_LOG_CAPTURE, matching the OpenStack test conventions.
    """
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
                os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
                os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
Maru Newby3fe5f852015-01-13 04:22:14 +0000842
843
844class ZuulTestCase(BaseTestCase):
845
    def setUp(self):
        """Build a complete in-process Zuul system for one test.

        Creates the fake upstream git repos, starts a fake gearman
        server/worker pair, a merge server/client, the scheduler, fake
        Gerrit and statsd services, the webapp and RPC listener, and
        wires them all together.  Cleanups (assertFinalState, shutdown)
        are registered in reverse order.
        """
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        # Start from an empty test tree every time.
        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        # Route non-Request urlopen calls to the fake upstream repos;
        # real urllib2.Request objects still go through the original.
        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        # Capture outgoing mail into self.smtp_messages.
        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                                self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        # registerJobs is defined later in this class (outside this view).
        self.registerJobs()
        # Convenient aliases used throughout the test methods.
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)
974
    def setup_config(self):
        """Per test config object. Override to set different config."""
        # Load the shared zuul.conf fixture into a fresh ConfigParser.
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
979
    def assertFinalState(self):
        """Assert no state leaked out of the test run."""
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        # Independent pipelines should have no change queues left over.
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -0700993
    def shutdown(self):
        """Stop and join every service that setUp started."""
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        # After a clean shutdown only the main thread should remain;
        # anything else is a leaked thread worth flagging.
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -07001015
1016 def init_repo(self, project):
1017 parts = project.split('/')
1018 path = os.path.join(self.upstream_root, *parts[:-1])
1019 if not os.path.exists(path):
1020 os.makedirs(path)
1021 path = os.path.join(self.upstream_root, project)
1022 repo = git.Repo.init(path)
1023
1024 repo.config_writer().set_value('user', 'email', 'user@example.com')
1025 repo.config_writer().set_value('user', 'name', 'User Name')
1026 repo.config_writer().write()
1027
1028 fn = os.path.join(path, 'README')
1029 f = open(fn, 'w')
1030 f.write("test\n")
1031 f.close()
1032 repo.index.add([fn])
1033 repo.index.commit('initial commit')
1034 master = repo.create_head('master')
1035 repo.create_tag('init')
1036
James E. Blair97d902e2014-08-21 13:25:56 -07001037 repo.head.reference = master
1038 repo.head.reset(index=True, working_tree=True)
1039 repo.git.clean('-x', '-f', '-d')
1040
1041 self.create_branch(project, 'mp')
1042
1043 def create_branch(self, project, branch):
1044 path = os.path.join(self.upstream_root, project)
1045 repo = git.Repo.init(path)
1046 fn = os.path.join(path, 'README')
1047
1048 branch_head = repo.create_head(branch)
1049 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001050 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001051 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001052 f.close()
1053 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001054 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001055
James E. Blair97d902e2014-08-21 13:25:56 -07001056 repo.head.reference = repo.heads['master']
Clark Boylanb640e052014-04-03 16:41:46 -07001057 repo.head.reset(index=True, working_tree=True)
1058 repo.git.clean('-x', '-f', '-d')
1059
1060 def ref_has_change(self, ref, change):
1061 path = os.path.join(self.git_root, change.project)
1062 repo = git.Repo(path)
Mike Heald8225f522014-11-21 09:52:33 +00001063 try:
1064 for commit in repo.iter_commits(ref):
1065 if commit.message.strip() == ('%s-1' % change.subject):
1066 return True
1067 except GitCommandError:
1068 pass
Clark Boylanb640e052014-04-03 16:41:46 -07001069 return False
1070
    def job_has_changes(self, *args):
        """Verify a job's prepared ZUUL_REF contains the given changes.

        args is (job, change1, change2, ...).  Returns True when every
        change's '<subject>-1' commit message appears in the ref and
        the ref's tip commit equals ZUUL_COMMIT.
        """
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            # Raw gearman job: parameters arrive JSON-encoded.
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug(" messages do not match")
                return False
        # The tip of the ref must be the commit Zuul reported.
        if repo_shas[0] != sha:
            self.log.debug(" sha does not match")
            return False
        self.log.debug(" OK")
        return True
1098
1099 def registerJobs(self):
1100 count = 0
1101 for job in self.sched.layout.jobs.keys():
1102 self.worker.registerFunction('build:' + job)
1103 count += 1
1104 self.worker.registerFunction('stop:' + self.worker.worker_id)
1105 count += 1
1106
1107 while len(self.gearman_server.functions) < count:
1108 time.sleep(0)
1109
1110 def release(self, job):
1111 if isinstance(job, FakeBuild):
1112 job.release()
1113 else:
1114 job.waiting = False
1115 self.log.debug("Queued job %s released" % job.unique)
1116 self.gearman_server.wakeConnections()
1117
1118 def getParameter(self, job, name):
1119 if isinstance(job, FakeBuild):
1120 return job.parameters[name]
1121 else:
1122 parameters = json.loads(job.arguments)
1123 return parameters[name]
1124
    def resetGearmanServer(self):
        """Clear all registered functions from the fake gearman server.

        Drops the worker's functions, polls until every connection
        (other than the RPC listener and merger, which keep theirs)
        reports none, then re-registers the RPC and merger functions.
        """
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            # Yield the CPU while the server catches up.
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()
1140
1141 def haveAllBuildsReported(self):
1142 # See if Zuul is waiting on a meta job to complete
1143 if self.launcher.meta_jobs:
1144 return False
1145 # Find out if every build that the worker has completed has been
1146 # reported back to Zuul. If it hasn't then that means a Gearman
1147 # event is still in transit and the system is not stable.
1148 for build in self.worker.build_history:
1149 zbuild = self.launcher.builds.get(build.uuid)
1150 if not zbuild:
1151 # It has already been reported
1152 continue
1153 # It hasn't been reported yet.
1154 return False
1155 # Make sure that none of the worker connections are in GRAB_WAIT
1156 for connection in self.worker.active_connections:
1157 if connection.state == 'GRAB_WAIT':
1158 return False
1159 return True
1160
    def areAllBuildsWaiting(self):
        """Return True if every outstanding build is idle.

        Each build the launcher knows about is cross-checked against the
        gearman client, the gearman server, and the fake worker; any
        build that is unknown, still in flight, or actively running
        makes the result False.
        """
        ret = True

        builds = self.launcher.builds.values()
        for build in builds:
            # Locate the client-side gearman job for this build.
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                ret = False
                continue
            if not client_job.handle:
                # Not yet submitted to the server.
                self.log.debug("%s has no handle" % client_job)
                ret = False
                continue
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                ret = False
                continue
            if not hasattr(server_job, 'waiting'):
                # The server is still enqueuing the job.
                self.log.debug("%s is being enqueued" % server_job)
                ret = False
                continue
            if server_job.waiting:
                # Held on the server side: definitely waiting.
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    ret = False
            else:
                self.log.debug("%s is unassigned" % server_job)
                ret = False
        return ret
1204
    def waitUntilSettled(self):
        """Block until Zuul reaches a quiescent state.

        Quiescent means: no merger work outstanding, all event queues
        empty and fully processed, every build reported, and every
        remaining build waiting to be released.  Raises an Exception
        after a ten second timeout.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                # Dump queue states to aid debugging the timeout.
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                # Hold the scheduler's run handler lock so the state we
                # inspect below cannot change mid-check.
                self.sched.run_handler_lock.acquire()
                if (not self.merge_client.build_sets and
                    self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            # Wait for the scheduler to signal activity before retrying.
            self.sched.wake_event.wait(0.1)
1239
1240 def countJobResults(self, jobs, result):
1241 jobs = filter(lambda x: x.result == result, jobs)
1242 return len(jobs)
1243
1244 def getJobFromHistory(self, name):
1245 history = self.worker.build_history
1246 for job in history:
1247 if job.name == name:
1248 return job
1249 raise Exception("Unable to find job %s in history" % name)
1250
    def assertEmptyQueues(self):
        """Assert no pipeline queue still holds items (orphaned jobs)."""
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    # Show the leftover contents before failing.
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001260
1261 def assertReportedStat(self, key, value=None, kind=None):
1262 start = time.time()
1263 while time.time() < (start + 5):
1264 for stat in self.statsd.stats:
1265 pprint.pprint(self.statsd.stats)
1266 k, v = stat.split(':')
1267 if key == k:
1268 if value is None and kind is None:
1269 return
1270 elif value:
1271 if value == v:
1272 return
1273 elif kind:
1274 if v.endswith('|' + kind):
1275 return
1276 time.sleep(0.1)
1277
1278 pprint.pprint(self.statsd.stats)
1279 raise Exception("Key %s not found in reported stats" % key)