blob: 08b3cab411bc1d55e7b7a3b7edce96e83802ca25 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
50import zuul.merger.server
51import zuul.merger.client
52import zuul.reporter.gerrit
53import zuul.reporter.smtp
54import zuul.trigger.gerrit
55import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070056import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070057
# Directory, next to this module, holding the static files the tests read
# (for example zuul.conf and layout.yaml).
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
# When true, ZuulTestCase.setUp creates a fresh temporary directory per test
# (optionally rooted at $ZUUL_TEST_ROOT); when false, $ZUUL_TEST_ROOT itself
# is used as the root.
USE_TEMPDIR = True

# Configure root logging at import time so every test logs at DEBUG level
# using one shared line format.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')
65
66
def repack_repo(path):
    """Run ``git repack -afd`` against the checkout located at *path*.

    :param path: working tree whose ``.git`` directory is repacked.
    :returns: the ``(stdout, stderr)`` tuple captured from the git process.
    :raises Exception: if git exits with a non-zero status.
    """
    git_dir_option = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(['git', git_dir_option, 'repack', '-afd'],
                            close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() drains both pipes and waits for git to exit.
    captured = proc.communicate()
    if proc.returncode != 0:
        raise Exception("git repack returned %d" % proc.returncode)
    return captured
76
77
def random_sha1():
    """Return the hex SHA-1 digest (40 characters) of a random number.

    The random float is rendered as text and encoded to bytes before
    hashing: ``hashlib`` only accepts bytes on Python 3, and encoding an
    ASCII string is harmless on Python 2.
    """
    seed = str(random.random()).encode('utf-8')
    return hashlib.sha1(seed).hexdigest()
80
81
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing attempt counter until *max_seconds* have elapsed.

    Intended for ``for _ in iterate_timeout(...)`` polling loops: the caller
    breaks out once its condition holds. If the deadline passes first, the
    generator raises instead of finishing normally.

    :param max_seconds: how long, in seconds, to keep yielding.
    :param purpose: text appended to the timeout error message.
    :raises Exception: ``"Timeout waiting for <purpose>"`` on expiry.
    """
    deadline = time.time() + max_seconds
    attempt = 0
    while True:
        if time.time() >= deadline:
            break
        attempt += 1
        yield attempt
        # Zero-length sleep: just give other threads a chance to run.
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)
90
91
class ChangeReference(git.Reference):
    """Git reference type used for the fake Gerrit change refs.

    FakeChange creates these with names such as ``1/<number>/<patchset>`` and
    advertises them as ``refs/changes/1/<number>/<patchset>``.
    """
    # NOTE(review): presumably GitPython prepends this path to the short name
    # passed to ``create`` -- confirm against the git.Reference documentation.
    _common_path_default = "refs/changes"
    _points_to_commits_only = True
95
96
class FakeChange(object):
    """In-memory stand-in for a Gerrit change, backed by a real git repo.

    ``self.data`` mirrors the dictionary returned by ``query()``;
    ``self.patchsets`` is the same list object stored under
    ``data['patchSets']``. Each patchset is committed to the repository at
    ``<upstream_root>/<project>``.
    """

    # category code -> (label name, minimum vote, maximum vote)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        """Create the change and commit its first patchset.

        :param gerrit: the owning fake Gerrit object (only stored).
        :param number: change number.
        :param project: project name, also the repo path under
            *upstream_root*.
        :param branch: target branch name.
        :param subject: commit subject; also used as the commit message.
        :param status: Gerrit status string; ``'NEW'`` means open.
        :param upstream_root: directory containing the upstream repos.
        """
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit a patchset onto a new change ref and return the commit.

        The ref ``1/<number>/<latest_patchset>`` is created from
        ``refs/tags/init``; afterwards the repository is put back on
        ``master`` with a clean working tree.

        :param msg: commit message.
        :param fn: file name (relative to the repo) written when not *large*.
        :param large: when true, write 100 files of 4096 random printable
            characters each instead of the single small file.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        if not large:
            fn = os.path.join(path, fn)
            # ``with`` closes the file even if the write fails.
            with open(fn, 'w') as f:
                f.write("test %s %s %s\n" %
                        (self.branch, self.number, self.latest_patchset))
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                with open(fn, 'w') as f:
                    for ci in range(4096):
                        f.write(random.choice(string.printable))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Add a new patchset, commit it, and make it the current one.

        :param files: optional list of file names; the first is the file
            actually written, and all are listed as ``ADDED`` in the
            patchset's file list.
        :param large: forwarded to ``add_fake_change_to_repo``.
        """
        if files is None:
            files = []
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def _event_change(self):
        """Build the ``change`` stanza shared by the simple event types.

        The ``id`` and ``url`` values are fixed placeholders, not this
        change's own id or url.
        """
        return {"project": self.project,
                "branch": self.branch,
                "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                "number": str(self.number),
                "subject": self.subject,
                "owner": {"name": "User Name"},
                "url": "https://hostname/3"}

    def getPatchsetCreatedEvent(self, patchset):
        """Return a ``patchset-created`` event for 1-based *patchset*."""
        event = {"type": "patchset-created",
                 "change": self._event_change(),
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a ``change-restored`` event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": self._event_change(),
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a ``change-abandoned`` event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": self._event_change(),
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a ``comment-added`` event with a zero Code-Review vote."""
        event = {"type": "comment-added",
                 "change": self._event_change(),
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Record a vote on the latest patchset and return its event.

        Any earlier approval by the same *username* in the same *category*
        is replaced. Submit records are recomputed afterwards.

        :param category: key of ``categories`` (e.g. ``'CRVW'``).
        :param value: vote value; stored as a string.
        :param username: voting user.
        :param granted_on: timestamp of the vote; defaults to now.
        :returns: a ``comment-added`` event, round-tripped through JSON so
            the caller gets a copy detached from this object's state.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        approvals = self.patchsets[-1]['approvals']
        # Filter in place (slice assignment keeps the list object identity).
        # Deleting by index while walking a copy goes wrong as soon as two
        # entries match, because the indexes shift after the first delete.
        approvals[:] = [x for x in approvals
                        if not (x['by']['username'] == username and
                                x['type'] == category)]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute submit records from the latest patchset's approvals.

        Per category, a vote equal to the category minimum always wins;
        otherwise the vote with the largest absolute value wins. The change
        is ``OK`` only when every category sits at its maximum.
        """
        status = dict.fromkeys(self.categories, 0)

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min = self.categories[a['type']][1]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on *other* and record the reverse link.

        Note that the 1-based *patchset* number is used to index both
        ``other.patchsets`` (for ``dependsOn``) and ``self.patchsets`` (for
        the ``neededBy`` entry added to *other*).
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a JSON round-tripped copy of ``self.data``.

        Increments the ``queried`` counter and refreshes the
        ``isCurrentPatchSet`` flag of the dependency entry, if any.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and move its branch head to the patchset.

        Does nothing when the change depends on an unmerged change or when
        ``fail_merge`` is set.
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one more report against this change."""
        self.reported += 1
378
379
class FakeGerrit(object):
    """Minimal in-process replacement for the Gerrit connection object.

    Holds the fake changes by number, a queue of pending events and a log of
    every ``simpleQuery`` string received. ``upstream_root`` is not set here;
    ``addFakeChange`` expects it to have been assigned on the instance.
    """
    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, *args, **kw):
        # Arguments are accepted and ignored.
        self.changes = {}
        self.queries = []
        self.change_number = 0
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.event_queue = Queue.Queue()

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a FakeChange with the next number."""
        self.change_number += 1
        number = self.change_number
        change = FakeChange(self, number, project, branch, subject,
                            upstream_root=self.upstream_root,
                            status=status)
        self.changes[number] = change
        return change

    def addEvent(self, data):
        """Queue an event dictionary for a later ``getEvent``."""
        return self.event_queue.put(data)

    def getEvent(self):
        """Block until an event is available and return it."""
        return self.event_queue.get()

    def eventDone(self):
        """Mark the most recently fetched event as processed."""
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Apply a review to the change named by ``"<number>,<patchset>"``.

        The message is always recorded; the change is merged when *action*
        contains ``'submit'`` and counted as reported when the message is
        non-empty.
        """
        number, _patchset = changeid.split(',')
        target = self.changes[int(number)]
        target.messages.append(message)
        if 'submit' in action:
            target.setMerged()
        if message:
            target.setReported()

    def query(self, number):
        """Return the query data for change *number*, or ``{}`` if unknown."""
        found = self.changes.get(int(number))
        return found.query() if found else {}

    def simpleQuery(self, query):
        """Answer a Gerrit query string with a list of change dictionaries.

        ``change:<id>`` matches on the change id and ``message:<text>`` on a
        substring of the commit message. Any other query returns every known
        change; no filtering on open state is performed.
        """
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        selected = self.changes.values()
        if query.startswith('change:'):
            wanted_id = query[len('change:'):]
            selected = [c for c in selected if c.data['id'] == wanted_id]
        elif query.startswith('message:'):
            needle = query[len('message:'):].strip()
            selected = [c for c in selected
                        if needle in c.data['commitMessage']]
        return [c.query() for c in selected]

    def startWatching(self, *args, **kw):
        """No-op: the fake has no event stream to watch."""
        pass
442
443
class BuildHistory(object):
    """Plain record of a finished build; every keyword becomes an attribute."""

    def __init__(self, **kw):
        for attr, value in kw.items():
            setattr(self, attr, value)

    def __repr__(self):
        summary = (self.result, self.name, self.number, self.changes)
        return "<Completed build, result: %s name: %s #%s changes: %s>" % summary
451
452
class FakeURLOpener(object):
    """File-like object that fakes a git smart-HTTP ref advertisement."""

    def __init__(self, upstream_root, fake_gerrit, url):
        self.url = url
        self.fake_gerrit = fake_gerrit
        self.upstream_root = upstream_root

    def read(self):
        """Return the ``git-upload-pack`` advertisement for the URL's repo.

        The project name is the URL path with its first two and last two
        components stripped. Each ref of that upstream repository is emitted
        as a length-prefixed ``<sha> <ref path>`` line.
        """
        url_path = urlparse.urlparse(self.url).path
        project = '/'.join(url_path.split('/')[2:-2])
        chunks = [
            '001e# service=git-upload-pack\n',
            '000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
            'multi_ack thin-pack side-band side-band-64k ofs-delta '
            'shallow no-progress include-tag multi_ack_detailed no-done\n',
        ]
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            record = ref.object.hexsha + ' ' + ref.path + '\n'
            # The 4-hex-digit length prefix counts its own 4 characters.
            chunks.append('%04x%s' % (len(record) + 4, record))
        chunks.append('0000')
        return ''.join(chunks)
474
475
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger whose git URLs point at the local upstream repos."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        """Initialise the real trigger with *args* and remember the root.

        :param upstream_root: directory containing the upstream test repos.
        """
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root

    def getGitUrl(self, project):
        """Return the local filesystem path of *project*'s upstream repo."""
        return os.path.join(self.upstream_root, project.name)
485
486
class FakeStatsd(threading.Thread):
    """Daemon thread that collects UDP datagrams sent to a local port.

    The socket is bound to an OS-assigned port exposed as ``self.port``;
    every datagram payload received is appended to ``self.stats``. A pipe is
    used to wake the thread and make it exit when ``stop()`` is called.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Record incoming datagrams until the wake pipe becomes readable."""
        # The set of watched descriptors never changes, so build the poll
        # object once instead of on every loop iteration.
        poll = select.poll()
        poll.register(self.sock, select.POLLIN)
        poll.register(self.wake_read, select.POLLIN)
        sock_fd = self.sock.fileno()
        while True:
            for (fd, event) in poll.poll():
                if fd == sock_fd:
                    # recvfrom returns (payload, address); keep the payload.
                    payload, _address = self.sock.recvfrom(1024)
                    self.stats.append(payload)
                if fd == self.wake_read:
                    return

    def stop(self):
        """Ask the thread to exit by writing to the wake pipe."""
        # os.write needs bytes; a text string raises TypeError on Python 3.
        os.write(self.wake_write, b'1\n')
514
515
class FakeBuild(threading.Thread):
    """Thread simulating one build run by a FakeWorker.

    The build reports fake worker data over its gearman job, optionally
    blocks until ``release()`` is called, then records a BuildHistory entry
    on the worker and completes (or fails) the job.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        """
        :param worker: owning worker; supplies the lock, history and flags.
        :param job: gearman job named ``build:<name>[:<node>]`` whose
            ``arguments`` attribute is a JSON object of build parameters.
        :param number: build number.
        :param node: node label parsed from the job name, or None.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Wake the build if it is being held and clear its waiting flag."""
        with self.wait_condition:
            self.wait_condition.notify()
            self.waiting = False
            self.log.debug("Build %s released" % self.unique)

    def isWaiting(self):
        """Return True while the build is being held in ``_wait``."""
        with self.wait_condition:
            return bool(self.waiting)

    def _wait(self):
        """Block until ``release()`` notifies the condition."""
        with self.wait_condition:
            self.waiting = True
            self.log.debug("Build %s waiting" % self.unique)
            self.wait_condition.wait()

    def run(self):
        """Simulate the build and report its outcome over the gearman job."""
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # ``with`` guarantees the worker lock is released even when
        # something below raises; otherwise every later build would block
        # forever on the lock.
        with self.worker.lock:
            result = 'SUCCESS'
            if (('ZUUL_REF' in self.parameters) and
                self.worker.shouldFailTest(self.name,
                                           self.parameters['ZUUL_REF'])):
                result = 'FAILURE'
            if self.aborted:
                result = 'ABORTED'

            if self.run_error:
                # A run error is reported as a work failure, with no
                # 'result' key in the data.
                work_fail = True
                result = 'RUN_ERROR'
            else:
                data['result'] = result
                work_fail = False

            changes = None
            if 'ZUUL_CHANGE_IDS' in self.parameters:
                changes = self.parameters['ZUUL_CHANGE_IDS']

            self.worker.build_history.append(
                BuildHistory(name=self.name, number=self.number,
                             result=result, changes=changes, node=self.node,
                             uuid=self.unique, description=self.description,
                             pipeline=self.parameters['ZUUL_PIPELINE'])
            )

            self.job.sendWorkData(json.dumps(data))
            if work_fail:
                self.job.sendWorkFail()
            else:
                self.job.sendWorkComplete(json.dumps(data))
            del self.worker.gearman_jobs[self.job.unique]
            self.worker.running_builds.remove(self)
621
622
class FakeWorker(gear.Worker):
    """Gearman worker that runs FakeBuild threads instead of real jobs.

    Job names have the form ``<cmd>:<name>[:<node>]`` where ``cmd`` is one
    of ``build``, ``stop`` or ``set_description``.
    """

    def __init__(self, worker_id, test):
        """
        :param worker_id: identifier passed to ``gear.Worker``.
        :param test: the running test case; its ``ref_has_change`` method is
            used to decide which builds should fail.
        """
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch *job* according to the command prefix of its name."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Create and start a FakeBuild for *job*."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build named in the job arguments.

        Fails the job when no running build matches name and number.
        """
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Set the description of a running or already finished build.

        Fails the job when no build matches name and number.
        """
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Fetch and handle jobs for as long as the worker is running."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Keep the worker alive on handler errors, but let
                # SystemExit/KeyboardInterrupt through, which a bare
                # ``except:`` would also have swallowed.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Make job *name* fail whenever its ref contains *change*."""
        self.fail_tests.setdefault(name, []).append(change)

    def shouldFailTest(self, name, ref):
        """Return True if job *name* was told to fail for a change in *ref*."""
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (all if None)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
731
732
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold ``build:`` jobs in its queue.

    While ``hold_jobs_in_queue`` is true, newly seen build jobs are marked
    as waiting and are not handed to workers until ``release()`` is called.
    """

    def __init__(self):
        # Set before the base initialiser runs so the flag already exists
        # whenever the server starts handing out jobs.
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next non-waiting job *connection* can run, or None.

        Queues are searched in high, normal, low order. Unless *peek* is
        true, the job is removed from its queue and bound to *connection*.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First sighting: only build jobs are ever held.
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release queued build jobs whose name matches *regex*.

        With no *regex* every queued build job is released. Workers are
        woken only if at least one job was released.
        """
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            # Names may be 'build:<name>' or 'build:<name>:<node>'; a strict
            # two-value unpack would raise ValueError on the three-part form.
            parts = job.name.split(':')
            if parts[0] != 'build' or len(parts) < 2:
                continue
            name = parts[1]
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
779
780
class FakeSMTP(object):
    """SMTP client replacement that records messages instead of sending."""
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        """
        :param messages: list that every sent message is appended to.
        :param server: SMTP server name (only stored).
        :param port: SMTP port (only stored).
        """
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record *msg*, split into headers and body, and return True.

        The split happens at the first blank line. A message without a blank
        line is recorded with all of its text as headers and an empty body.
        """
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        # partition() never fails: with no separator the body is just ''.
        headers, _separator, body = msg.partition('\n\n')

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        """Pretend to close the connection."""
        return True
808
809
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection with account posting and authentication stubbed out."""

    def post_account(self, headers):
        """Accept the account headers and do nothing with them."""
        # Do nothing
        pass

    def get_auth(self):
        """Return a fixed example endpoint and an empty auth token."""
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''
820
821
class BaseTestCase(testtools.TestCase):
    """Base test case wiring up timeout and output capture from OS_* env vars."""
    log = logging.getLogger("zuul.test")

    def setUp(self):
        """Install the timeout, stdout/stderr and log capture fixtures.

        ``OS_TEST_TIMEOUT`` gives the timeout in seconds (invalid or
        non-positive values disable it). ``OS_STDOUT_CAPTURE``,
        ``OS_STDERR_CAPTURE`` and ``OS_LOG_CAPTURE`` each enable their
        capture when set to ``True`` or ``1``.
        """
        super(BaseTestCase, self).setUp()

        try:
            test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        def enabled(variable):
            return os.environ.get(variable) in ('True', '1')

        if enabled('OS_STDOUT_CAPTURE'):
            out_stream = self.useFixture(
                fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', out_stream))
        if enabled('OS_STDERR_CAPTURE'):
            err_stream = self.useFixture(
                fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', err_stream))
        if enabled('OS_LOG_CAPTURE'):
            log_format = ('%(asctime)s %(name)-32s '
                          '%(levelname)-8s %(message)s')
            self.useFixture(fixtures.FakeLogger(level=logging.DEBUG,
                                                format=log_format))
Maru Newby3fe5f852015-01-13 04:22:14 +0000850
851
852class ZuulTestCase(BaseTestCase):
853
854 def setUp(self):
855 super(ZuulTestCase, self).setUp()
James E. Blair97d902e2014-08-21 13:25:56 -0700856 if USE_TEMPDIR:
857 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000858 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
859 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700860 else:
861 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700862 self.test_root = os.path.join(tmp_root, "zuul-test")
863 self.upstream_root = os.path.join(self.test_root, "upstream")
864 self.git_root = os.path.join(self.test_root, "git")
865
866 if os.path.exists(self.test_root):
867 shutil.rmtree(self.test_root)
868 os.makedirs(self.test_root)
869 os.makedirs(self.upstream_root)
870 os.makedirs(self.git_root)
871
872 # Make per test copy of Configuration.
873 self.setup_config()
874 self.config.set('zuul', 'layout_config',
875 os.path.join(FIXTURE_DIR, "layout.yaml"))
876 self.config.set('merger', 'git_dir', self.git_root)
877
878 # For each project in config:
879 self.init_repo("org/project")
880 self.init_repo("org/project1")
881 self.init_repo("org/project2")
882 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700883 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700884 self.init_repo("org/project5")
885 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700886 self.init_repo("org/one-job-project")
887 self.init_repo("org/nonvoting-project")
888 self.init_repo("org/templated-project")
889 self.init_repo("org/layered-project")
890 self.init_repo("org/node-project")
891 self.init_repo("org/conflict-project")
892 self.init_repo("org/noop-project")
893 self.init_repo("org/experimental-project")
894
895 self.statsd = FakeStatsd()
896 os.environ['STATSD_HOST'] = 'localhost'
897 os.environ['STATSD_PORT'] = str(self.statsd.port)
898 self.statsd.start()
899 # the statsd client object is configured in the statsd module import
900 reload(statsd)
901 reload(zuul.scheduler)
902
903 self.gearman_server = FakeGearmanServer()
904
905 self.config.set('gearman', 'port', str(self.gearman_server.port))
906
907 self.worker = FakeWorker('fake_worker', self)
908 self.worker.addServer('127.0.0.1', self.gearman_server.port)
909 self.gearman_server.worker = self.worker
910
911 self.merge_server = zuul.merger.server.MergeServer(self.config)
912 self.merge_server.start()
913
914 self.sched = zuul.scheduler.Scheduler()
915
916 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
917 FakeSwiftClientConnection))
918 self.swift = zuul.lib.swift.Swift(self.config)
919
920 def URLOpenerFactory(*args, **kw):
921 if isinstance(args[0], urllib2.Request):
922 return old_urlopen(*args, **kw)
923 args = [self.fake_gerrit] + list(args)
924 return FakeURLOpener(self.upstream_root, *args, **kw)
925
926 old_urlopen = urllib2.urlopen
927 urllib2.urlopen = URLOpenerFactory
928
929 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
930 self.swift)
931 self.merge_client = zuul.merger.client.MergeClient(
932 self.config, self.sched)
933
934 self.smtp_messages = []
935
936 def FakeSMTPFactory(*args, **kw):
937 args = [self.smtp_messages] + list(args)
938 return FakeSMTP(*args, **kw)
939
940 zuul.lib.gerrit.Gerrit = FakeGerrit
941 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
942
943 self.gerrit = FakeGerritTrigger(
944 self.upstream_root, self.config, self.sched)
945 self.gerrit.replication_timeout = 1.5
946 self.gerrit.replication_retry_interval = 0.5
947 self.fake_gerrit = self.gerrit.gerrit
948 self.fake_gerrit.upstream_root = self.upstream_root
949
950 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
951 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
952
953 self.sched.setLauncher(self.launcher)
954 self.sched.setMerger(self.merge_client)
955 self.sched.registerTrigger(self.gerrit)
956 self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
957 self.sched.registerTrigger(self.timer)
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000958 self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
959 self.sched)
James E. Blairc494d542014-08-06 09:23:52 -0700960 self.sched.registerTrigger(self.zuultrigger)
Clark Boylanb640e052014-04-03 16:41:46 -0700961
962 self.sched.registerReporter(
963 zuul.reporter.gerrit.Reporter(self.gerrit))
964 self.smtp_reporter = zuul.reporter.smtp.Reporter(
965 self.config.get('smtp', 'default_from'),
966 self.config.get('smtp', 'default_to'),
967 self.config.get('smtp', 'server'))
968 self.sched.registerReporter(self.smtp_reporter)
969
970 self.sched.start()
971 self.sched.reconfigure(self.config)
972 self.sched.resume()
973 self.webapp.start()
974 self.rpc.start()
975 self.launcher.gearman.waitForServer()
976 self.registerJobs()
977 self.builds = self.worker.running_builds
978 self.history = self.worker.build_history
979
980 self.addCleanup(self.assertFinalState)
981 self.addCleanup(self.shutdown)
982
983 def setup_config(self):
984 """Per test config object. Override to set different config."""
985 self.config = ConfigParser.ConfigParser()
986 self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))
987
def assertFinalState(self):
    """Assert that the test left no stray state behind.

    Checks that no git.Repo instances are still tracked by the garbage
    collector, that the pipeline queues are empty (via assertEmptyQueues),
    and that pipelines managed by an IndependentPipelineManager have no
    queues at all.
    """
    # Collect first so that only repos which are still referenced show up
    # in the scan below.
    gc.collect()
    leaked_repos = [candidate for candidate in gc.get_objects()
                    if isinstance(candidate, git.Repo)]
    self.assertEqual(len(leaked_repos), 0)
    self.assertEmptyQueues()
    independent = zuul.scheduler.IndependentPipelineManager
    for pipeline in self.sched.layout.pipelines.values():
        if not isinstance(pipeline.manager, independent):
            continue
        self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001001
