blob: b872a85810b52e9754f1f912dadc6766b0224851 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
50import zuul.merger.server
51import zuul.merger.client
52import zuul.reporter.gerrit
53import zuul.reporter.smtp
54import zuul.trigger.gerrit
55import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070056import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070057
# Directory holding the test fixture files (zuul.conf, layout.yaml, etc.).
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
# When True, each test runs inside a private temporary directory; set to
# False to run directly under ZUUL_TEST_ROOT (see ZuulTestCase.setUp).
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')
65
66
def repack_repo(path):
    """Run ``git repack -afd`` on the repository at *path*.

    Returns the (stdout, stderr) pair from the git process; raises an
    Exception if git exits with a non-zero status.
    """
    git_dir_arg = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(['git', git_dir_arg, 'repack', '-afd'],
                            close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    streams = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return streams
76
77
def random_sha1():
    """Return the SHA1 hex digest of a random float as a 40-char string.

    Used to fabricate plausible-looking change/commit identifiers for the
    fake Gerrit data.  The explicit encode makes this work on both
    Python 2 and Python 3: on Python 3, hashlib requires bytes, so the
    original ``sha1(str(...))`` call would raise TypeError.
    """
    return hashlib.sha1(str(random.random()).encode('utf-8')).hexdigest()
80
81
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing attempt counter until *max_seconds* elapse.

    Intended for polling loops; raises an Exception naming *purpose*
    once the deadline has passed without the caller breaking out.
    """
    deadline = time.time() + max_seconds
    attempt = 0
    while time.time() < deadline:
        attempt += 1
        yield attempt
        time.sleep(0)  # yield the CPU so other threads can make progress
    raise Exception("Timeout waiting for %s" % purpose)
90
91
class ChangeReference(git.Reference):
    """A git ref under ``refs/changes``, mimicking Gerrit's change refs."""
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
95
96
class FakeChange(object):
    """An in-memory stand-in for a Gerrit change.

    Tracks patchsets, approvals, dependencies and merge status, backs
    each patchset with a real commit in the fake upstream git repo, and
    fabricates the JSON structures that Gerrit's stream-events and query
    interfaces would produce.
    """

    # Label abbreviation -> (description, min score, max score).
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        # Shape mirrors Gerrit's JSON query output for a change.
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        # Every change starts with one patchset (and its backing commit).
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Create the commit backing a patchset in the upstream repo.

        Checks out a new ChangeReference based on the 'init' tag, writes
        either one small file *fn* or (if *large*) 100 files of random
        content, commits with message *msg*, then restores master.
        Returns the commit object.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            # 100 files x 4096 random printable chars each.
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        # Leave the repo back on a clean master checkout.
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        """Add a new patchset touching *files* (or a default file).

        NOTE(review): the mutable default is safe here because 'files'
        is only read, never mutated.
        """
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        """Return a fake 'patchset-created' stream event (1-based index)."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a fake 'change-restored' stream event."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a fake 'change-abandoned' stream event."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a fake 'comment-added' stream event (1-based index)."""
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Record an approval on the latest patchset.

        Replaces any prior approval from the same user in the same
        category, refreshes the submit records, and returns the
        corresponding fake 'comment-added' event.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        # Iterate over a copy so deletion during iteration is safe.
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        # Round-trip through JSON to return a deep, detached copy.
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from current approvals.

        A category's effective score is the minimum score if anyone
        voted it (a veto), else the vote with the largest magnitude.
        Returns [{'status': 'OK'}] when every category is at its max,
        otherwise a NOT_READY record with per-label statuses.
        """
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                # A minimum vote is a veto and sticks regardless.
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Link this change as depending on *other* at *patchset* (1-based).

        Updates both sides: our 'dependsOn' and the other change's
        'neededBy' list.
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of the change data, as a Gerrit query would.

        Also refreshes dependsOn[0]['isCurrentPatchSet'] and counts the
        query for test assertions.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and advance the upstream branch.

        Refuses silently if an unmerged dependency exists or if
        fail_merge is set (used to simulate merge failures).
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        # Fast-forward the upstream branch to the latest patchset commit.
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one report (review message) against this change."""
        self.reported += 1
378
379
class FakeGerrit(object):
    """Minimal fake of the Gerrit server interface used by the tests.

    Holds FakeChange objects keyed by change number, and exposes the
    event-queue / review / query operations the scheduler exercises.
    """

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        change = FakeChange(self, self.change_number, project, branch,
                            subject, upstream_root=self.upstream_root,
                            status=status)
        self.changes[self.change_number] = change
        return change

    def addEvent(self, data):
        """Queue an event as though it arrived on the event stream."""
        return self.event_queue.put(data)

    def getEvent(self):
        """Block until an event is available and return it."""
        return self.event_queue.get()

    def eventDone(self):
        """Mark the most recently fetched event as processed."""
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Record a review; merge on a 'submit' action, count messages."""
        number, _patchset = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return query data for one change, or {} if it is unknown."""
        change = self.changes.get(int(number))
        return change.query() if change else {}

    def simpleQuery(self, query):
        """Return query data for every change plus a trailing stats row.

        This is currently only used to return all open changes for a
        project.
        """
        self.queries.append(query)
        results = [change.query() for change in self.changes.values()]
        results.append({"type": "stats", "rowCount": 1,
                        "runTimeMilliseconds": 3})
        return results

    def startWatching(self, *args, **kw):
        """No-op: the fake has no real event stream to watch."""
        pass
430
431
class BuildHistory(object):
    """Record of one completed fake build.

    Arbitrary keyword arguments become attributes (result, name,
    number, changes, ...), mirroring whatever the caller supplies.
    """

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))
439
440
class FakeURLOpener(object):
    """Fake urllib2 opener answering git-upload-pack ref advertisements.

    Serves the refs of the local upstream repository named in the URL,
    formatted as git smart-HTTP pkt-lines.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        parsed = urlparse.urlparse(self.url)
        # The project name occupies path components [2:-2] of the URL.
        project = '/'.join(parsed.path.split('/')[2:-2])
        parts = ['001e# service=git-upload-pack\n']
        parts.append(
            '000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
            'multi_ack thin-pack side-band side-band-64k ofs-delta '
            'shallow no-progress include-tag multi_ack_detailed no-done\n')
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # pkt-line: 4 hex digits of total length, then the payload.
            parts.append('%04x%s' % (len(line) + 4, line))
        parts.append('0000')
        return ''.join(parts)
462
463
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger whose git URLs point into the local upstream root."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        # Directory holding the fake upstream git repositories.
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        # Return a filesystem path rather than a network URL so the
        # merger clones from the local fake repositories.
        return os.path.join(self.upstream_root, project.name)
473
474
class FakeStatsd(threading.Thread):
    """UDP listener thread that records raw statsd packets in self.stats."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))  # bind to any free port
        self.port = self.sock.getsockname()[1]
        # Self-pipe trick: writing to wake_write wakes the poll loop so
        # run() can return during shutdown.
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    # Keep only the payload, not the sender address.
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        # Any data on the pipe causes run() to return.
        # NOTE(review): passing a str works on Python 2 only; os.write
        # requires bytes on Python 3.
        os.write(self.wake_write, '1\n')
