blob: 8c96d18a5298aa23445d24cca56e87f46404fbfa [file] [log] [blame]
#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
45import zuul.scheduler
46import zuul.webapp
47import zuul.rpclistener
48import zuul.launcher.gearman
49import zuul.lib.swift
50import zuul.merger.server
51import zuul.merger.client
52import zuul.reporter.gerrit
53import zuul.reporter.smtp
54import zuul.trigger.gerrit
55import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070056import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070057
# Directory of static fixture files, located in a 'fixtures' directory next
# to this module.
FIXTURE_DIR = os.path.join(
    os.path.dirname(__file__),
    'fixtures',
)
# When true, ZuulTestCase.setUp builds its working tree inside a
# fixtures.TempDir; when false it uses $ZUUL_TEST_ROOT directly.
USE_TEMPDIR = True

# Configure the root logger at DEBUG with a fixed-width name/level layout.
logging.basicConfig(
    format='%(asctime)s %(name)-32s '
           '%(levelname)-8s %(message)s',
    level=logging.DEBUG,
)
65
66
def repack_repo(path):
    """Run ``git repack -afd`` against the repository checked out at *path*.

    :param path: working-tree directory; its ``.git`` subdirectory is passed
        to git via ``--git-dir``.
    :returns: the ``(stdout, stderr)`` tuple produced by
        ``Popen.communicate()``.
    :raises Exception: if git exits with a non-zero status.
    """
    git_dir_option = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(['git', git_dir_option, 'repack', '-afd'],
                            close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    streams = proc.communicate()
    exit_status = proc.returncode
    if exit_status:
        raise Exception("git repack returned %d" % exit_status)
    return streams
76
77
def random_sha1():
    """Return a random 40-character lowercase hex string shaped like a SHA-1.

    The digest is taken over the textual form of ``random.random()``.  The
    text is encoded to bytes first because ``hashlib`` rejects text strings
    on Python 3; on Python 2 the encode is a no-op for this ASCII input.
    """
    seed = str(random.random()).encode('utf8')
    return hashlib.sha1(seed).hexdigest()
80
81
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing attempt counter until *max_seconds* have elapsed.

    Intended for polling loops: the caller breaks out of the ``for`` loop
    once its condition is met.  If the caller keeps iterating past the
    deadline, an exception is raised instead of the generator ending.

    :param max_seconds: how long (in seconds, measured with ``time.time()``)
        iteration may continue.
    :param purpose: text inserted into the timeout error message.
    :raises Exception: ``"Timeout waiting for <purpose>"`` once the deadline
        has passed.
    """
    deadline = time.time() + max_seconds
    attempt = 0
    while True:
        if not (time.time() < deadline):
            raise Exception("Timeout waiting for %s" % purpose)
        attempt += 1
        yield attempt
        # Yield the CPU to other threads between attempts.
        time.sleep(0)
90
91
class ChangeReference(git.Reference):
    """GitPython reference type whose default path prefix is ``refs/changes``.

    FakeChange uses it to create the ref for each fake patchset.
    """
    _points_to_commits_only = True
    _common_path_default = "refs/changes"
95
96
class FakeChange(object):
    """In-memory stand-in for a Gerrit change, backed by a real git repo.

    Each patchset is committed to the project's repository under
    ``upstream_root`` and recorded in ``self.patchsets``; ``self.data`` holds
    the dict that ``query()`` returns (after a JSON round trip).
    """

    # category key -> (label name, minimum vote, maximum vote)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        """Create the change and immediately add its first patchset.

        :param gerrit: the owning fake Gerrit object (stored, not used here).
        :param number: change number.
        :param project: project name, also the repo path below
            ``upstream_root``.
        :param branch: target branch name.
        :param subject: commit subject; also used as the commit message.
        :param status: change status; the change is open iff it is 'NEW'.
        :param upstream_root: directory containing the upstream git repos.
        """
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            # Shares the list object, so appended patchsets show up here.
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit the current patchset to the upstream repo and return it.

        A ref ``refs/changes/1/<number>/<patchset>`` is created from the
        ``init`` tag, checked out, and a commit is made on it; afterwards the
        repository is put back on ``master``.

        :param msg: commit message.
        :param fn: file name (relative to the repo) written when not *large*.
        :param large: when true, write 100 files of 4096 random printable
            characters each instead of the single small file.
        :returns: the commit object created by ``repo.index.commit``.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        if not large:
            fn = os.path.join(path, fn)
            with open(fn, 'w') as f:
                f.write("test %s %s %s\n" %
                        (self.branch, self.number, self.latest_patchset))
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                with open(fn, 'w') as f:
                    for ci in range(4096):
                        f.write(random.choice(string.printable))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Add a new patchset to the change and make it the current one.

        :param files: optional sequence of file names reported as ADDED in
            the patchset; the first one is the file actually written to the
            repo.  When empty/omitted a name derived from branch and change
            number is written instead.
        :param large: passed through to ``add_fake_change_to_repo``.
        """
        if files is None:
            files = []
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def _eventChange(self):
        """Return a fresh 'change' sub-dict shared by the simple events."""
        return {"project": self.project,
                "branch": self.branch,
                "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                "number": str(self.number),
                "subject": self.subject,
                "owner": {"name": "User Name"},
                "url": "https://hostname/3"}

    def getPatchsetCreatedEvent(self, patchset):
        """Return a 'patchset-created' event for the 1-based *patchset*."""
        event = {"type": "patchset-created",
                 "change": self._eventChange(),
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a 'change-restored' event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": self._eventChange(),
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a 'change-abandoned' event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": self._eventChange(),
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a 'comment-added' event (Code-Review 0) for *patchset*."""
        event = {"type": "comment-added",
                 "change": self._eventChange(),
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        """Record a vote on the latest patchset and return its event.

        Any earlier approval by the same *username* in the same *category*
        is replaced.

        :param category: key of ``categories`` ('APRV', 'CRVW' or 'VRFY').
        :param value: the vote; stored as a string.
        :param username: voter; the e-mail is ``<username>@example.com``.
        :param granted_on: timestamp of the vote; a falsy value means "now".
        :returns: a 'comment-added' event dict (deep-copied via JSON).
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        # Drop superseded votes by filtering in place; deleting by index
        # while enumerating a copy goes wrong once more than one matches.
        approvals = self.patchsets[-1]['approvals']
        approvals[:] = [x for x in approvals
                        if not (x['by']['username'] == username and
                                x['type'] == category)]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute the submit records from the latest patchset's approvals.

        Per category, a vote equal to the category minimum always wins;
        otherwise the vote with the largest magnitude seen so far is kept.

        :returns: ``[{'status': 'OK'}]`` when every category sits at its
            maximum, else ``[{'status': 'NOT_READY', 'labels': [...]}]``
            with one OK/NEED/REJECT entry per category.
        """
        status = dict.fromkeys(self.categories, 0)

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min = self.categories[a['type']][1]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on *other* and register the reverse link.

        :param other: the FakeChange this one depends on.
        :param patchset: 1-based patchset number; indexes ``other.patchsets``
            for the 'dependsOn' entry and ``self.patchsets`` for the
            'neededBy' entry.
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep copy of the change data, as a query would.

        Also counts the query and refreshes 'isCurrentPatchSet' on the first
        'dependsOn' entry, if there is one.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            latest_ref = self.depends_on_change.patchsets[-1]['ref']
            d['isCurrentPatchSet'] = (latest_ref == d['ref'])
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change MERGED and move the branch head to its revision.

        Does nothing if the change it depends on is not MERGED, or if
        ``fail_merge`` is set.
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one report made against this change."""
        self.reported += 1
378
379
class FakeGerrit(object):
    """Fake Gerrit connection holding FakeChange objects and an event queue.

    ``upstream_root`` is not set by the constructor; it must be assigned on
    the instance before ``addFakeChange`` is called.
    """
    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, *args, **kw):
        """Accept and ignore any constructor arguments."""
        self.changes = {}
        self.queries = []
        self.change_number = 0
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.event_queue = Queue.Queue()

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create a FakeChange with the next change number and register it."""
        self.change_number += 1
        number = self.change_number
        change = FakeChange(self, number, project, branch, subject,
                            upstream_root=self.upstream_root,
                            status=status)
        self.changes[number] = change
        return change

    def addEvent(self, data):
        """Queue *data* as an event, paired with the current timestamp."""
        stamped = (time.time(), data)
        return self.event_queue.put(stamped)

    def getEvent(self):
        """Block until an event is available and return it."""
        return self.event_queue.get()

    def eventDone(self):
        """Mark the most recently fetched event as processed."""
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        """Apply a review to the change named by ``"<number>,<patchset>"``.

        The message is always recorded on the change; the change is merged
        when 'submit' is in *action* and counted as reported when *message*
        is non-empty.
        """
        number, _patchset = changeid.split(',')
        target = self.changes[int(number)]
        target.messages.append(message)
        if 'submit' in action:
            target.setMerged()
        if message:
            target.setReported()

    def query(self, number):
        """Return the query data for change *number*, or {} if unknown."""
        found = self.changes.get(int(number))
        if not found:
            return {}
        return found.query()

    def simpleQuery(self, query):
        """Record *query* and return the query data of matching changes.

        ``change:<id>`` matches on the change id, ``message:<text>`` matches
        changes whose commit message contains the stripped text, and any
        other query returns every known change (no status filtering).
        """
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        selected = self.changes.values()
        if query.startswith('change:'):
            wanted_id = query[len('change:'):]
            selected = [c for c in selected if c.data['id'] == wanted_id]
        elif query.startswith('message:'):
            needle = query[len('message:'):].strip()
            selected = [c for c in selected
                        if needle in c.data['commitMessage']]
        return [c.query() for c in selected]

    def startWatching(self, *args, **kw):
        """No-op: the fake has no event stream to watch."""
        pass
442
443
class BuildHistory(object):
    """Record of a completed build; every keyword becomes an attribute."""

    def __init__(self, **kw):
        for attribute, value in kw.items():
            setattr(self, attribute, value)

    def __repr__(self):
        fields = (self.result, self.name, self.number, self.changes)
        template = "<Completed build, result: %s name: %s #%s changes: %s>"
        return template % fields
451
452
class FakeURLOpener(object):
    """Replacement for the object returned by ``urlopen``.

    ``read()`` returns a ref listing for the repository named in the URL,
    built from the refs of the matching repo under ``upstream_root``.
    """

    def __init__(self, upstream_root, fake_gerrit, url):
        self.url = url
        self.fake_gerrit = fake_gerrit
        self.upstream_root = upstream_root

    def read(self):
        """Return the ref advertisement text for the URL's project.

        The project is the URL path with its first two and last two
        '/'-separated components removed.  Each ref line is prefixed with
        its length plus four as four hex digits, and the listing ends with
        '0000'.
        """
        url_path = urlparse.urlparse(self.url).path
        project = '/'.join(url_path.split('/')[2:-2])
        pieces = [
            '001e# service=git-upload-pack\n',
            ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
             'multi_ack thin-pack side-band side-band-64k ofs-delta '
             'shallow no-progress include-tag multi_ack_detailed no-done\n'),
        ]
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            pieces.append('%04x%s' % (len(line) + 4, line))
        pieces.append('0000')
        return ''.join(pieces)
474
475
class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
    """Gerrit trigger that resolves git URLs to local upstream repos."""
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        """Initialise the real trigger with *args*, then apply test tweaks.

        :param upstream_root: directory containing the upstream git repos.
        """
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root
        # Remove the connector's delay so tests do not wait on it.
        connector = self.gerrit_connector
        connector.delay = 0.0

    def getGitUrl(self, project):
        """Return the local path of *project*'s repo under upstream_root."""
        repo_name = project.name
        return os.path.join(self.upstream_root, repo_name)
486
487
class FakeStatsd(threading.Thread):
    """Daemon thread that collects UDP datagrams sent to a local port.

    The socket is bound to an OS-assigned port, exposed as ``self.port``.
    The payload of every datagram received is appended to ``self.stats``.
    A pipe is used to wake the thread so that ``stop()`` can end it.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Collect datagrams until the wake pipe becomes readable."""
        # Register the descriptors once; they do not change while running.
        poll = select.poll()
        poll.register(self.sock, select.POLLIN)
        poll.register(self.wake_read, select.POLLIN)
        while True:
            for (fd, event) in poll.poll():
                if fd == self.sock.fileno():
                    # recvfrom() returns (payload, address); keep the payload.
                    payload, _address = self.sock.recvfrom(1024)
                    self.stats.append(payload)
                if fd == self.wake_read:
                    return

    def stop(self):
        """Ask the thread to exit by writing to the wake pipe."""
        # os.write() needs bytes on Python 3; b'' is plain str on Python 2.
        os.write(self.wake_write, b'1\n')
515
516
class FakeBuild(threading.Thread):
    """Thread simulating one build of a gearman job.

    The build reports fake worker data, optionally parks itself until
    ``release()`` is called (when the worker holds jobs in build), then
    records a BuildHistory entry on the worker and completes the job.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        """
        :param worker: owning worker; provides the lock, history and flags.
        :param job: gearman job; its name is ``<cmd>:<job name>[:...]`` and
            its arguments are a JSON object containing 'ZUUL_UUID'.
        :param number: build number.
        :param node: node label recorded in the build history (may be None).
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Wake the build if it is parked in ``_wait()``."""
        with self.wait_condition:
            self.wait_condition.notify()
            self.waiting = False
            self.log.debug("Build %s released" % self.unique)

    def isWaiting(self):
        """Return True while the build is parked waiting for release."""
        with self.wait_condition:
            return bool(self.waiting)

    def _wait(self):
        """Park the build until ``release()`` notifies the condition."""
        with self.wait_condition:
            self.waiting = True
            self.log.debug("Build %s waiting" % self.unique)
            self.wait_condition.wait()

    def run(self):
        """Simulate the build and report its outcome through the job.

        The result is SUCCESS unless the worker says this job should fail
        for the build's ZUUL_REF (FAILURE), the build was aborted (ABORTED)
        or ``run_error`` is set (RUN_ERROR, reported as a work failure).
        """
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Hold the worker lock via ``with`` so an exception while finishing
        # the build cannot leave it locked and block every later build.
        with self.worker.lock:
            result = 'SUCCESS'
            if (('ZUUL_REF' in self.parameters) and
                self.worker.shouldFailTest(self.name,
                                           self.parameters['ZUUL_REF'])):
                result = 'FAILURE'
            if self.aborted:
                result = 'ABORTED'

            if self.run_error:
                work_fail = True
                result = 'RUN_ERROR'
            else:
                data['result'] = result
                work_fail = False

            changes = None
            if 'ZUUL_CHANGE_IDS' in self.parameters:
                changes = self.parameters['ZUUL_CHANGE_IDS']

            self.worker.build_history.append(
                BuildHistory(name=self.name, number=self.number,
                             result=result, changes=changes, node=self.node,
                             uuid=self.unique, description=self.description,
                             pipeline=self.parameters['ZUUL_PIPELINE'])
            )

            self.job.sendWorkData(json.dumps(data))
            if work_fail:
                self.job.sendWorkFail()
            else:
                self.job.sendWorkComplete(json.dumps(data))
            del self.worker.gearman_jobs[self.job.unique]
            self.worker.running_builds.remove(self)
622
623
class FakeWorker(gear.Worker):
    """Gearman worker that runs FakeBuild threads instead of real builds.

    Handles 'build', 'stop' and 'set_description' jobs on a background
    daemon thread started by the constructor.
    """

    def __init__(self, worker_id, test):
        """
        :param worker_id: passed to ``gear.Worker``.
        :param test: the test case; its ``ref_has_change`` is consulted to
            decide whether a build should fail.
        """
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch *job* by the command prefix of ``<cmd>:<name>[:<node>]``.

        Unknown commands are ignored.
        """
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for *job* and track it as running."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build named in the job's JSON arguments.

        Completes the job if a running build with that name and number was
        found (and released as aborted); otherwise fails it.
        """
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Set the description of a running or finished build.

        Running builds are searched first, then the build history; the job
        fails if no build with the given name and number exists.
        """
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Fetch and handle jobs for as long as the worker is running."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Log and keep serving; a bare except would also swallow
                # SystemExit and KeyboardInterrupt.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Make builds of job *name* fail when their ref has *change*."""
        self.fail_tests.setdefault(name, []).append(change)

    def shouldFailTest(self, name, ref):
        """Return True if job *name* was told to fail for a change in *ref*."""
        return any(self.test.ref_has_change(ref, change)
                   for change in self.fail_tests.get(name, []))

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (all if None)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
732
733
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold 'build:' jobs in its queue.

    While ``hold_jobs_in_queue`` is true, newly seen build jobs are marked
    as waiting and not handed to workers until ``release()`` is called.
    """

    def __init__(self):
        self.hold_jobs_in_queue = False
        # Port 0: let the OS pick a free port.
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next non-waiting job *connection* can run, or None.

        Queues are scanned in high, normal, low order.  The first time a job
        is seen its ``waiting`` flag is set: build jobs take the current
        ``hold_jobs_in_queue`` value, all other jobs are never held.  Unless
        *peek* is true the job is removed from its queue and bound to the
        connection.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release held build jobs whose job name matches *regex*.

        With no *regex* every queued build job is released.  Connections are
        woken only if at least one job was released.
        """
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            # Job names are '<cmd>:<name>' with an optional ':<node>' suffix,
            # so index the parts instead of unpacking exactly two.
            parts = job.name.split(':')
            cmd = parts[0]
            if cmd != 'build':
                continue
            name = parts[1]
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
780
781
class FakeSMTP(object):
    """Stand-in for ``smtplib.SMTP`` that records messages in a list."""
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        """
        :param messages: list that every sent message is appended to.
        :param server: SMTP server name (stored only).
        :param port: SMTP port (stored only).
        """
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record *msg* instead of sending it; always returns True.

        The message is split at the first blank line into 'headers' and
        'body'.  A message without a blank line is recorded with an empty
        body rather than raising.
        """
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers, _separator, body = msg.partition('\n\n')

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        """Nothing to close; returns True."""
        return True
809
810
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection whose overridden methods return canned values."""

    def post_account(self, headers):
        """Ignore the account update."""
        return None

    def get_auth(self):
        """Return a fixed storage endpoint and an empty (unused) token."""
        token = ''
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, token
821
822
class BaseTestCase(testtools.TestCase):
    """Base test case wiring up timeout and output capture from env vars."""
    log = logging.getLogger("zuul.test")

    def setUp(self):
        """Install fixtures selected by OS_* environment variables.

        * ``OS_TEST_TIMEOUT``: positive integer number of seconds enables a
          non-gentle timeout; invalid or non-positive values disable it.
        * ``OS_STDOUT_CAPTURE`` / ``OS_STDERR_CAPTURE``: 'True' or '1'
          replaces the stream with a captured StringStream.
        * ``OS_LOG_CAPTURE``: 'True' or '1' installs a DEBUG FakeLogger.
        """
        super(BaseTestCase, self).setUp()

        def enabled(variable):
            # A flag is on only for the exact values 'True' and '1'.
            return os.environ.get(variable) in ('True', '1')

        try:
            test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        captures = (('OS_STDOUT_CAPTURE', 'stdout', 'sys.stdout'),
                    ('OS_STDERR_CAPTURE', 'stderr', 'sys.stderr'))
        for variable, stream_name, target in captures:
            if enabled(variable):
                stream = self.useFixture(
                    fixtures.StringStream(stream_name)).stream
                self.useFixture(fixtures.MonkeyPatch(target, stream))

        if enabled('OS_LOG_CAPTURE'):
            log_format = ('%(asctime)s %(name)-32s '
                          '%(levelname)-8s %(message)s')
            self.useFixture(fixtures.FakeLogger(level=logging.DEBUG,
                                                format=log_format))
Maru Newby3fe5f852015-01-13 04:22:14 +0000851
852
853class ZuulTestCase(BaseTestCase):
854
855 def setUp(self):
856 super(ZuulTestCase, self).setUp()
James E. Blair97d902e2014-08-21 13:25:56 -0700857 if USE_TEMPDIR:
858 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000859 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
860 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700861 else:
862 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700863 self.test_root = os.path.join(tmp_root, "zuul-test")
864 self.upstream_root = os.path.join(self.test_root, "upstream")
865 self.git_root = os.path.join(self.test_root, "git")
866
867 if os.path.exists(self.test_root):
868 shutil.rmtree(self.test_root)
869 os.makedirs(self.test_root)
870 os.makedirs(self.upstream_root)
871 os.makedirs(self.git_root)
872
873 # Make per test copy of Configuration.
874 self.setup_config()
875 self.config.set('zuul', 'layout_config',
876 os.path.join(FIXTURE_DIR, "layout.yaml"))
877 self.config.set('merger', 'git_dir', self.git_root)
878
879 # For each project in config:
880 self.init_repo("org/project")
881 self.init_repo("org/project1")
882 self.init_repo("org/project2")
883 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700884 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700885 self.init_repo("org/project5")
886 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700887 self.init_repo("org/one-job-project")
888 self.init_repo("org/nonvoting-project")
889 self.init_repo("org/templated-project")
890 self.init_repo("org/layered-project")
891 self.init_repo("org/node-project")
892 self.init_repo("org/conflict-project")
893 self.init_repo("org/noop-project")
894 self.init_repo("org/experimental-project")
895
896 self.statsd = FakeStatsd()
897 os.environ['STATSD_HOST'] = 'localhost'
898 os.environ['STATSD_PORT'] = str(self.statsd.port)
899 self.statsd.start()
900 # the statsd client object is configured in the statsd module import
901 reload(statsd)
902 reload(zuul.scheduler)
903
904 self.gearman_server = FakeGearmanServer()
905
906 self.config.set('gearman', 'port', str(self.gearman_server.port))
907
908 self.worker = FakeWorker('fake_worker', self)
909 self.worker.addServer('127.0.0.1', self.gearman_server.port)
910 self.gearman_server.worker = self.worker
911
912 self.merge_server = zuul.merger.server.MergeServer(self.config)
913 self.merge_server.start()
914
915 self.sched = zuul.scheduler.Scheduler()
916
917 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
918 FakeSwiftClientConnection))
919 self.swift = zuul.lib.swift.Swift(self.config)
920
921 def URLOpenerFactory(*args, **kw):
922 if isinstance(args[0], urllib2.Request):
923 return old_urlopen(*args, **kw)
924 args = [self.fake_gerrit] + list(args)
925 return FakeURLOpener(self.upstream_root, *args, **kw)
926
927 old_urlopen = urllib2.urlopen
928 urllib2.urlopen = URLOpenerFactory
929
930 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
931 self.swift)
932 self.merge_client = zuul.merger.client.MergeClient(
933 self.config, self.sched)
934
935 self.smtp_messages = []
936
937 def FakeSMTPFactory(*args, **kw):
938 args = [self.smtp_messages] + list(args)
939 return FakeSMTP(*args, **kw)
940
941 zuul.lib.gerrit.Gerrit = FakeGerrit
942 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
943
944 self.gerrit = FakeGerritTrigger(
945 self.upstream_root, self.config, self.sched)
946 self.gerrit.replication_timeout = 1.5
947 self.gerrit.replication_retry_interval = 0.5
948 self.fake_gerrit = self.gerrit.gerrit
949 self.fake_gerrit.upstream_root = self.upstream_root
950
951 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
952 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
953
954 self.sched.setLauncher(self.launcher)
955 self.sched.setMerger(self.merge_client)
956 self.sched.registerTrigger(self.gerrit)
957 self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
958 self.sched.registerTrigger(self.timer)
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000959 self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
960 self.sched)
James E. Blairc494d542014-08-06 09:23:52 -0700961 self.sched.registerTrigger(self.zuultrigger)
Clark Boylanb640e052014-04-03 16:41:46 -0700962
963 self.sched.registerReporter(
964 zuul.reporter.gerrit.Reporter(self.gerrit))
965 self.smtp_reporter = zuul.reporter.smtp.Reporter(
966 self.config.get('smtp', 'default_from'),
967 self.config.get('smtp', 'default_to'),
968 self.config.get('smtp', 'server'))
969 self.sched.registerReporter(self.smtp_reporter)
970
971 self.sched.start()
972 self.sched.reconfigure(self.config)
973 self.sched.resume()
974 self.webapp.start()
975 self.rpc.start()
976 self.launcher.gearman.waitForServer()
977 self.registerJobs()
978 self.builds = self.worker.running_builds
979 self.history = self.worker.build_history
980
981 self.addCleanup(self.assertFinalState)
982 self.addCleanup(self.shutdown)
983
def setup_config(self):
    """Load the fixture ``zuul.conf`` into ``self.config``.

    A fresh parser is created for every test; subclasses may override
    this method to install a different configuration object.
    """
    conf_file = os.path.join(FIXTURE_DIR, "zuul.conf")
    parser = ConfigParser.ConfigParser()
    self.config = parser
    parser.read(conf_file)