def shutdown(self):
    """Stop every component started for the test, in a fixed order.

    After everything has been stopped, an error is logged if more than one
    thread is still alive.
    """
    self.log.debug("Shutting down after tests")
    self.launcher.stop()
    self.merge_server.stop()
    self.merge_server.join()
    self.merge_client.stop()
    self.worker.shutdown()
    self.gerrit.stop()
    self.timer.stop()
    # These components are each stopped and then joined before moving on
    # to the next one.
    for component in (self.sched, self.statsd, self.webapp, self.rpc):
        component.stop()
        component.join()
    self.gearman_server.shutdown()
    live_threads = threading.enumerate()
    if len(live_threads) > 1:
        self.log.error("More than one thread is running: %s" % live_threads)
Clark Boylanb640e052014-04-03 16:41:46 -07001023
def init_repo(self, project):
    """Create the upstream git repository for ``project``.

    The repository is initialised under ``self.upstream_root``. A README
    file is committed as 'initial commit', a ``master`` head and an
    ``init`` tag are created, and finally an ``mp`` branch is added via
    ``create_branch``.

    :param project: project name; may contain '/' separated parent
        directories, which are created if missing.
    """
    parts = project.split('/')
    path = os.path.join(self.upstream_root, *parts[:-1])
    if not os.path.exists(path):
        os.makedirs(path)
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)

    repo.config_writer().set_value('user', 'email', 'user@example.com')
    repo.config_writer().set_value('user', 'name', 'User Name')
    repo.config_writer().write()

    fn = os.path.join(path, 'README')
    # Use a context manager so the handle is closed even if the write fails.
    with open(fn, 'w') as f:
        f.write("test\n")
    repo.index.add([fn])
    repo.index.commit('initial commit')
    master = repo.create_head('master')
    repo.create_tag('init')

    # Point HEAD at master, reset index and working tree to it, and remove
    # untracked files.
    repo.head.reference = master
    repo.head.reset(index=True, working_tree=True)
    repo.git.clean('-x', '-f', '-d')

    self.create_branch(project, 'mp')