502
503
class FakeBuild(threading.Thread):
    """Thread simulating one running build for a gearman 'build:*' job.

    Sends the same work-data/status/complete packets a real launcher
    worker would, and can be held (via worker.hold_jobs_in_build) until
    a test releases it.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        # Job names look like 'build:<name>[:<node>]'.
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        # Condition used to park the build while tests hold it.
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Wake a held build so run() can proceed."""
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        """Return True if the build is parked waiting for release()."""
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        """Park the build until release() is called."""
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        """Simulate the build: report data, optionally hold, then finish."""
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Serialize build completion bookkeeping across build threads.
        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            # Simulate a worker-side failure: no result in the data,
            # and the job is failed rather than completed.
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()
609
610
class FakeWorker(gear.Worker):
    """Fake Gearman worker that dispatches build/stop/set_description jobs.

    Runs a daemon thread pulling jobs from the gearman server; 'build'
    jobs spawn FakeBuild threads, which the tests can hold and release.
    """

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        # job name -> list of changes whose builds should fail.
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch a job named '<cmd>:<name>[:<node>]' to its handler."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for a 'build' job."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort and release the running build named in the job arguments."""
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Attach an HTML description to a running or completed build."""
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Main loop of the worker thread: fetch and handle jobs."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Was a bare 'except:', which also swallowed SystemExit
                # and KeyboardInterrupt (PEP 8 E722); log and keep going.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Arrange for builds of job *name* including *change* to fail."""
        failures = self.fail_tests.get(name, [])
        failures.append(change)
        self.fail_tests[name] = failures

    def shouldFailTest(self, name, ref):
        """Return True if a registered failing change is present in *ref*."""
        for change in self.fail_tests.get(name, []):
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds; only those matching *regex* if given."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
719
720
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold 'build:*' jobs in queue for tests."""

    def __init__(self):
        # When True, newly-seen build jobs are marked waiting and not
        # handed to workers until release() is called.
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next runnable job for *connection*, or None.

        Build jobs first seen while hold_jobs_in_queue is set are
        flagged as waiting and skipped.  With peek=True the job is
        returned without being dequeued.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        # Safe to mutate while iterating: we return
                        # immediately afterwards.
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Un-hold queued build jobs (matching *regex* if given)."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            # Workers poll on their connections; wake them to pick up
            # the newly-released jobs.
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
767
768
class FakeSMTP(object):
    """Fake smtplib.SMTP that records messages instead of sending them."""

    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        # Shared list supplied by the test; every sent mail lands here.
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record *msg*, split into headers and body at the first blank line."""
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        pieces = msg.split('\n\n', 1)
        headers = pieces[0]
        body = pieces[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        """Pretend to close the connection."""
        return True
796
797
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection that skips real auth and account operations."""

    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
808
809
810class ZuulTestCase(testtools.TestCase):
811 log = logging.getLogger("zuul.test")
812
813 def setUp(self):
814 super(ZuulTestCase, self).setUp()
815 test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
816 try:
817 test_timeout = int(test_timeout)
818 except ValueError:
819 # If timeout value is invalid do not set a timeout.
820 test_timeout = 0
821 if test_timeout > 0:
822 self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
823
824 if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
825 os.environ.get('OS_STDOUT_CAPTURE') == '1'):
826 stdout = self.useFixture(fixtures.StringStream('stdout')).stream
827 self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
828 if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
829 os.environ.get('OS_STDERR_CAPTURE') == '1'):
830 stderr = self.useFixture(fixtures.StringStream('stderr')).stream
831 self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
832 if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
833 os.environ.get('OS_LOG_CAPTURE') == '1'):
834 self.useFixture(fixtures.FakeLogger(
835 level=logging.DEBUG,
836 format='%(asctime)s %(name)-32s '
837 '%(levelname)-8s %(message)s'))
James E. Blair97d902e2014-08-21 13:25:56 -0700838 if USE_TEMPDIR:
839 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000840 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
841 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700842 else:
843 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700844 self.test_root = os.path.join(tmp_root, "zuul-test")
845 self.upstream_root = os.path.join(self.test_root, "upstream")
846 self.git_root = os.path.join(self.test_root, "git")
847
848 if os.path.exists(self.test_root):
849 shutil.rmtree(self.test_root)
850 os.makedirs(self.test_root)
851 os.makedirs(self.upstream_root)
852 os.makedirs(self.git_root)
853
854 # Make per test copy of Configuration.
855 self.setup_config()
856 self.config.set('zuul', 'layout_config',
857 os.path.join(FIXTURE_DIR, "layout.yaml"))
858 self.config.set('merger', 'git_dir', self.git_root)
859
860 # For each project in config:
861 self.init_repo("org/project")
862 self.init_repo("org/project1")
863 self.init_repo("org/project2")
864 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700865 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700866 self.init_repo("org/project5")
867 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700868 self.init_repo("org/one-job-project")
869 self.init_repo("org/nonvoting-project")
870 self.init_repo("org/templated-project")
871 self.init_repo("org/layered-project")
872 self.init_repo("org/node-project")
873 self.init_repo("org/conflict-project")
874 self.init_repo("org/noop-project")
875 self.init_repo("org/experimental-project")
876
877 self.statsd = FakeStatsd()
878 os.environ['STATSD_HOST'] = 'localhost'
879 os.environ['STATSD_PORT'] = str(self.statsd.port)
880 self.statsd.start()
881 # the statsd client object is configured in the statsd module import
882 reload(statsd)
883 reload(zuul.scheduler)
884
885 self.gearman_server = FakeGearmanServer()
886
887 self.config.set('gearman', 'port', str(self.gearman_server.port))
888
889 self.worker = FakeWorker('fake_worker', self)
890 self.worker.addServer('127.0.0.1', self.gearman_server.port)
891 self.gearman_server.worker = self.worker
892
893 self.merge_server = zuul.merger.server.MergeServer(self.config)
894 self.merge_server.start()
895
896 self.sched = zuul.scheduler.Scheduler()
897
898 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
899 FakeSwiftClientConnection))
900 self.swift = zuul.lib.swift.Swift(self.config)
901
902 def URLOpenerFactory(*args, **kw):
903 if isinstance(args[0], urllib2.Request):
904 return old_urlopen(*args, **kw)
905 args = [self.fake_gerrit] + list(args)
906 return FakeURLOpener(self.upstream_root, *args, **kw)
907
908 old_urlopen = urllib2.urlopen
909 urllib2.urlopen = URLOpenerFactory
910
911 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
912 self.swift)
913 self.merge_client = zuul.merger.client.MergeClient(
914 self.config, self.sched)
915
916 self.smtp_messages = []
917
918 def FakeSMTPFactory(*args, **kw):
919 args = [self.smtp_messages] + list(args)
920 return FakeSMTP(*args, **kw)
921
922 zuul.lib.gerrit.Gerrit = FakeGerrit
923 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
924
925 self.gerrit = FakeGerritTrigger(
926 self.upstream_root, self.config, self.sched)
927 self.gerrit.replication_timeout = 1.5
928 self.gerrit.replication_retry_interval = 0.5
929 self.fake_gerrit = self.gerrit.gerrit
930 self.fake_gerrit.upstream_root = self.upstream_root
931
932 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
933 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
934
935 self.sched.setLauncher(self.launcher)
936 self.sched.setMerger(self.merge_client)
937 self.sched.registerTrigger(self.gerrit)
938 self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
939 self.sched.registerTrigger(self.timer)
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000940 self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
941 self.sched)
James E. Blairc494d542014-08-06 09:23:52 -0700942 self.sched.registerTrigger(self.zuultrigger)
Clark Boylanb640e052014-04-03 16:41:46 -0700943
944 self.sched.registerReporter(
945 zuul.reporter.gerrit.Reporter(self.gerrit))
946 self.smtp_reporter = zuul.reporter.smtp.Reporter(
947 self.config.get('smtp', 'default_from'),
948 self.config.get('smtp', 'default_to'),
949 self.config.get('smtp', 'server'))
950 self.sched.registerReporter(self.smtp_reporter)
951
952 self.sched.start()
953 self.sched.reconfigure(self.config)
954 self.sched.resume()
955 self.webapp.start()
956 self.rpc.start()
957 self.launcher.gearman.waitForServer()
958 self.registerJobs()
959 self.builds = self.worker.running_builds
960 self.history = self.worker.build_history
961
962 self.addCleanup(self.assertFinalState)
963 self.addCleanup(self.shutdown)
964
965 def setup_config(self):
966 """Per test config object. Override to set different config."""
967 self.config = ConfigParser.ConfigParser()
968 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
969
970 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -0700971 # Make sure that git.Repo objects have been garbage collected.
972 repos = []
973 gc.collect()
974 for obj in gc.get_objects():
975 if isinstance(obj, git.Repo):
976 repos.append(obj)
977 self.assertEqual(len(repos), 0)
978 self.assertEmptyQueues()
979
980 def shutdown(self):
981 self.log.debug("Shutting down after tests")
982 self.launcher.stop()
983 self.merge_server.stop()
984 self.merge_server.join()
985 self.merge_client.stop()
986 self.worker.shutdown()
987 self.gerrit.stop()
988 self.timer.stop()
989 self.sched.stop()
990 self.sched.join()
991 self.statsd.stop()
992 self.statsd.join()
993 self.webapp.stop()
994 self.webapp.join()
995 self.rpc.stop()
996 self.rpc.join()
997 self.gearman_server.shutdown()
998 threads = threading.enumerate()
999 if len(threads) > 1:
1000 self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -07001001
1002 def init_repo(self, project):
1003 parts = project.split('/')
1004 path = os.path.join(self.upstream_root, *parts[:-1])
1005 if not os.path.exists(path):
1006 os.makedirs(path)
1007 path = os.path.join(self.upstream_root, project)
1008 repo = git.Repo.init(path)
1009
1010 repo.config_writer().set_value('user', 'email', 'user@example.com')
1011 repo.config_writer().set_value('user', 'name', 'User Name')
1012 repo.config_writer().write()
1013
1014 fn = os.path.join(path, 'README')
1015 f = open(fn, 'w')
1016 f.write("test\n")
1017 f.close()
1018 repo.index.add([fn])
1019 repo.index.commit('initial commit')
1020 master = repo.create_head('master')
1021 repo.create_tag('init')
1022
James E. Blair97d902e2014-08-21 13:25:56 -07001023 repo.head.reference = master
1024 repo.head.reset(index=True, working_tree=True)
1025 repo.git.clean('-x', '-f', '-d')
1026
1027 self.create_branch(project, 'mp')
1028
1029 def create_branch(self, project, branch):
1030 path = os.path.join(self.upstream_root, project)
1031 repo = git.Repo.init(path)
1032 fn = os.path.join(path, 'README')
1033
1034 branch_head = repo.create_head(branch)
1035 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001036 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001037 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001038 f.close()
1039 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001040 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001041
James E. Blair97d902e2014-08-21 13:25:56 -07001042 repo.head.reference = repo.heads['master']
Clark Boylanb640e052014-04-03 16:41:46 -07001043 repo.head.reset(index=True, working_tree=True)
1044 repo.git.clean('-x', '-f', '-d')
1045
1046 def ref_has_change(self, ref, change):
1047 path = os.path.join(self.git_root, change.project)
1048 repo = git.Repo(path)
Mike Heald8225f522014-11-21 09:52:33 +00001049 try:
1050 for commit in repo.iter_commits(ref):
1051 if commit.message.strip() == ('%s-1' % change.subject):
1052 return True
1053 except GitCommandError:
1054 pass
Clark Boylanb640e052014-04-03 16:41:46 -07001055 return False
1056
1057 def job_has_changes(self, *args):
1058 job = args[0]
1059 commits = args[1:]
1060 if isinstance(job, FakeBuild):
1061 parameters = job.parameters
1062 else:
1063 parameters = json.loads(job.arguments)
1064 project = parameters['ZUUL_PROJECT']
1065 path = os.path.join(self.git_root, project)
1066 repo = git.Repo(path)
1067 ref = parameters['ZUUL_REF']
1068 sha = parameters['ZUUL_COMMIT']
1069 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1070 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1071 commit_messages = ['%s-1' % commit.subject for commit in commits]
1072 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1073 " repo_messages %s; sha %s" % (job, commit_messages,
1074 repo_messages, sha))
1075 for msg in commit_messages:
1076 if msg not in repo_messages:
1077 self.log.debug(" messages do not match")
1078 return False
1079 if repo_shas[0] != sha:
1080 self.log.debug(" sha does not match")
1081 return False
1082 self.log.debug(" OK")
1083 return True
1084
1085 def registerJobs(self):
1086 count = 0
1087 for job in self.sched.layout.jobs.keys():
1088 self.worker.registerFunction('build:' + job)
1089 count += 1
1090 self.worker.registerFunction('stop:' + self.worker.worker_id)
1091 count += 1
1092
1093 while len(self.gearman_server.functions) < count:
1094 time.sleep(0)
1095
1096 def release(self, job):
1097 if isinstance(job, FakeBuild):
1098 job.release()
1099 else:
1100 job.waiting = False
1101 self.log.debug("Queued job %s released" % job.unique)
1102 self.gearman_server.wakeConnections()
1103
1104 def getParameter(self, job, name):
1105 if isinstance(job, FakeBuild):
1106 return job.parameters[name]
1107 else:
1108 parameters = json.loads(job.arguments)
1109 return parameters[name]
1110
1111 def resetGearmanServer(self):
1112 self.worker.setFunctions([])
1113 while True:
1114 done = True
1115 for connection in self.gearman_server.active_connections:
1116 if (connection.functions and
1117 connection.client_id not in ['Zuul RPC Listener',
1118 'Zuul Merger']):
1119 done = False
1120 if done:
1121 break
1122 time.sleep(0)
1123 self.gearman_server.functions = set()
1124 self.rpc.register()
1125 self.merge_server.register()
1126
1127 def haveAllBuildsReported(self):
1128 # See if Zuul is waiting on a meta job to complete
1129 if self.launcher.meta_jobs:
1130 return False
1131 # Find out if every build that the worker has completed has been
1132 # reported back to Zuul. If it hasn't then that means a Gearman
1133 # event is still in transit and the system is not stable.
1134 for build in self.worker.build_history:
1135 zbuild = self.launcher.builds.get(build.uuid)
1136 if not zbuild:
1137 # It has already been reported
1138 continue
1139 # It hasn't been reported yet.
1140 return False
1141 # Make sure that none of the worker connections are in GRAB_WAIT
1142 for connection in self.worker.active_connections:
1143 if connection.state == 'GRAB_WAIT':
1144 return False
1145 return True
1146
    def areAllBuildsWaiting(self):
        """Return True if every outstanding build is parked and waiting.

        For each build the launcher knows about, verify the gearman
        client, server and worker all agree it is enqueued and that it
        is waiting rather than running.  Every discrepancy is logged;
        all builds are inspected even after the first failure.
        """
        ret = True

        builds = self.launcher.builds.values()
        for build in builds:
            # Locate the client-side gearman job matching this build.
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                ret = False
                continue
            if not client_job.handle:
                # Submitted but not yet acknowledged by the server.
                self.log.debug("%s has no handle" % client_job)
                ret = False
                continue
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                ret = False
                continue
            if not hasattr(server_job, 'waiting'):
                # The test server attaches 'waiting' once the job is
                # fully enqueued; its absence means enqueueing is still
                # in progress.
                self.log.debug("%s is being enqueued" % server_job)
                ret = False
                continue
            if server_job.waiting:
                # Held on the server side; nothing more to check.
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    ret = False
            else:
                self.log.debug("%s is unassigned" % server_job)
                ret = False
        return ret
1190
    def waitUntilSettled(self):
        """Block until Zuul is quiescent.

        Polls until all event queues are empty, all merges are complete,
        every build has been reported, and every remaining build is
        waiting.  Raises Exception after a 10 second timeout, dumping
        queue status first.
        """
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                # Hold the scheduler's run handler lock so the state
                # inspected below cannot change mid-check.
                self.sched.run_handler_lock.acquire()
                if (self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    not self.merge_client.build_sets and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)
1225
1226 def countJobResults(self, jobs, result):
1227 jobs = filter(lambda x: x.result == result, jobs)
1228 return len(jobs)
1229
1230 def getJobFromHistory(self, name):
1231 history = self.worker.build_history
1232 for job in history:
1233 if job.name == name:
1234 return job
1235 raise Exception("Unable to find job %s in history" % name)
1236
    def assertEmptyQueues(self):
        """Assert that no pipeline queue still holds items.

        Offending queue contents are printed before the assertion fires
        to aid debugging.
        """
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001246
1247 def assertReportedStat(self, key, value=None, kind=None):
1248 start = time.time()
1249 while time.time() < (start + 5):
1250 for stat in self.statsd.stats:
1251 pprint.pprint(self.statsd.stats)
1252 k, v = stat.split(':')
1253 if key == k:
1254 if value is None and kind is None:
1255 return
1256 elif value:
1257 if value == v:
1258 return
1259 elif kind:
1260 if v.endswith('|' + kind):
1261 return
1262 time.sleep(0.1)
1263
1264 pprint.pprint(self.statsd.stats)
1265 raise Exception("Key %s not found in reported stats" % key)