988
def assertFinalState(self):
    """Assert that the test left no repos or queued work behind.

    Checks, in order: no ``git.Repo`` instance is still reachable after a
    forced garbage collection, every pipeline queue is empty, and
    pipelines driven by an independent manager have no queues at all.
    """
    # Collect first so that only objects which are really still
    # referenced show up in the scan below.
    gc.collect()
    repos = [obj for obj in gc.get_objects() if isinstance(obj, git.Repo)]
    self.assertEqual(len(repos), 0)
    self.assertEmptyQueues()
    for pipeline in self.sched.layout.pipelines.values():
        if not isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
            continue
        self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001002
def shutdown(self):
    """Stop every service the test started and report stray threads.

    Each component is stopped in a fixed order; those that expose a
    ``join()`` are joined right after being stopped so the next component
    is only torn down once the previous one has finished.  After the
    gearman server is shut down, an error is logged if more than one
    thread is still alive.
    """
    self.log.debug("Shutting down after tests")
    self.launcher.stop()
    self.merge_server.stop()
    self.merge_server.join()
    self.merge_client.stop()
    self.worker.shutdown()
    self.gerrit.stop()
    self.timer.stop()
    self.sched.stop()
    self.sched.join()
    self.statsd.stop()
    self.statsd.join()
    self.webapp.stop()
    self.webapp.join()
    self.rpc.stop()
    self.rpc.join()
    self.gearman_server.shutdown()
    # Only the calling thread is expected to remain at this point; this is
    # logged rather than raised, so leftover threads do not fail the test.
    threads = threading.enumerate()
    if len(threads) > 1:
        self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -07001024