1050
def create_branch(self, project, branch):
    """Create ``branch`` in the upstream repository of ``project``.

    The new branch gets one commit that appends a line to README; HEAD is
    then pointed back at ``master`` and the working tree is cleaned.

    :param project: project name, relative to ``self.upstream_root``.
    :param branch: name of the branch to create.
    """
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)
    fn = os.path.join(path, 'README')

    branch_head = repo.create_head(branch)
    repo.head.reference = branch_head
    # Use a context manager so the handle is closed even if the write fails.
    with open(fn, 'a') as f:
        f.write("test %s\n" % branch)
    repo.index.add([fn])
    repo.index.commit('%s commit' % branch)

    # Switch back to master and discard anything left in the working tree.
    repo.head.reference = repo.heads['master']
    repo.head.reset(index=True, working_tree=True)
    repo.git.clean('-x', '-f', '-d')
1067
def ref_has_change(self, ref, change):
    """Return True if ``ref`` contains a commit for ``change``.

    The repository for ``change.project`` under ``self.git_root`` is
    searched for a commit whose stripped message equals
    '<change.subject>-1'. A GitCommandError while listing the commits is
    treated as "not found".
    """
    repo = git.Repo(os.path.join(self.git_root, change.project))
    wanted = '%s-1' % change.subject
    try:
        return any(commit.message.strip() == wanted
                   for commit in repo.iter_commits(ref))
    except GitCommandError:
        return False
