#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.server
import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)


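# Git reference type used by FakeChange below to create Gerrit-style
# refs/changes/... refs in the fake upstream repositories.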
class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


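# In-memory stand-in for a Gerrit change.  self.data mirrors the JSON
# emitted by "gerrit query --format JSON", and each patchset is committed to
# a refs/changes/1/<number>/<patchset> ref in the fake upstream repo so the
# merger can fetch it like a real change.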
class FakeChange(object):
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


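# Fake of Zuul's Gerrit connection: changes are created with addFakeChange(),
# events are delivered through an in-process queue, and review()/query()
# operate on the FakeChange objects instead of making SSH calls.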
class FakeGerrit(object):
    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        return self.event_queue.put(data)

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def startWatching(self, *args, **kw):
        pass


class BuildHistory(object):
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


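# Used in place of urllib2.urlopen (see URLOpenerFactory in
# ZuulTestCase.setUp): answers Gerrit replication checks with a
# git-upload-pack ref advertisement built from the fake upstream repo.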
class FakeURLOpener(object):
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


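# Minimal UDP listener that records raw statsd packets in self.stats so
# tests can assert on reported metrics (see assertReportedStat below).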
class FakeStatsd(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


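# A single fake build, run in its own thread.  Builds can be held
# (hold_jobs_in_build) and released individually, and report SUCCESS,
# FAILURE, ABORTED or RUN_ERROR back over the Gearman job.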
class FakeBuild(threading.Thread):
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


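# Gearman worker that accepts build:/stop:/set_description: jobs from the
# Zuul launcher and runs them as FakeBuild threads; tests script outcomes
# with addFailTest() and release().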
class FakeWorker(gear.Worker):
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


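# In-process Gearman server that can hold queued build jobs
# (hold_jobs_in_queue) until a test releases them by name.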
class FakeGearmanServer(gear.Server):
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))


class FakeSMTP(object):
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


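# Base class for Zuul scheduler tests.  setUp() wires a real scheduler,
# merger and launcher to the fakes above; tests add fake changes and events,
# call waitUntilSettled(), then assert on self.builds / self.history.
#
# A minimal sketch of a subclass (illustrative only; the test name and the
# asserted job name are hypothetical, not part of this file):
#
#     class TestExample(ZuulTestCase):
#         def test_change_is_built(self):
#             A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
#             self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
#             self.waitUntilSettled()
#             self.assertEqual(
#                 self.getJobFromHistory('project-merge').result, 'SUCCESS')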
class ZuulTestCase(testtools.TestCase):
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
                os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
                os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                                self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug(" messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug(" sha does not match")
            return False
        self.log.debug(" OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul. If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
        ret = True

        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                ret = False
                continue
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                ret = False
                continue
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                ret = False
                continue
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                ret = False
                continue
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    ret = False
            else:
                self.log.debug("%s is unassigned" % server_job)
                ret = False
        return ret

    def waitUntilSettled(self):
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # Have all build states propagated to Zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
            self.sched.run_handler_lock.acquire()
            if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                self.sched.run_handler_lock.release()
                self.worker.lock.release()
                self.log.debug("...settled.")
                return
            self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipeline queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)