def init_repo(self, project):
    """Create the upstream git repository for ``project``.

    The repository is created under ``self.upstream_root`` with a single
    ``README`` commit, a ``master`` head and an ``init`` tag; the working
    tree is then reset to ``master`` and an extra ``mp`` branch is added
    via ``create_branch``.

    :param project: project name, optionally containing ``/`` separated
        parent directories (e.g. ``org/project``).
    """
    parts = project.split('/')
    path = os.path.join(self.upstream_root, *parts[:-1])
    if not os.path.exists(path):
        os.makedirs(path)
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)

    # NOTE(review): each call obtains its own config writer; this appears
    # to rely on the writer persisting values by itself - confirm against
    # the GitPython version in use before restructuring.
    repo.config_writer().set_value('user', 'email', 'user@example.com')
    repo.config_writer().set_value('user', 'name', 'User Name')
    repo.config_writer().write()

    fn = os.path.join(path, 'README')
    # Context manager so the handle is closed even if the write fails.
    with open(fn, 'w') as f:
        f.write("test\n")
    repo.index.add([fn])
    repo.index.commit('initial commit')
    master = repo.create_head('master')
    repo.create_tag('init')

    # Leave the repo checked out on a clean master.
    repo.head.reference = master
    repo.head.reset(index=True, working_tree=True)
    repo.git.clean('-x', '-f', '-d')

    self.create_branch(project, 'mp')