1078
def job_has_changes(self, *args):
    """Return True if a job's ref contains exactly the expected changes.

    The first positional argument is the job: either a FakeBuild (its
    ``parameters`` are used) or an object whose ``arguments`` attribute is
    a JSON document with the same parameters. Every remaining argument is
    a change whose ``subject`` is expected to appear, as '<subject>-1',
    among the commit messages of ZUUL_REF. The newest commit on the ref
    must also match ZUUL_COMMIT.
    """
    job = args[0]
    commits = args[1:]
    if isinstance(job, FakeBuild):
        parameters = job.parameters
    else:
        parameters = json.loads(job.arguments)
    project = parameters['ZUUL_PROJECT']
    path = os.path.join(self.git_root, project)
    repo = git.Repo(path)
    ref = parameters['ZUUL_REF']
    sha = parameters['ZUUL_COMMIT']
    # Walk the ref once so that messages and shas are guaranteed to
    # describe the same list of commits.
    repo_commits = list(repo.iter_commits(ref))
    repo_messages = [c.message.strip() for c in repo_commits]
    repo_shas = [c.hexsha for c in repo_commits]
    commit_messages = ['%s-1' % commit.subject for commit in commits]
    self.log.debug("Checking if job %s has changes; commit_messages %s;"
                   " repo_messages %s; sha %s" % (job, commit_messages,
                                                  repo_messages, sha))
    for msg in commit_messages:
        if msg not in repo_messages:
            self.log.debug(" messages do not match")
            return False
    if repo_shas[0] != sha:
        self.log.debug(" sha does not match")
        return False
    self.log.debug(" OK")
    return True
