#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.server
import zuul.merger.client
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)


class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
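    """In-memory stand-in for a Gerrit change, backed by the upstream test repo.

    It mimics just enough of Gerrit's JSON query and event structures for
    the scheduler under test to consume.  A rough usage sketch (with
    fake_gerrit naming the FakeGerrit instance a test case holds):

        change = fake_gerrit.addFakeChange('org/project', 'master', 'A')
        fake_gerrit.addEvent(change.getPatchsetCreatedEvent(1))
        change.addApproval('CRVW', 2)
    """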
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='jenkins',
                    granted_on=None):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'user@example.com',
                            'name': 'User Name',
                            'username': 'username'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'user@example.com',
                                      'name': 'User Name',
                                      'username': 'username'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': '',
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerrit(object):
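    """Fake of the Gerrit service as seen by the scheduler.

    Changes created with addFakeChange() are stored in memory; query()
    and simpleQuery() answer from that store, and addEvent() feeds the
    event queue the trigger consumes instead of a real SSH event stream.
    """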
    log = logging.getLogger("zuul.test.FakeGerrit")

    def __init__(self, *args, **kw):
        self.event_queue = Queue.Queue()
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = {}
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def addEvent(self, data):
        return self.event_queue.put((time.time(), data))

    def getEvent(self):
        return self.event_queue.get()

    def eventDone(self):
        self.event_queue.task_done()

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]
        change.messages.append(message)
        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def startWatching(self, *args, **kw):
        pass


class BuildHistory(object):
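    """Record of a single completed fake build, kept for test assertions."""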
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
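    """Stand-in for urllib2.urlopen() results.

    read() fabricates a git-upload-pack ref advertisement for the
    requested project from the refs of the local upstream test repo.
    """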
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
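    """Gerrit trigger wired to the fakes.

    getGitUrl() points at the on-disk upstream test repos, and the
    connector delay is zeroed so queued events are delivered immediately.
    """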
    name = 'gerrit'

    def __init__(self, upstream_root, *args):
        super(FakeGerritTrigger, self).__init__(*args)
        self.upstream_root = upstream_root
        self.gerrit_connector.delay = 0.0

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


class FakeStatsd(threading.Thread):
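    """Minimal statsd endpoint: a UDP socket that buffers received packets.

    Tests inspect self.stats to assert on emitted metrics; stop() wakes
    the poll loop through a pipe so the thread exits cleanly.
    """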
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


class FakeBuild(threading.Thread):
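    """A single fake job execution, run in its own thread.

    It sends worker data and status over Gearman much like a real
    launcher-driven build, optionally blocks until released (when the
    worker is holding jobs in build), and records a BuildHistory entry
    with a SUCCESS, FAILURE, ABORTED or RUN_ERROR result.
    """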
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            data['node_labels'] = ['bare-necessities']
            data['node_name'] = 'foo'
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
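    """Fake Gearman worker standing in for a real launcher's workers.

    It handles build:, stop: and set_description: jobs, starts a
    FakeBuild thread per build, and exposes the knobs tests poke
    directly: hold_jobs_in_build, addFailTest() and release().
    """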
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


class FakeGearmanServer(gear.Server):
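    """Gearman server with a hold/release switch for queued build jobs.

    While hold_jobs_in_queue is set, build: jobs are marked waiting and
    withheld from workers until release() clears the flag.
    """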
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))


class FakeSMTP(object):
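    """smtplib.SMTP replacement that records messages in a shared list."""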
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
            from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
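    """Swift connection whose auth and account calls are stubbed out so
    the tests never contact a real object store."""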
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


class BaseTestCase(testtools.TestCase):
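    """Base test case honoring the usual OS_* test environment variables.

    OS_TEST_TIMEOUT, OS_STDOUT_CAPTURE, OS_STDERR_CAPTURE and
    OS_LOG_CAPTURE are applied via testtools/fixtures in setUp().
    """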
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))


class ZuulTestCase(BaseTestCase):
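    """Full-stack test case for the Zuul scheduler.

    setUp() assembles a self-contained environment: temporary git repos,
    a fake Gerrit, a fake Gearman server and worker, a merge
    server/client pair, fake statsd and SMTP endpoints, and a running
    scheduler, webapp and RPC listener.  Tests add fake changes and
    events, then call waitUntilSettled() before asserting.
    """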

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.git_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")
        self.init_repo("org/no-jobs-project")

        self.statsd = FakeStatsd()
        os.environ['STATSD_HOST'] = 'localhost'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        self.merge_server = zuul.merger.server.MergeServer(self.config)
        self.merge_server.start()

        self.sched = zuul.scheduler.Scheduler()

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        zuul.lib.gerrit.Gerrit = FakeGerrit
        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        self.gerrit = FakeGerritTrigger(
            self.upstream_root, self.config, self.sched)
        self.gerrit.replication_timeout = 1.5
        self.gerrit.replication_retry_interval = 0.5
        self.fake_gerrit = self.gerrit.gerrit
        self.fake_gerrit.upstream_root = self.upstream_root

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)
        self.sched.registerTrigger(self.gerrit)
        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
        self.sched.registerTrigger(self.timer)
        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
                                                                self.sched)
        self.sched.registerTrigger(self.zuultrigger)

        self.sched.registerReporter(
            zuul.reporter.gerrit.Reporter(self.gerrit))
        self.smtp_reporter = zuul.reporter.smtp.Reporter(
            self.config.get('smtp', 'default_from'),
            self.config.get('smtp', 'default_to'),
            self.config.get('smtp', 'server'))
        self.sched.registerReporter(self.smtp_reporter)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

    def setup_config(self):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.gerrit.stop()
        self.timer.stop()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        repo.head.reset(index=True, working_tree=True)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def orderedRelease(self):
        # Run one build at a time to ensure non-race order:
        while len(self.builds):
            self.release(self.builds[0])
            self.waitUntilSettled()

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul. If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
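        """Return whether every known build is parked in a stable state.

        Roughly: a build counts as waiting if its gearman job is queued
        and held, or if its FakeBuild thread has reported a start and is
        blocked in _wait(); anything still in flight yields False.
        """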
        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True

    def waitUntilSettled(self):
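        """Block until Zuul has no more work in flight.

        Polls until the trigger, result and Gerrit event queues are
        empty, all completed builds have been reported, and every
        remaining build is waiting; raises an exception if that state is
        not reached within about ten seconds.
        """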
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                if (not self.merge_client.build_sets and
                    self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)