1051
def create_branch(self, project, branch):
    """Add ``branch`` with one extra commit to an upstream repository.

    The new branch head is created from the current HEAD, a line naming
    the branch is appended to ``README`` and committed on it, and the
    working tree is then switched back to a clean ``master``.

    :param project: project name relative to ``self.upstream_root``.
    :param branch: name of the branch to create.
    """
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)
    fn = os.path.join(path, 'README')

    branch_head = repo.create_head(branch)
    repo.head.reference = branch_head
    # Context manager so the handle is closed even if the write fails.
    with open(fn, 'a') as f:
        f.write("test %s\n" % branch)
    repo.index.add([fn])
    repo.index.commit('%s commit' % branch)

    # Return to master and drop anything left over in the working tree.
    repo.head.reference = repo.heads['master']
    repo.head.reset(index=True, working_tree=True)
    repo.git.clean('-x', '-f', '-d')
1068
def ref_has_change(self, ref, change):
    """Return True if ``ref`` contains the first patchset of ``change``.

    A commit matches when its stripped message equals
    ``"<change.subject>-1"``.  A ``GitCommandError`` raised while walking
    the history is treated as "not found".
    """
    repo = git.Repo(os.path.join(self.git_root, change.project))
    try:
        return any(
            candidate.message.strip() == ('%s-1' % change.subject)
            for candidate in repo.iter_commits(ref))
    except GitCommandError:
        return False