1106
def registerJobs(self):
    """Register the worker's gearman functions and wait for the server.

    One 'build:<job>' function is registered for every job in the
    scheduler layout, plus one 'stop:<worker_id>' function. The call
    returns once the gearman server knows at least that many functions.
    """
    function_names = ['build:' + job
                      for job in self.sched.layout.jobs.keys()]
    function_names.append('stop:' + self.worker.worker_id)
    for function_name in function_names:
        self.worker.registerFunction(function_name)

    expected = len(function_names)
    # Yield to other threads until the server has caught up.
    while len(self.gearman_server.functions) < expected:
        time.sleep(0)
1117
def release(self, job):
    """Release a held job.

    A FakeBuild is released through its own ``release`` method. Any other
    job is treated as one still queued on the gearman server: its
    ``waiting`` flag is cleared and the server's connections are woken.
    """
    if isinstance(job, FakeBuild):
        job.release()
        return
    job.waiting = False
    self.log.debug("Queued job %s released" % job.unique)
    self.gearman_server.wakeConnections()
1125
def getParameter(self, job, name):
    """Return parameter ``name`` of ``job``.

    For a FakeBuild the value comes from its ``parameters``; for any other
    job the parameters are decoded from the JSON in ``job.arguments``.
    """
    if isinstance(job, FakeBuild):
        parameters = job.parameters
    else:
        parameters = json.loads(job.arguments)
    return parameters[name]
