#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

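"""Test fixtures for the Zuul test suite: fake Gerrit, Gearman server and
worker, statsd, SMTP and Swift implementations, plus the base test case
classes that wire them together."""
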
from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.server
import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)


class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
    """A fake Gerrit change, with its patchsets, approvals and events."""

    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerrit(object):
    """A fake Gerrit that stores changes and queues events for the tests."""

    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        return self.event_queue.put((time.time(), data))

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def startWatching(self, *args, **kw):
        pass


class BuildHistory(object):
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root
        self.gerrit_connector.delay = 0.0

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


class FakeStatsd(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


class FakeBuild(threading.Thread):
    """A build run by the FakeWorker; tests can hold and release it."""

    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
    """A fake Gearman worker that runs FakeBuild jobs for the tests."""

    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


class FakeGearmanServer(gear.Server):
    """A Gearman server that can hold queued build jobs until released."""

    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))


class FakeSMTP(object):
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


class BaseTestCase(testtools.TestCase):
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
                os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
                os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))


class ZuulTestCase(BaseTestCase):
    """A test case that runs a full Zuul scheduler against fake services."""

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")
        self.init_repo("org/no-jobs-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                                self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def orderedRelease(self):
        # Run one build at a time to ensure non-race order:
        while len(self.builds):
            self.release(self.builds[0])
            self.waitUntilSettled()

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
        self.log.debug("Queued job %s released" % job.unique)
        self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul. If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True

    def waitUntilSettled(self):
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
            self.sched.run_handler_lock.acquire()
            if (not self.merge_client.build_sets and
                self.sched.trigger_event_queue.empty() and
                self.sched.result_event_queue.empty() and
                self.fake_gerrit.event_queue.empty() and
                self.haveAllBuildsReported() and
                self.areAllBuildsWaiting()):
                self.sched.run_handler_lock.release()
                self.worker.lock.release()
                self.log.debug("...settled.")
                return
            self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)