1079
def job_has_changes(self, *args):
    """Check that a job's ref contains the given changes at its tip.

    The first positional argument is the job (either a ``FakeBuild`` or
    an object whose ``arguments`` attribute is a JSON string of build
    parameters); the remaining arguments are the changes expected in the
    job's ``ZUUL_REF``.

    :returns: True when every ``"<subject>-1"`` message is present in the
        ref's history and the newest commit equals ``ZUUL_COMMIT``;
        False otherwise.
    """
    job = args[0]
    commits = args[1:]
    if isinstance(job, FakeBuild):
        parameters = job.parameters
    else:
        parameters = json.loads(job.arguments)
    project = parameters['ZUUL_PROJECT']
    path = os.path.join(self.git_root, project)
    repo = git.Repo(path)
    ref = parameters['ZUUL_REF']
    sha = parameters['ZUUL_COMMIT']
    # Walk the history once so messages and SHAs come from one snapshot.
    history = list(repo.iter_commits(ref))
    repo_messages = [c.message.strip() for c in history]
    repo_shas = [c.hexsha for c in history]
    commit_messages = ['%s-1' % commit.subject for commit in commits]
    self.log.debug("Checking if job %s has changes; commit_messages %s;"
                   " repo_messages %s; sha %s" % (job, commit_messages,
                                                  repo_messages, sha))
    for msg in commit_messages:
        if msg not in repo_messages:
            self.log.debug(" messages do not match")
            return False
    # iter_commits yields newest first, so index 0 is the tip of the ref.
    if repo_shas[0] != sha:
        self.log.debug(" sha does not match")
        return False
    self.log.debug(" OK")
    return True