1132
def resetGearmanServer(self):
    """Clear the functions registered with the gearman server.

    The worker's function list is emptied, then the call waits until no
    connection other than the RPC listener and the merger still has
    functions registered. Afterwards the server's function set is reset
    and the RPC listener and merge server register again.
    """
    self.worker.setFunctions([])
    keep_registered = ('Zuul RPC Listener', 'Zuul Merger')
    # Yield to other threads while any other client still has functions.
    while any(conn.functions and conn.client_id not in keep_registered
              for conn in self.gearman_server.active_connections):
        time.sleep(0)
    self.gearman_server.functions = set()
    self.rpc.register()
    self.merge_server.register()
1148
def haveAllBuildsReported(self):
    """Return True if no build result is still in transit to Zuul.

    False is returned while the launcher has meta jobs outstanding, while
    a build from the worker's history is still present in the launcher's
    build table, or while any worker connection is in GRAB_WAIT.
    """
    # See if Zuul is waiting on a meta job to complete
    if self.launcher.meta_jobs:
        return False
    # A completed build that the launcher still tracks has not been
    # reported back yet, which means a Gearman event is still in transit
    # and the system is not stable.
    if any(self.launcher.builds.get(finished.uuid)
           for finished in self.worker.build_history):
        return False
    # Make sure that none of the worker connections are in GRAB_WAIT
    return not any(connection.state == 'GRAB_WAIT'
                   for connection in self.worker.active_connections)
