#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.connection.gerrit
import zuul.connection.smtp
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.source.gerrit
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)
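
# Editor's note: an illustrative usage sketch (not part of the original
# module).  iterate_timeout is meant to drive a bounded polling loop; the
# attribute polled below is an assumption made for the example:
#
#     for _ in iterate_timeout(30, 'builds to start'):
#         if self.builds:
#             break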


class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
    log = logging.getLogger("zuul.test.FakeGerritConnection")

    def __init__(self, connection_name, connection_config,
                 changes_db=None, queues_db=None, upstream_root=None):
        super(FakeGerritConnection, self).__init__(connection_name,
                                                   connection_config)

        self.event_queue = queues_db
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = changes_db
        self.queries = []
        self.upstream_root = upstream_root

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]

        # Add the approval back onto the change (ie simulate what gerrit would
        # do).
        # Usually when zuul leaves a review it'll create a feedback loop where
        # zuul's review enters another gerrit event (which is then picked up
        # by zuul). However, we can't mimic this behaviour (by adding this
        # approval event into the queue) as it stops jobs from checking what
        # happens before this event is triggered. If a job needs to see what
        # happens they can add their own verified event into the queue.
        # Nevertheless, we can update change with the new review in gerrit.

        for cat in ['CRVW', 'VRFY', 'APRV']:
            if cat in action:
                change.addApproval(cat, action[cat], username=self.user)

        if 'label' in action:
            parts = action['label'].split('=')
            change.addApproval(parts[0], parts[2], username=self.user)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def _start_watcher_thread(self, *args, **kw):
        pass

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)


class BuildHistory(object):
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
    def __init__(self, upstream_root, url):
        self.upstream_root = upstream_root
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeStatsd(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


class FakeBuild(threading.Thread):
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            data['node_labels'] = ['bare-necessities']
            data['node_name'] = 'foo'
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         parameters=self.parameters,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


class FakeGearmanServer(gear.Server):
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))


class FakeSMTP(object):
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


class BaseTestCase(testtools.TestCase):
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))


class ZuulTestCase(BaseTestCase):

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")
        self.state_root = os.path.join(self.test_root, "lib")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.state_root)

        # Make per test copy of Configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR,
                                     self.config.get('zuul', 'layout_config')))
        self.config.set('merger', 'git_dir', self.git_root)
        self.config.set('zuul', 'state_dir', self.state_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")
        self.init_repo("org/no-jobs-project")

        self.statsd = FakeStatsd()
        # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
        # see: https://github.com/jsocol/pystatsd/issues/61
        os.environ['STATSD_HOST'] = '127.0.0.1'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        zuul.source.gerrit.GerritSource.replication_timeout = 1.5
        zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
        zuul.connection.gerrit.GerritEventConnector.delay = 0.0

        self.sched = zuul.scheduler.Scheduler(self.config)

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        self.event_queues = [
            self.sched.result_event_queue,
            self.sched.trigger_event_queue
        ]

        self.configure_connections()
        self.sched.registerConnections(self.connections)

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.merge_server = zuul.merger.server.MergeServer(self.config,
                                                           self.connections)
        self.merge_server.start()

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

    def configure_connections(self):
        # Register connections from the config
        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        # Set a changes database so multiple FakeGerrit's can report back to
        # a virtual canonical database given by the configured hostname
        self.gerrit_changes_dbs = {}
        self.gerrit_queues_dbs = {}
        self.connections = {}

        for section_name in self.config.sections():
            con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
                                 section_name, re.I)
            if not con_match:
                continue
            con_name = con_match.group(2)
            con_config = dict(self.config.items(section_name))

            if 'driver' not in con_config:
                raise Exception("No driver specified for connection %s."
                                % con_name)

            con_driver = con_config['driver']

            # TODO(jhesketh): load the required class automatically
            if con_driver == 'gerrit':
                if con_config['server'] not in self.gerrit_changes_dbs.keys():
                    self.gerrit_changes_dbs[con_config['server']] = {}
                if con_config['server'] not in self.gerrit_queues_dbs.keys():
                    self.gerrit_queues_dbs[con_config['server']] = \
                        Queue.Queue()
                self.event_queues.append(
                    self.gerrit_queues_dbs[con_config['server']])
                self.connections[con_name] = FakeGerritConnection(
                    con_name, con_config,
                    changes_db=self.gerrit_changes_dbs[con_config['server']],
                    queues_db=self.gerrit_queues_dbs[con_config['server']],
                    upstream_root=self.upstream_root
                )
                setattr(self, 'fake_' + con_name, self.connections[con_name])
            elif con_driver == 'smtp':
                self.connections[con_name] = \
                    zuul.connection.smtp.SMTPConnection(con_name, con_config)
            else:
                raise Exception("Unknown driver, %s, for connection %s"
                                % (con_config['driver'], con_name))

        # If the [gerrit] or [smtp] sections still exist, load them in as a
        # connection named 'gerrit' or 'smtp' respectively

        if 'gerrit' in self.config.sections():
            self.gerrit_changes_dbs['gerrit'] = {}
            self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
            self.event_queues.append(self.gerrit_queues_dbs['gerrit'])
            self.connections['gerrit'] = FakeGerritConnection(
                '_legacy_gerrit', dict(self.config.items('gerrit')),
                changes_db=self.gerrit_changes_dbs['gerrit'],
                queues_db=self.gerrit_queues_dbs['gerrit'])

        if 'smtp' in self.config.sections():
            self.connections['smtp'] = \
                zuul.connection.smtp.SMTPConnection(
                    '_legacy_smtp', dict(self.config.items('smtp')))

    def setup_config(self, config_file='zuul.conf'):
        """Per test config object. Override to set different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, config_file))

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def orderedRelease(self):
        # Run one build at a time to ensure non-race order:
        while len(self.builds):
            self.release(self.builds[0])
            self.waitUntilSettled()

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul.  If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True

    def eventQueuesEmpty(self):
        for queue in self.event_queues:
            yield queue.empty()

    def eventQueuesJoin(self):
        for queue in self.event_queues:
            queue.join()

    def waitUntilSettled(self):
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print ' '.join(map(str, self.eventQueuesEmpty()))
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.eventQueuesJoin()
                self.sched.run_handler_lock.acquire()
                if (not self.merge_client.build_sets and
                    all(self.eventQueuesEmpty()) and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipelines queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)
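
# Editor's note: an illustrative sketch (not part of the original module) of
# how a concrete test typically builds on ZuulTestCase.  The project name,
# job name, and the layout they imply are assumptions, and addEvent is the
# event-queue helper inherited from GerritConnection:
#
#     class ExampleTest(ZuulTestCase):
#         def test_patchset_triggers_jobs(self):
#             A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
#             self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
#             self.waitUntilSettled()
#             self.assertEqual(
#                 self.getJobFromHistory('project-merge').result, 'SUCCESS')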