1107
def registerJobs(self):
    """Register all layout jobs with the worker and wait for the server.

    One ``build:<job>`` function is registered per job in the scheduler
    layout, plus a ``stop:<worker_id>`` function.  The call then spins
    until the gearman server lists at least that many functions.
    """
    job_names = list(self.sched.layout.jobs.keys())
    for job_name in job_names:
        self.worker.registerFunction('build:' + job_name)
    self.worker.registerFunction('stop:' + self.worker.worker_id)
    expected = len(job_names) + 1

    # Yield to other threads until the registrations are visible.
    while len(self.gearman_server.functions) < expected:
        time.sleep(0)
1118
def orderedRelease(self):
    """Release the running builds one by one, oldest first.

    After each release the system is allowed to settle, so builds finish
    in a deterministic, race-free order.
    """
    while len(self.builds) > 0:
        next_build = self.builds[0]
        self.release(next_build)
        self.waitUntilSettled()
1124
def release(self, job):
    """Let a held job proceed.

    A ``FakeBuild`` is released through its own ``release()`` method.
    Any other job object has its ``waiting`` flag cleared, after which
    the gearman server's connections are woken.
    """
    if isinstance(job, FakeBuild):
        job.release()
        return
    job.waiting = False
    self.log.debug("Queued job %s released" % job.unique)
    self.gearman_server.wakeConnections()