1168
def areAllBuildsWaiting(self):
    """Return True if every build known to the launcher is held waiting.

    Each build is traced from the gearman client job, to the server job,
    to the worker job. A build counts as waiting when its server job is
    flagged ``waiting`` or its worker build reports ``isWaiting()``. Every
    build is inspected (and the reason logged) even after one has already
    been found not to be waiting.
    """
    all_waiting = True

    for build in self.launcher.builds.values():
        # Within one connection the first matching job is taken; a match
        # on a later connection replaces one found on an earlier one.
        client_job = None
        for conn in self.launcher.gearman.active_connections:
            match = next((candidate
                          for candidate in conn.related_jobs.values()
                          if candidate.unique == build.uuid), None)
            if match is not None:
                client_job = match
        if not client_job:
            self.log.debug("%s is not known to the gearman client" %
                           build)
            all_waiting = False
            continue
        if not client_job.handle:
            self.log.debug("%s has no handle" % client_job)
            all_waiting = False
            continue
        server_job = self.gearman_server.jobs.get(client_job.handle)
        if not server_job:
            self.log.debug("%s is not known to the gearman server" %
                           client_job)
            all_waiting = False
            continue
        if not hasattr(server_job, 'waiting'):
            self.log.debug("%s is being enqueued" % server_job)
            all_waiting = False
            continue
        if server_job.waiting:
            continue
        worker_job = self.worker.gearman_jobs.get(server_job.unique)
        if not worker_job:
            self.log.debug("%s is unassigned" % server_job)
            all_waiting = False
        elif not worker_job.build.isWaiting():
            self.log.debug("%s is running" % worker_job)
            all_waiting = False
    return all_waiting