1132
def getParameter(self, job, name):
    """Return build parameter ``name`` for ``job``.

    ``FakeBuild`` objects carry their parameters as a mapping; for any
    other job the parameters are decoded from the JSON string in
    ``job.arguments``.  A missing name raises ``KeyError``.
    """
    if isinstance(job, FakeBuild):
        params = job.parameters
    else:
        params = json.loads(job.arguments)
    return params[name]
1139
def resetGearmanServer(self):
    """Clear all worker functions from the gearman server.

    The worker's function list is emptied, then the call spins until no
    connection other than the RPC listener and the merger still has
    functions registered.  Finally the server's function set is reset and
    the RPC listener and merge server register again.
    """
    # Clients that are allowed to keep their functions registered.
    exempt = ['Zuul RPC Listener', 'Zuul Merger']
    self.worker.setFunctions([])
    while any(conn.functions and conn.client_id not in exempt
              for conn in self.gearman_server.active_connections):
        # Yield so the server can process the worker's update.
        time.sleep(0)
    self.gearman_server.functions = set()
    self.rpc.register()
    self.merge_server.register()
1155
def haveAllBuildsReported(self):
    """Return True when no build result is still in transit to Zuul.

    False is returned if the launcher has pending meta jobs, if any build
    the worker has finished is still tracked by the launcher, or if a
    worker connection is in the ``GRAB_WAIT`` state.
    """
    # See if Zuul is waiting on a meta job to complete
    if self.launcher.meta_jobs:
        return False
    # A finished build that the launcher still knows about has not been
    # reported back yet: a Gearman event is in flight and the system is
    # not stable.
    if any(self.launcher.builds.get(finished.uuid)
           for finished in self.worker.build_history):
        return False
    # Make sure that none of the worker connections are in GRAB_WAIT
    return not any(conn.state == 'GRAB_WAIT'
                   for conn in self.worker.active_connections)
1175
def areAllBuildsWaiting(self):
    """Return True when every launcher build is parked in a waiting state.

    Each build is traced from the gearman client job, to the server job,
    to the worker job.  A build counts as waiting if its server job is
    flagged ``waiting`` or its worker-side build reports ``isWaiting()``.
    Any build that cannot be traced, has not reported its start, or is
    running makes the result False (with a debug log entry saying why).
    """
    for build in self.launcher.builds.values():
        # Find the client-side job; if several connections know the
        # build, the last connection scanned wins.
        client_job = None
        for conn in self.launcher.gearman.active_connections:
            hit = next((candidate
                        for candidate in conn.related_jobs.values()
                        if candidate.unique == build.uuid), None)
            if hit is not None:
                client_job = hit
        if not client_job:
            self.log.debug("%s is not known to the gearman client" %
                           build)
            return False
        if not client_job.handle:
            self.log.debug("%s has no handle" % client_job)
            return False
        server_job = self.gearman_server.jobs.get(client_job.handle)
        if not server_job:
            self.log.debug("%s is not known to the gearman server" %
                           client_job)
            return False
        if not hasattr(server_job, 'waiting'):
            self.log.debug("%s is being enqueued" % server_job)
            return False
        if server_job.waiting:
            # Held in the server queue; nothing more to check.
            continue
        worker_job = self.worker.gearman_jobs.get(server_job.unique)
        if not worker_job:
            self.log.debug("%s is unassigned" % server_job)
            return False
        if build.number is None:
            self.log.debug("%s has not reported start" % worker_job)
            return False
        if not worker_job.build.isWaiting():
            self.log.debug("%s is running" % worker_job)
            return False
    return True