1212
def waitUntilSettled(self):
    """Block until Zuul has no outstanding work.

    The system counts as settled when all builds have been reported, the
    gerrit, trigger and result event queues are empty and fully processed,
    the merge client has no outstanding build sets, and every build is
    waiting. The checks are repeated, pausing up to 0.1 seconds on the
    scheduler's wake event between attempts.

    :raises Exception: if a new attempt starts more than 10 seconds after
        the call began; the queue status is printed first.
    """
    self.log.debug("Waiting until settled...")
    start = time.time()
    while True:
        if time.time() - start > 10:
            # A single formatted print keeps the output identical while
            # remaining valid syntax on both Python 2 and Python 3.
            print('queue status: %s %s %s %s' % (
                self.sched.trigger_event_queue.empty(),
                self.sched.result_event_queue.empty(),
                self.fake_gerrit.event_queue.empty(),
                self.areAllBuildsWaiting()))
            raise Exception("Timeout waiting for Zuul to settle")
        settled = False
        # Make sure no new events show up while we're checking
        self.worker.lock.acquire()
        try:
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have
                # been processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                try:
                    if (not self.merge_client.build_sets and
                            self.sched.trigger_event_queue.empty() and
                            self.sched.result_event_queue.empty() and
                            self.fake_gerrit.event_queue.empty() and
                            self.haveAllBuildsReported() and
                            self.areAllBuildsWaiting()):
                        settled = True
                finally:
                    # Release even if one of the checks raised, so a
                    # failing test cannot leave the scheduler blocked.
                    self.sched.run_handler_lock.release()
        finally:
            self.worker.lock.release()
        if settled:
            self.log.debug("...settled.")
            return
        self.sched.wake_event.wait(0.1)
1247
def countJobResults(self, jobs, result):
    """Return the number of entries in ``jobs`` whose result is ``result``.

    :param jobs: iterable of objects with a ``result`` attribute.
    :param result: the result value to count.
    """
    # Count directly instead of len(filter(...)): on Python 3 filter()
    # returns an iterator, which has no len().
    return sum(1 for job in jobs if job.result == result)
1251
def getJobFromHistory(self, name):
    """Return the first build in the worker's history named ``name``.

    :raises Exception: if no build with that name is in the history.
    """
    found = next((build for build in self.worker.build_history
                  if build.name == name), None)
    if found is None:
        raise Exception("Unable to find job %s in history" % name)
    return found
1258
def assertEmptyQueues(self):
    """Assert that every queue of every pipeline in the layout is empty.

    The contents of a non-empty queue are printed before the assertion
    fails, to help identify the orphaned items.
    """
    # Make sure there are no orphaned jobs
    for pipeline in self.sched.layout.pipelines.values():
        for change_queue in pipeline.queues:
            if len(change_queue.queue) != 0:
                # Parenthesised single-argument print is valid, and
                # prints the same text, on both Python 2 and Python 3.
                print('pipeline %s queue %s contents %s' % (
                    pipeline.name, change_queue.name, change_queue.queue))
            self.assertEqual(len(change_queue.queue), 0,
                             "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001268
def assertReportedStat(self, key, value=None, kind=None):
    """Wait up to five seconds for a stat with ``key`` to be reported.

    Each entry of ``self.statsd.stats`` is split at its first ':' into a
    key and a value part. With neither ``value`` nor ``kind`` given, any
    entry for ``key`` matches. If ``value`` is given, the value part must
    equal it; otherwise, if ``kind`` is given, the value part must end
    with '|<kind>'.

    :raises Exception: if no matching stat shows up in time; the stats
        seen so far are printed first.
    """
    deadline = time.time() + 5
    while time.time() < deadline:
        if self.statsd.stats:
            # Dump the stats once per poll rather than once per entry.
            pprint.pprint(self.statsd.stats)
        for stat in self.statsd.stats:
            # Split only once so a value containing ':' does not break
            # the unpacking.
            k, v = stat.split(':', 1)
            if key != k:
                continue
            if value is None and kind is None:
                return
            elif value:
                if value == v:
                    return
            elif kind:
                if v.endswith('|' + kind):
                    return
        time.sleep(0.1)

    pprint.pprint(self.statsd.stats)
    raise Exception("Key %s not found in reported stats" % key)