Clark Boylanb640e052014-04-03 16:41:46 -07001216
def waitUntilSettled(self):
    """Block until Zuul has no work in flight.

    The system counts as settled when, with the worker lock and the
    scheduler's run-handler lock both held, there are no outstanding
    merger build sets, all three event queues are empty, every finished
    build has been reported and every remaining build is waiting.  The
    check is retried each time the scheduler's wake event fires or 0.1
    seconds pass.

    Both locks are released on every path out of an iteration, including
    when one of the checks raises.

    :raises Exception: if the system has not settled 10 seconds after the
        call started; the queue states are printed first.
    """
    self.log.debug("Waiting until settled...")
    start = time.time()
    while True:
        if time.time() - start > 10:
            # One pre-formatted argument: prints the same line under the
            # Python 2 print statement and the Python 3 print function.
            print('queue status: %s %s %s %s' % (
                self.sched.trigger_event_queue.empty(),
                self.sched.result_event_queue.empty(),
                self.fake_gerrit.event_queue.empty(),
                self.areAllBuildsWaiting()))
            raise Exception("Timeout waiting for Zuul to settle")
        settled = False
        # Make sure no new events show up while we're checking
        self.worker.lock.acquire()
        try:
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have
                # been processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                try:
                    settled = (
                        not self.merge_client.build_sets and
                        self.sched.trigger_event_queue.empty() and
                        self.sched.result_event_queue.empty() and
                        self.fake_gerrit.event_queue.empty() and
                        self.haveAllBuildsReported() and
                        self.areAllBuildsWaiting())
                finally:
                    self.sched.run_handler_lock.release()
        finally:
            self.worker.lock.release()
        if settled:
            self.log.debug("...settled.")
            return
        self.sched.wake_event.wait(0.1)
1251
def countJobResults(self, jobs, result):
    """Return how many of ``jobs`` finished with ``result``.

    :param jobs: iterable of objects exposing a ``result`` attribute.
    :param result: the result value to count.
    """
    # Counting directly avoids len(filter(...)), which only works where
    # filter() returns a list (Python 2) and fails on Python 3.
    return sum(1 for job in jobs if job.result == result)
1255
def getJobFromHistory(self, name):
    """Return the first build in the worker's history named ``name``.

    :raises Exception: if no build with that name has been recorded.
    """
    matches = (entry for entry in self.worker.build_history
               if entry.name == name)
    found = next(matches, None)
    if found is None:
        raise Exception("Unable to find job %s in history" % name)
    return found
1262
def assertEmptyQueues(self):
    """Assert that no pipeline queue still holds items.

    The contents of any non-empty queue are printed before the assertion
    so the failure shows what was left behind.
    """
    # Make sure there are no orphaned jobs
    for pipeline in self.sched.layout.pipelines.values():
        for change_queue in pipeline.queues:
            if len(change_queue.queue) != 0:
                # One pre-formatted argument: same output under the
                # Python 2 print statement and the Python 3 function.
                print('pipeline %s queue %s contents %s' % (
                    pipeline.name, change_queue.name, change_queue.queue))
            self.assertEqual(len(change_queue.queue), 0,
                             "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001272
def assertReportedStat(self, key, value=None, kind=None):
    """Wait up to five seconds for a stat with ``key`` to be reported.

    Each recorded stat is read as ``"<key>:<payload>"``.  With neither
    ``value`` nor ``kind`` given, the key alone is enough.  When ``value``
    is truthy the payload must equal it exactly (``kind`` is then not
    consulted); otherwise, when ``kind`` is truthy, the payload must end
    with ``"|<kind>"``.

    :returns: None as soon as a matching stat is seen.
    :raises Exception: if nothing matches within five seconds; the
        recorded stats are pretty-printed first.
    """
    start = time.time()
    while time.time() < (start + 5):
        for stat in self.statsd.stats:
            # Split on the first colon only, so a payload that itself
            # contains ':' cannot break the two-way unpacking.
            k, _, v = stat.partition(':')
            if key != k:
                continue
            if value is None and kind is None:
                return
            elif value:
                if value == v:
                    return
            elif kind:
                if v.endswith('|' + kind):
                    return
        time.sleep(0.1)

    # Dump what was actually reported once, to explain the failure.
    pprint.pprint(self.statsd.stats)
    raise Exception("Key %s not found in reported stats" % key)