blob: f3bfa4ea8a19bea2a202100d0eb9254c4b0630b1 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Clark Boylanb640e052014-04-03 16:41:46 -070025import random
26import re
27import select
28import shutil
29import socket
30import string
31import subprocess
32import swiftclient
33import threading
34import time
35import urllib2
36
37import git
38import gear
39import fixtures
40import six.moves.urllib.parse as urlparse
41import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
Joshua Hesketh352264b2015-08-11 23:42:08 +100045import zuul.connection.gerrit
46import zuul.connection.smtp
Clark Boylanb640e052014-04-03 16:41:46 -070047import zuul.scheduler
48import zuul.webapp
49import zuul.rpclistener
50import zuul.launcher.gearman
51import zuul.lib.swift
Clark Boylanb640e052014-04-03 16:41:46 -070052import zuul.merger.client
James E. Blair879dafb2015-07-17 14:04:49 -070053import zuul.merger.merger
54import zuul.merger.server
Clark Boylanb640e052014-04-03 16:41:46 -070055import zuul.reporter.gerrit
56import zuul.reporter.smtp
Joshua Hesketh850ccb62014-11-27 11:31:02 +110057import zuul.source.gerrit
Clark Boylanb640e052014-04-03 16:41:46 -070058import zuul.trigger.gerrit
59import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070060import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070061
# Directory containing the test fixture files; it lives next to this module.
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
# When true, ZuulTestCase.setUp builds its working tree inside a fresh
# fixtures.TempDir (rooted at $ZUUL_TEST_ROOT when that is set); when false
# it uses $ZUUL_TEST_ROOT directly.
USE_TEMPDIR = True

# Configure root logging at import time so that everything the tests and the
# code under test log at DEBUG or above is emitted with a timestamp, logger
# name and level.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')
69
70
def repack_repo(path):
    """Run ``git repack -afd`` on the repository checked out at *path*.

    :param path: working-tree directory; its ``.git`` sub-directory is
        passed to git via ``--git-dir``.
    :returns: the ``(stdout, stderr)`` tuple captured from the git process.
    :raises Exception: if git exits with a non-zero status.
    """
    git_dir_option = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(['git', git_dir_option, 'repack', '-afd'],
                            close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    captured = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return captured
80
81
def random_sha1():
    """Return a random 40-character hexadecimal SHA-1 digest string.

    The digest is taken over the textual form of a random float.  The text
    is encoded to bytes before hashing so the function works on Python 3
    (where hashlib rejects ``str``) as well as on Python 2.
    """
    seed = str(random.random()).encode('utf-8')
    return hashlib.sha1(seed).hexdigest()
84
85
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing attempt counter until *max_seconds* have elapsed.

    Intended for polling loops: the caller iterates and breaks out once its
    condition holds.  If the caller never breaks out, the generator raises
    when the time budget is exhausted.

    :param max_seconds: time budget in seconds, measured from the first
        ``next()`` call.
    :param purpose: text inserted into the timeout error message.
    :raises Exception: ``"Timeout waiting for <purpose>"`` once the budget
        is used up.
    """
    deadline = time.time() + max_seconds
    attempt = 0
    while time.time() < deadline:
        attempt += 1
        yield attempt
        # Give other threads a chance to run between polls.
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)
94
95
class ChangeReference(git.Reference):
    """Git reference type for refs kept under ``refs/changes``.

    FakeChange creates one of these per patchset (with a relative path of
    the form ``1/<change>/<patchset>``) in the upstream test repository.
    """
    # Namespace under which references of this type are created.
    _common_path_default = "refs/changes"
    # NOTE(review): presumably tells GitPython that these refs may only
    # point at commit objects -- confirm against the GitPython docs.
    _points_to_commits_only = True
99
100
class FakeChange(object):
    """In-memory model of a Gerrit change, backed by a real git repository.

    ``data`` holds a dict shaped like a Gerrit change query result and
    ``patchsets`` the list of patchset dicts (the same list object is stored
    in ``data['patchSets']``).  Each patchset is also committed into the
    repository at ``<upstream_root>/<project>`` under
    ``refs/changes/1/<number>/<patchset>``.
    """

    # category key -> (label name, minimum vote, maximum vote)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        """Create the change and its first patchset.

        :param gerrit: the fake Gerrit connection that owns this change.
        :param number: change number.
        :param project: project name, also the repository path relative to
            *upstream_root*.
        :param branch: target branch name.
        :param subject: subject; also used as the commit message prefix.
        :param status: Gerrit status string; the change is open only when
            this is ``'NEW'``.
        :param upstream_root: directory containing the upstream test
            repositories.
        """
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit the current patchset into the upstream repository.

        A change ref is created from ``refs/tags/init``, checked out, and
        either one small file (*fn*) or, when *large* is true, 100 files of
        4096 random printable characters are written and committed.  The
        repository is then switched back to ``master``.

        :param msg: commit message.
        :param fn: file name relative to the repository (ignored when
            *large* is true).
        :param large: whether to create the large 100-file commit.
        :returns: the commit object created for the patchset.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        if not large:
            fn = os.path.join(path, fn)
            # "with" guarantees the handle is closed even if the write fails.
            with open(fn, 'w') as f:
                f.write("test %s %s %s\n" %
                        (self.branch, self.number, self.latest_patchset))
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                with open(fn, 'w') as f:
                    f.write(''.join(random.choice(string.printable)
                                    for _ in range(4096)))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Add a new patchset to the change and make it the current one.

        :param files: optional sequence of file names reported as ADDED in
            the patchset; the first one is the file actually written to the
            repository.  Defaults to no extra files.
        :param large: passed through to :meth:`add_fake_change_to_repo`.
        """
        # A None default avoids sharing one mutable list between calls.
        if files is None:
            files = []
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def _event_change(self):
        """Return a fresh "change" sub-dict shared by the simple events."""
        return {"project": self.project,
                "branch": self.branch,
                "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                "number": str(self.number),
                "subject": self.subject,
                "owner": {"name": "User Name"},
                "url": "https://hostname/3"}

    def getPatchsetCreatedEvent(self, patchset):
        """Return a ``patchset-created`` event for 1-based *patchset*."""
        event = {"type": "patchset-created",
                 "change": self._event_change(),
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a ``change-restored`` event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": self._event_change(),
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a ``change-abandoned`` event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": self._event_change(),
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a ``comment-added`` event (Code-Review 0) for *patchset*."""
        event = {"type": "comment-added",
                 "change": self._event_change(),
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        """Record a vote on the latest patchset and return the event for it.

        Any earlier vote by *username* in the same *category* is replaced.

        :param category: key of :attr:`categories` (e.g. ``'CRVW'``).
        :param value: vote value; stored as a string.
        :param username: voter; the email is derived from it.
        :param granted_on: timestamp of the vote; a falsy value means "now".
        :param message: comment text placed in the returned event.
        :returns: a ``comment-added`` event dict (a JSON round-tripped copy,
            so it shares no objects with the change).
        :raises KeyError: if *category* is not a known category.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        approvals = self.patchsets[-1]['approvals']
        # Filter in place rather than deleting by index while iterating:
        # index-based deletion removes the wrong entry (or raises
        # IndexError) as soon as more than one entry matches.
        approvals[:] = [x for x in approvals
                        if not (x['by']['username'] == username and
                                x['type'] == category)]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute the submit records from the latest patchset's approvals.

        For each category the effective vote is the category minimum if
        anyone voted it, otherwise the vote with the largest magnitude seen
        so far.  The change is submittable only when every category sits at
        its maximum.

        :returns: ``[{'status': 'OK'}]`` when submittable, otherwise a
            one-element list with status ``'NOT_READY'`` and per-label
            ``'REJECT'`` / ``'OK'`` / ``'NEED'`` states.
        """
        status = dict((cat, 0) for cat in self.categories)

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min = self.categories[a['type']][1]
            new = int(a['value'])
            if new == cat_min or abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on *other* at the given patchset.

        Updates ``dependsOn`` on this change and ``neededBy`` on *other*.

        :param other: the FakeChange this change depends on.
        :param patchset: 1-based patchset number.
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        # NOTE(review): *patchset* indexes other's patchsets above and this
        # change's own patchsets here -- confirm that is intended.
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a deep (JSON round-tripped) copy of the change data.

        Increments :attr:`queried` and refreshes ``isCurrentPatchSet`` on
        the first dependency, if there is one.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            current_ref = self.depends_on_change.patchsets[-1]['ref']
            d['isCurrentPatchSet'] = (current_ref == d['ref'])
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change merged and move its branch to the last patchset.

        Does nothing if the change depends on a change that is not merged
        yet, or if :attr:`fail_merge` is set.
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one more report against this change."""
        self.reported += 1
382
383
class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
    """Gerrit connection double that serves FakeChange objects from memory.

    Changes live in the ``changes_db`` mapping (change number -> FakeChange)
    handed to the constructor; reviews are applied directly to those objects
    and no watcher thread is started.
    """

    log = logging.getLogger("zuul.test.FakeGerritConnection")

    def __init__(self, connection_name, connection_config,
                 changes_db=None, queues_db=None, upstream_root=None):
        """Initialise the connection.

        :param connection_name: forwarded to the real connection class.
        :param connection_config: forwarded to the real connection class.
        :param changes_db: mapping used to store the fake changes.
        :param queues_db: object stored as this connection's event queue.
        :param upstream_root: directory holding the upstream repositories.
        """
        super(FakeGerritConnection, self).__init__(connection_name,
                                                   connection_config)

        self.upstream_root = upstream_root
        self.changes = changes_db
        self.event_queue = queues_db
        self.queries = []
        self.change_number = 0
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        new_change = FakeChange(self, self.change_number, project, branch,
                                subject,
                                upstream_root=self.upstream_root,
                                status=status)
        self.changes[self.change_number] = new_change
        return new_change

    def review(self, project, changeid, message, action):
        """Apply a review to the change named by ``"<number>,<patchset>"``.

        The approvals are written straight onto the change, as Gerrit itself
        would do.  The matching Gerrit event is deliberately *not* queued:
        zuul's own review would otherwise feed back into zuul and stop tests
        from inspecting the state before that event.  Tests that want the
        follow-up event add it to the queue themselves.
        """
        number, _patchset = changeid.split(',')
        change = self.changes[int(number)]

        for category in ('CRVW', 'VRFY', 'APRV'):
            if category in action:
                change.addApproval(category, action[category],
                                   username=self.user)

        if 'label' in action:
            # NOTE(review): index 2 requires the label text to contain two
            # '=' separators -- confirm the expected format.
            pieces = action['label'].split('=')
            change.addApproval(pieces[0], pieces[2], username=self.user)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return the query data for change *number*, or ``{}`` if unknown."""
        found = self.changes.get(int(number))
        return found.query() if found else {}

    def simpleQuery(self, query):
        """Answer a textual query and remember it in :attr:`queries`.

        ``change:<id>`` matches on the change id, ``message:<text>`` on a
        substring of the commit message; any other query returns every
        known change, whatever its status.
        """
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        candidates = self.changes.values()
        if query.startswith('change:'):
            wanted_id = query[len('change:'):]
            selected = [c for c in candidates if c.data['id'] == wanted_id]
        elif query.startswith('message:'):
            text = query[len('message:'):].strip()
            selected = [c for c in candidates
                        if text in c.data['commitMessage']]
        else:
            selected = list(candidates)
        return [c.query() for c in selected]

    def _start_watcher_thread(self, *args, **kw):
        """Do nothing: the fake connection has no event stream to watch."""
        pass

    def getGitUrl(self, project):
        """Return the local path of *project*'s upstream repository."""
        return os.path.join(self.upstream_root, project.name)
465
Clark Boylanb640e052014-04-03 16:41:46 -0700466
class BuildHistory(object):
    """Plain record of a finished build; every keyword becomes an attribute."""

    def __init__(self, **kw):
        for attr_name, attr_value in kw.items():
            setattr(self, attr_name, attr_value)

    def __repr__(self):
        template = "<Completed build, result: %s name: %s #%s changes: %s>"
        fields = (self.result, self.name, self.number, self.changes)
        return template % fields
474
475
class FakeURLOpener(object):
    """Stand-in for the object returned by ``urlopen`` on a git refs URL.

    ``read()`` answers with a git-upload-pack style ref advertisement built
    from the refs of the matching repository under *upstream_root*.
    """

    def __init__(self, upstream_root, url):
        self.url = url
        self.upstream_root = upstream_root

    def read(self):
        """Return the ref advertisement text for the project named in the URL.

        The project name is the URL path with its first two and last two
        ``/``-separated components removed.
        """
        url_path = urlparse.urlparse(self.url).path
        project = '/'.join(url_path.split('/')[2:-2])
        pieces = [
            '001e# service=git-upload-pack\n',
            ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
             'multi_ack thin-pack side-band side-band-64k ofs-delta '
             'shallow no-progress include-tag multi_ack_detailed no-done\n'),
        ]
        repo = git.Repo(os.path.join(self.upstream_root, project))
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # Each line is prefixed by its length in hex, counting the
            # four prefix characters themselves.
            pieces.append('%04x%s' % (len(line) + 4, line))
        pieces.append('0000')
        return ''.join(pieces)
496
497
class FakeStatsd(threading.Thread):
    """Daemon thread that collects UDP datagrams sent to a local port.

    The listening port (chosen by the OS) is available as :attr:`port`; the
    payload of every datagram received is appended to :attr:`stats`.  Call
    :meth:`stop` to end the thread.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        # Pipe used by stop() to wake the poll loop in run().
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Collect datagrams until something is written to the wake pipe."""
        # The set of watched descriptors never changes, so the poll object
        # is built once instead of on every loop iteration.
        poll = select.poll()
        poll.register(self.sock, select.POLLIN)
        poll.register(self.wake_read, select.POLLIN)
        sock_fd = self.sock.fileno()
        while True:
            for (fd, event) in poll.poll():
                if fd == sock_fd:
                    # recvfrom() returns (payload, sender address); only
                    # the payload is recorded.
                    payload, _sender = self.sock.recvfrom(1024)
                    self.stats.append(payload)
                if fd == self.wake_read:
                    return

    def stop(self):
        """Ask the thread to exit by writing to the wake pipe."""
        # os.write() requires bytes on Python 3; a bytes literal is also
        # accepted on Python 2.
        os.write(self.wake_write, b'1\n')
525
526
class FakeBuild(threading.Thread):
    """Thread simulating one build executed for a gearman job.

    The build reports fake worker data, optionally blocks until
    :meth:`release` is called (when the worker has ``hold_jobs_in_build``
    set), then records a BuildHistory entry on the worker and completes or
    fails the gearman job.
    """

    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        """Prepare the build.

        :param worker: the FakeWorker running the build.
        :param job: gearman job; its name is ``<cmd>:<name>[:<node>]`` and
            its arguments are a JSON document containing ``ZUUL_UUID``.
        :param number: build number.
        :param node: node label taken from the job name, or None.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Wake the build if it is being held and clear the waiting flag."""
        with self.wait_condition:
            self.wait_condition.notify()
            self.waiting = False
            self.log.debug("Build %s released" % self.unique)

    def isWaiting(self):
        """Return True while the build is held waiting for release()."""
        with self.wait_condition:
            return bool(self.waiting)

    def _wait(self):
        """Block the build thread until release() notifies it."""
        with self.wait_condition:
            self.waiting = True
            self.log.debug("Build %s waiting" % self.unique)
            self.wait_condition.wait()

    def run(self):
        """Execute the simulated build and report its outcome.

        The result is ``SUCCESS`` unless the worker says the test should
        fail for this ``ZUUL_REF`` (``FAILURE``), the build was aborted
        (``ABORTED``) or :attr:`run_error` is set (``RUN_ERROR``; the
        gearman job is then failed instead of completed).
        """
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # "with" releases the worker lock even when something below raises;
        # a manual acquire()/release() pair would leave the lock held and
        # block every other build.
        with self.worker.lock:
            result = 'SUCCESS'
            if (('ZUUL_REF' in self.parameters) and
                self.worker.shouldFailTest(self.name,
                                           self.parameters['ZUUL_REF'])):
                result = 'FAILURE'
            if self.aborted:
                result = 'ABORTED'

            if self.run_error:
                work_fail = True
                result = 'RUN_ERROR'
            else:
                data['result'] = result
                data['node_labels'] = ['bare-necessities']
                data['node_name'] = 'foo'
                work_fail = False

            changes = None
            if 'ZUUL_CHANGE_IDS' in self.parameters:
                changes = self.parameters['ZUUL_CHANGE_IDS']

            self.worker.build_history.append(
                BuildHistory(name=self.name, number=self.number,
                             result=result, changes=changes, node=self.node,
                             uuid=self.unique, description=self.description,
                             pipeline=self.parameters['ZUUL_PIPELINE'])
            )

            self.job.sendWorkData(json.dumps(data))
            if work_fail:
                self.job.sendWorkFail()
            else:
                self.job.sendWorkComplete(json.dumps(data))
            del self.worker.gearman_jobs[self.job.unique]
            self.worker.running_builds.remove(self)
634
635
class FakeWorker(gear.Worker):
    """Gearman worker that runs FakeBuild threads instead of real jobs.

    Jobs are named ``<cmd>:<name>[:<node>]`` where ``cmd`` is ``build``,
    ``stop`` or ``set_description``.  A background daemon thread fetches and
    dispatches jobs as soon as the worker is created.
    """

    def __init__(self, worker_id, test):
        """Create the worker and start its job-handling thread.

        :param worker_id: forwarded to ``gear.Worker``.
        :param test: the test case; its ``ref_has_change`` method is used by
            :meth:`shouldFailTest`.
        """
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch *job* according to the command prefix of its name."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for *job* and track it as running."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build named in the job arguments.

        Completes *job* if a matching running build was found and fails it
        otherwise.
        """
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Set the description of a running or finished build.

        Running builds are searched first, then the build history; *job* is
        failed if no build matches the name and number in its arguments.
        """
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Fetch and handle jobs for as long as the worker is running."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Log and carry on so that one bad job does not kill the
                # worker thread.  A bare "except:" would also swallow
                # SystemExit and KeyboardInterrupt.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Register that job *name* must fail for refs containing *change*."""
        self.fail_tests.setdefault(name, []).append(change)

    def shouldFailTest(self, name, ref):
        """Return True if job *name* was told to fail for a change in *ref*."""
        for change in self.fail_tests.get(name, []):
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds.

        :param regex: if given, only builds whose name matches it (with
            ``re.match``) are released; otherwise all running builds are.
        """
        # Iterate over a snapshot: released builds remove themselves from
        # running_builds on their own threads.
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                       len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
744
745
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold ``build:`` jobs in its queue.

    While :attr:`hold_jobs_in_queue` is true, build jobs seen for the first
    time are marked as waiting and are not handed to any worker until
    :meth:`release` clears the mark.
    """

    def __init__(self):
        # Set before calling the base constructor, as the original code did
        # (presumably because the base class may start serving from its
        # constructor -- confirm against gear.Server).
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next non-waiting job that *connection* can run.

        Queues are searched in priority order (high, normal, low).  Unless
        *peek* is true the job is removed from its queue and bound to the
        connection.

        :returns: the job, or None if nothing is runnable.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First sighting: only build jobs can be held.
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release held build jobs and wake the worker connections.

        :param regex: if given, only build jobs whose job name (the second
            ``:``-separated component) matches it with ``re.match`` are
            released; otherwise all queued build jobs are.
        """
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            # Job names may carry a third ":<node>" component (see
            # FakeWorker.handleJob) or no colon at all, so a strict
            # two-value unpack would raise ValueError.
            parts = job.name.split(':')
            if parts[0] != 'build' or len(parts) < 2:
                continue
            name = parts[1]
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
792
793
class FakeSMTP(object):
    """SMTP client double that records messages instead of sending them."""

    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        """Remember the target server and the list that collects messages.

        :param messages: list to which one dict per sent mail is appended.
        :param server: SMTP server name (only stored).
        :param port: SMTP port (only stored).
        """
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        """Record a message, split into headers and body.

        The split happens at the first blank line.  A message without a
        blank line is recorded with all of its text as headers and an empty
        body (previously this raised IndexError).

        :returns: True.
        """
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers, _separator, body = msg.partition('\n\n')

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        """Pretend to close the SMTP session; always returns True."""
        return True
821
822
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection double that never contacts a storage service."""

    def post_account(self, headers):
        """Accept and discard the account headers."""
        pass

    def get_auth(self):
        """Return a fixed storage endpoint and an empty (unused) auth token."""
        # Joined with "/" explicitly: this is a URL, and os.path.join would
        # use the operating system's path separator.
        endpoint = '/'.join(['https://storage.example.org', 'V1',
                             'AUTH_account'])
        return endpoint, ''
833
834
class BaseTestCase(testtools.TestCase):
    """Base test case that wires up timeout and output capture fixtures.

    Behaviour is driven by environment variables: ``OS_TEST_TIMEOUT``
    (seconds; non-numeric or non-positive values disable the timeout) and
    ``OS_STDOUT_CAPTURE`` / ``OS_STDERR_CAPTURE`` / ``OS_LOG_CAPTURE``
    (enabled when set to ``True`` or ``1``).
    """

    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        enabled_values = ('True', '1')

        try:
            timeout_seconds = int(os.environ.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # An unparsable value means "no timeout".
            timeout_seconds = 0
        if timeout_seconds > 0:
            self.useFixture(fixtures.Timeout(timeout_seconds, gentle=False))

        captures = (
            ('OS_STDOUT_CAPTURE', 'stdout', 'sys.stdout'),
            ('OS_STDERR_CAPTURE', 'stderr', 'sys.stderr'),
        )
        for env_name, stream_name, patch_target in captures:
            if os.environ.get(env_name) in enabled_values:
                captured = self.useFixture(
                    fixtures.StringStream(stream_name)).stream
                self.useFixture(fixtures.MonkeyPatch(patch_target, captured))

        if os.environ.get('OS_LOG_CAPTURE') in enabled_values:
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                '%(levelname)-8s %(message)s'))
Maru Newby3fe5f852015-01-13 04:22:14 +0000863
864
865class ZuulTestCase(BaseTestCase):
866
867 def setUp(self):
868 super(ZuulTestCase, self).setUp()
James E. Blair97d902e2014-08-21 13:25:56 -0700869 if USE_TEMPDIR:
870 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000871 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
872 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700873 else:
874 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700875 self.test_root = os.path.join(tmp_root, "zuul-test")
876 self.upstream_root = os.path.join(self.test_root, "upstream")
877 self.git_root = os.path.join(self.test_root, "git")
878
879 if os.path.exists(self.test_root):
880 shutil.rmtree(self.test_root)
881 os.makedirs(self.test_root)
882 os.makedirs(self.upstream_root)
Clark Boylanb640e052014-04-03 16:41:46 -0700883
884 # Make per test copy of Configuration.
885 self.setup_config()
886 self.config.set('zuul', 'layout_config',
Joshua Heskethacccffc2015-03-31 23:38:17 +1100887 os.path.join(FIXTURE_DIR,
888 self.config.get('zuul', 'layout_config')))
Clark Boylanb640e052014-04-03 16:41:46 -0700889 self.config.set('merger', 'git_dir', self.git_root)
890
891 # For each project in config:
892 self.init_repo("org/project")
893 self.init_repo("org/project1")
894 self.init_repo("org/project2")
895 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700896 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700897 self.init_repo("org/project5")
898 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700899 self.init_repo("org/one-job-project")
900 self.init_repo("org/nonvoting-project")
901 self.init_repo("org/templated-project")
902 self.init_repo("org/layered-project")
903 self.init_repo("org/node-project")
904 self.init_repo("org/conflict-project")
905 self.init_repo("org/noop-project")
906 self.init_repo("org/experimental-project")
Evgeny Antyshevd6e546c2015-06-11 15:13:57 +0000907 self.init_repo("org/no-jobs-project")
Clark Boylanb640e052014-04-03 16:41:46 -0700908
909 self.statsd = FakeStatsd()
Ian Wienandff977bf2015-09-30 15:38:47 +1000910 # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
911 # see: https://github.com/jsocol/pystatsd/issues/61
912 os.environ['STATSD_HOST'] = '127.0.0.1'
Clark Boylanb640e052014-04-03 16:41:46 -0700913 os.environ['STATSD_PORT'] = str(self.statsd.port)
914 self.statsd.start()
915 # the statsd client object is configured in the statsd module import
916 reload(statsd)
917 reload(zuul.scheduler)
918
919 self.gearman_server = FakeGearmanServer()
920
921 self.config.set('gearman', 'port', str(self.gearman_server.port))
922
923 self.worker = FakeWorker('fake_worker', self)
924 self.worker.addServer('127.0.0.1', self.gearman_server.port)
925 self.gearman_server.worker = self.worker
926
Joshua Hesketh352264b2015-08-11 23:42:08 +1000927 zuul.source.gerrit.GerritSource.replication_timeout = 1.5
928 zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
929 zuul.connection.gerrit.GerritEventConnector.delay = 0.0
Clark Boylanb640e052014-04-03 16:41:46 -0700930
Joshua Hesketh352264b2015-08-11 23:42:08 +1000931 self.sched = zuul.scheduler.Scheduler(self.config)
Clark Boylanb640e052014-04-03 16:41:46 -0700932
933 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
934 FakeSwiftClientConnection))
935 self.swift = zuul.lib.swift.Swift(self.config)
936
Jan Hruban6b71aff2015-10-22 16:58:08 +0200937 self.event_queues = [
938 self.sched.result_event_queue,
939 self.sched.trigger_event_queue
940 ]
941
Joshua Hesketh352264b2015-08-11 23:42:08 +1000942 self.configure_connections()
943 self.sched.registerConnections(self.connections)
Joshua Hesketh352264b2015-08-11 23:42:08 +1000944
Clark Boylanb640e052014-04-03 16:41:46 -0700945 def URLOpenerFactory(*args, **kw):
946 if isinstance(args[0], urllib2.Request):
947 return old_urlopen(*args, **kw)
Clark Boylanb640e052014-04-03 16:41:46 -0700948 return FakeURLOpener(self.upstream_root, *args, **kw)
949
950 old_urlopen = urllib2.urlopen
951 urllib2.urlopen = URLOpenerFactory
952
Joshua Hesketh352264b2015-08-11 23:42:08 +1000953 self.merge_server = zuul.merger.server.MergeServer(self.config,
954 self.connections)
955 self.merge_server.start()
Clark Boylanb640e052014-04-03 16:41:46 -0700956
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100957 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
958 self.swift)
959 self.merge_client = zuul.merger.client.MergeClient(
960 self.config, self.sched)
Clark Boylanb640e052014-04-03 16:41:46 -0700961
962 self.sched.setLauncher(self.launcher)
963 self.sched.setMerger(self.merge_client)
Clark Boylanb640e052014-04-03 16:41:46 -0700964
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100965 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
966 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
Clark Boylanb640e052014-04-03 16:41:46 -0700967
968 self.sched.start()
969 self.sched.reconfigure(self.config)
970 self.sched.resume()
971 self.webapp.start()
972 self.rpc.start()
973 self.launcher.gearman.waitForServer()
974 self.registerJobs()
975 self.builds = self.worker.running_builds
976 self.history = self.worker.build_history
977
978 self.addCleanup(self.assertFinalState)
979 self.addCleanup(self.shutdown)
980
def configure_connections(self):
    """Create the connections described by the test configuration.

    Populates ``self.connections`` (name -> connection object) along with
    ``self.smtp_messages``, ``self.gerrit_changes_dbs`` and
    ``self.gerrit_queues_dbs``.  Every newly created gerrit event queue is
    also appended to ``self.event_queues``, which must already exist.

    Raises:
        Exception: if a ``connection`` section has no ``driver`` option or
            names a driver other than ``gerrit`` or ``smtp``.
    """
    self.smtp_messages = []

    def FakeSMTPFactory(*args, **kw):
        # Hand every fake SMTP object the shared message list first.
        return FakeSMTP(self.smtp_messages, *args, **kw)

    self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

    # Change and event-queue stores are keyed by the configured server
    # name, so several FakeGerrit connections pointing at the same server
    # report back to one shared virtual database.
    self.gerrit_changes_dbs = {}
    self.gerrit_queues_dbs = {}
    self.connections = {}

    for section in self.config.sections():
        # Matches "connection name", "connection 'name'" and
        # 'connection "name"' (case-insensitively).
        matched = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
                           section, re.I)
        if matched is None:
            continue
        name = matched.group(2)
        options = dict(self.config.items(section))

        if 'driver' not in options:
            raise Exception("No driver specified for connection %s."
                            % name)
        driver = options['driver']

        # TODO(jhesketh): load the required class automatically
        if driver == 'gerrit':
            server = options['server']
            changes_db = self.gerrit_changes_dbs.setdefault(server, {})
            if server not in self.gerrit_queues_dbs:
                # First connection for this server: create its event
                # queue and make it visible to the settle checks.
                self.gerrit_queues_dbs[server] = Queue.Queue()
                self.event_queues.append(self.gerrit_queues_dbs[server])
            self.connections[name] = FakeGerritConnection(
                name, options,
                changes_db=changes_db,
                queues_db=self.gerrit_queues_dbs[server],
                upstream_root=self.upstream_root
            )
            setattr(self, 'fake_' + name, self.connections[name])
        elif driver == 'smtp':
            self.connections[name] = zuul.connection.smtp.SMTPConnection(
                name, options)
        else:
            raise Exception("Unknown driver, %s, for connection %s"
                            % (options['driver'], name))

    # If the [gerrit] or [smtp] sections still exist, load them in as a
    # connection named 'gerrit' or 'smtp' respectively.
    if 'gerrit' in self.config.sections():
        legacy_queue = Queue.Queue()
        self.gerrit_changes_dbs['gerrit'] = {}
        self.gerrit_queues_dbs['gerrit'] = legacy_queue
        self.event_queues.append(legacy_queue)
        self.connections['gerrit'] = FakeGerritConnection(
            '_legacy_gerrit', dict(self.config.items('gerrit')),
            changes_db=self.gerrit_changes_dbs['gerrit'],
            queues_db=self.gerrit_queues_dbs['gerrit'])

    if 'smtp' in self.config.sections():
        self.connections['smtp'] = zuul.connection.smtp.SMTPConnection(
            '_legacy_smtp', dict(self.config.items('smtp')))
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001050
def setup_config(self, config_file='zuul.conf'):
    """Load a fresh per-test config into ``self.config``.

    ``config_file`` is resolved relative to FIXTURE_DIR.  Override this
    method in a test class to use a different configuration.
    """
    parser = ConfigParser.ConfigParser()
    self.config = parser
    fixture_path = os.path.join(FIXTURE_DIR, config_file)
    parser.read(fixture_path)
Clark Boylanb640e052014-04-03 16:41:46 -07001055
def assertFinalState(self):
    """Assert that the test left no stray state behind.

    Checks that no ``git.Repo`` objects survive a garbage collection, that
    every pipeline queue is empty, and that pipelines driven by an
    IndependentPipelineManager have no queues left at all.
    """
    # Make sure that git.Repo objects have been garbage collected.
    gc.collect()
    surviving_repos = [obj for obj in gc.get_objects()
                       if isinstance(obj, git.Repo)]
    self.assertEqual(len(surviving_repos), 0)

    self.assertEmptyQueues()

    for pipeline in self.sched.layout.pipelines.values():
        if not isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
            continue
        self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001069
def shutdown(self):
    """Stop every service started for the test, in a fixed order.

    Logs an error if more than one thread is still alive afterwards.
    """
    self.log.debug("Shutting down after tests")
    self.launcher.stop()
    self.merge_server.stop()
    self.merge_server.join()
    self.merge_client.stop()
    self.worker.shutdown()
    # Each remaining service is stopped and then joined before moving on
    # to the next one.
    for service in (self.sched, self.statsd, self.webapp, self.rpc):
        service.stop()
        service.join()
    self.gearman_server.shutdown()

    remaining = threading.enumerate()
    if len(remaining) > 1:
        self.log.error("More than one thread is running: %s" % remaining)
Clark Boylanb640e052014-04-03 16:41:46 -07001089
def init_repo(self, project):
    """Create the upstream git repository for ``project``.

    The repository is initialised under ``self.upstream_root`` with a
    user name/email, a README committed as 'initial commit', a 'master'
    head and an 'init' tag.  The working tree is then reset and cleaned,
    and an additional 'mp' branch is created via ``create_branch``.

    Args:
        project: slash-separated project name, e.g. "org/project".
    """
    parts = project.split('/')
    # Create the parent directories (everything but the last component).
    path = os.path.join(self.upstream_root, *parts[:-1])
    if not os.path.exists(path):
        os.makedirs(path)
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)

    repo.config_writer().set_value('user', 'email', 'user@example.com')
    repo.config_writer().set_value('user', 'name', 'User Name')
    repo.config_writer().write()

    fn = os.path.join(path, 'README')
    # 'with' guarantees the handle is closed even if the write fails.
    with open(fn, 'w') as f:
        f.write("test\n")
    repo.index.add([fn])
    repo.index.commit('initial commit')
    master = repo.create_head('master')
    repo.create_tag('init')

    repo.head.reference = master
    zuul.merger.merger.reset_repo_to_head(repo)
    repo.git.clean('-x', '-f', '-d')

    self.create_branch(project, 'mp')
1116
def create_branch(self, project, branch):
    """Add ``branch`` with one extra commit to an upstream repository.

    A head named ``branch`` is created and checked out, a line is appended
    to README and committed as '<branch> commit', and then the repository
    is switched back to 'master', reset and cleaned.

    Args:
        project: project name relative to ``self.upstream_root``.
        branch: name of the branch to create.
    """
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)
    fn = os.path.join(path, 'README')

    branch_head = repo.create_head(branch)
    repo.head.reference = branch_head
    # 'with' guarantees the handle is closed even if the write fails.
    with open(fn, 'a') as f:
        f.write("test %s\n" % branch)
    repo.index.add([fn])
    repo.index.commit('%s commit' % branch)

    # Leave the repository on master with a pristine working tree.
    repo.head.reference = repo.heads['master']
    zuul.merger.merger.reset_repo_to_head(repo)
    repo.git.clean('-x', '-f', '-d')
1133
def ref_has_change(self, ref, change):
    """Return True if ``ref`` in the merger's repo contains ``change``.

    A commit counts as the change when its stripped message equals
    '<change.subject>-1'.  A GitCommandError while walking the ref is
    treated as "not found".
    """
    repo = git.Repo(os.path.join(self.git_root, change.project))
    try:
        return any(commit.message.strip() == ('%s-1' % change.subject)
                   for commit in repo.iter_commits(ref))
    except GitCommandError:
        return False
1144
def job_has_changes(self, *args):
    """Check that a job's ref contains the given changes.

    The first positional argument is the job (a FakeBuild, or an object
    whose ``arguments`` attribute is a JSON string); the remaining
    arguments are the changes expected on the job's ZUUL_REF.

    Returns True only if every '<subject>-1' message is present on the ref
    and the newest commit on the ref is the job's ZUUL_COMMIT.
    """
    job, commits = args[0], args[1:]
    if isinstance(job, FakeBuild):
        parameters = job.parameters
    else:
        parameters = json.loads(job.arguments)

    repo = git.Repo(os.path.join(self.git_root,
                                 parameters['ZUUL_PROJECT']))
    ref = parameters['ZUUL_REF']
    sha = parameters['ZUUL_COMMIT']

    ref_commits = list(repo.iter_commits(ref))
    repo_messages = [c.message.strip() for c in ref_commits]
    commit_messages = ['%s-1' % commit.subject for commit in commits]
    self.log.debug("Checking if job %s has changes; commit_messages %s;"
                   " repo_messages %s; sha %s" % (job, commit_messages,
                                                  repo_messages, sha))

    if any(msg not in repo_messages for msg in commit_messages):
        self.log.debug(" messages do not match")
        return False
    # The newest commit on the ref must be the one the job was given.
    if ref_commits[0].hexsha != sha:
        self.log.debug(" sha does not match")
        return False
    self.log.debug(" OK")
    return True
1172
def registerJobs(self):
    """Register the worker's gearman functions and wait for the server.

    One 'build:<job>' function is registered per job in the scheduler
    layout, plus a 'stop:<worker_id>' function.  The method then spins
    until the gearman server lists at least that many functions.
    """
    names = ['build:' + job for job in self.sched.layout.jobs.keys()]
    names.append('stop:' + self.worker.worker_id)
    for function_name in names:
        self.worker.registerFunction(function_name)

    expected = len(names)
    while len(self.gearman_server.functions) < expected:
        # Yield to other threads until the registrations have arrived.
        time.sleep(0)
1183
def orderedRelease(self):
    """Release builds one at a time, settling between each release.

    Always releasing the first entry of ``self.builds`` and waiting keeps
    the completion order deterministic.
    """
    while len(self.builds) > 0:
        next_build = self.builds[0]
        self.release(next_build)
        self.waitUntilSettled()
1189
def release(self, job):
    """Release a held job.

    A FakeBuild is released through its own ``release()``.  Any other job
    object has its ``waiting`` flag cleared, after which the gearman
    server's connections are woken.
    """
    if isinstance(job, FakeBuild):
        job.release()
        return
    job.waiting = False
    self.log.debug("Queued job %s released" % job.unique)
    self.gearman_server.wakeConnections()
1197
def getParameter(self, job, name):
    """Return the parameter ``name`` of ``job``.

    FakeBuild objects expose a ``parameters`` mapping directly; for any
    other job the parameters are decoded from its JSON ``arguments``.
    """
    if not isinstance(job, FakeBuild):
        decoded = json.loads(job.arguments)
        return decoded[name]
    return job.parameters[name]
1204
def resetGearmanServer(self):
    """Clear the gearman server's function registrations.

    The worker's functions are unset, then the method spins until no
    active server connection other than the RPC listener and the merger
    still has functions.  Finally the server's function set is emptied and
    the RPC listener and merge server register again.
    """
    self.worker.setFunctions([])
    while True:
        lingering = any(
            connection.functions and
            connection.client_id not in ['Zuul RPC Listener',
                                         'Zuul Merger']
            for connection in self.gearman_server.active_connections)
        if not lingering:
            break
        # Yield to other threads while the unregistration propagates.
        time.sleep(0)
    self.gearman_server.functions = set()
    self.rpc.register()
    self.merge_server.register()
1220
def haveAllBuildsReported(self):
    """Return True when no build result is still in transit.

    False is returned if the launcher has outstanding meta jobs, if any
    build in the worker's history is still present in the launcher's build
    table, or if any worker connection is in the GRAB_WAIT state.
    """
    # See if Zuul is waiting on a meta job to complete
    if self.launcher.meta_jobs:
        return False
    # A completed build that the launcher still tracks has not been
    # reported yet: a Gearman event is in transit and the system is not
    # stable.
    if any(self.launcher.builds.get(build.uuid)
           for build in self.worker.build_history):
        return False
    # Make sure that none of the worker connections are in GRAB_WAIT
    return not any(connection.state == 'GRAB_WAIT'
                   for connection in self.worker.active_connections)
1240
def areAllBuildsWaiting(self):
    """Return True if every build known to the launcher is parked.

    Each build is traced from the gearman client job, through the gearman
    server job, to the worker job.  A build counts as parked when its
    server job is still ``waiting`` or when its worker build reports
    ``isWaiting()``.  Any build that cannot be traced, has not reported
    its start, or is running makes the result False (with a debug log
    explaining why).
    """
    for build in self.launcher.builds.values():
        client_job = None
        for conn in self.launcher.gearman.active_connections:
            # First matching job per connection; a match on a later
            # connection replaces one found earlier.
            found = next((job for job in conn.related_jobs.values()
                          if job.unique == build.uuid), None)
            if found is not None:
                client_job = found

        if not client_job:
            self.log.debug("%s is not known to the gearman client" %
                           build)
            return False
        if not client_job.handle:
            self.log.debug("%s has no handle" % client_job)
            return False

        server_job = self.gearman_server.jobs.get(client_job.handle)
        if not server_job:
            self.log.debug("%s is not known to the gearman server" %
                           client_job)
            return False
        if not hasattr(server_job, 'waiting'):
            self.log.debug("%s is being enqueued" % server_job)
            return False
        if server_job.waiting:
            # Held on the server; this build is parked.
            continue

        worker_job = self.worker.gearman_jobs.get(server_job.unique)
        if not worker_job:
            self.log.debug("%s is unassigned" % server_job)
            return False
        if build.number is None:
            self.log.debug("%s has not reported start" % worker_job)
            return False
        if not worker_job.build.isWaiting():
            self.log.debug("%s is running" % worker_job)
            return False
    return True
Clark Boylanb640e052014-04-03 16:41:46 -07001281
def eventQueuesEmpty(self):
    """Yield, for each queue in ``self.event_queues``, whether it is empty."""
    for event_queue in self.event_queues:
        is_empty = event_queue.empty()
        yield is_empty
1285
def eventQueuesJoin(self):
    """Call ``join()`` on every queue in ``self.event_queues``, in order."""
    for event_queue in self.event_queues:
        event_queue.join()
1289
def waitUntilSettled(self, timeout=10):
    """Block until the system under test has no work in flight.

    The system is considered settled when all builds have been reported,
    the event queues are joined and empty, the merge client has no
    outstanding build sets, and every remaining build is waiting.

    Args:
        timeout: seconds to wait before giving up (default 10).

    Raises:
        Exception: "Timeout waiting for Zuul to settle" once ``timeout``
            seconds have elapsed; the queue and build status are printed
            first to aid debugging.
    """
    self.log.debug("Waiting until settled...")
    start = time.time()
    while True:
        if time.time() - start > timeout:
            # eventQueuesEmpty() yields booleans, so convert them before
            # joining; otherwise a TypeError would hide the timeout.
            print('queue status: ' +
                  ' '.join(str(empty) for empty in self.eventQueuesEmpty()))
            print(self.areAllBuildsWaiting())
            raise Exception("Timeout waiting for Zuul to settle")
        # Make sure no new events show up while we're checking
        self.worker.lock.acquire()
        try:
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have
                # been processed
                self.eventQueuesJoin()
                self.sched.run_handler_lock.acquire()
                try:
                    if (not self.merge_client.build_sets and
                            all(self.eventQueuesEmpty()) and
                            self.haveAllBuildsReported() and
                            self.areAllBuildsWaiting()):
                        self.log.debug("...settled.")
                        return
                finally:
                    self.sched.run_handler_lock.release()
        finally:
            # Always release, even if one of the checks raised.
            self.worker.lock.release()
        self.sched.wake_event.wait(0.1)
1318
def countJobResults(self, jobs, result):
    """Return how many of ``jobs`` have ``result`` as their result.

    Args:
        jobs: iterable of objects with a ``result`` attribute.
        result: the result value to count.
    """
    # Build a real list: filter() returns an iterator on Python 3, which
    # has no len().
    return len([job for job in jobs if job.result == result])
1322
def getJobFromHistory(self, name):
    """Return the first build in the worker's history named ``name``.

    Raises:
        Exception: if no build with that name is in the history.
    """
    for candidate in self.worker.build_history:
        if candidate.name != name:
            continue
        return candidate
    raise Exception("Unable to find job %s in history" % name)
1329
def assertEmptyQueues(self):
    """Assert that no pipeline queue still holds items.

    The contents of any non-empty queue are printed before the assertion
    so that a failure shows what was left behind.
    """
    # Make sure there are no orphaned jobs
    for pipeline in self.sched.layout.pipelines.values():
        for queue in pipeline.queues:
            if len(queue.queue) != 0:
                # Single-argument call form: identical output with the
                # Python 2 print statement and the Python 3 function.
                print('pipeline %s queue %s contents %s' % (
                    pipeline.name, queue.name, queue.queue))
            self.assertEqual(len(queue.queue), 0,
                             "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001339
def assertReportedStat(self, key, value=None, kind=None):
    """Wait up to five seconds for a stat named ``key`` to be reported.

    Each entry of ``self.statsd.stats`` is split at its first ':' into a
    key and a value part.

    Args:
        key: stat key that must be present.
        value: if given, the part after the ':' must equal it exactly.
        kind: if given (and ``value`` is not), the part after the ':' must
            end with '|<kind>'.

    Raises:
        Exception: if no matching stat shows up in time; the collected
            stats are pretty-printed first.
    """
    start = time.time()
    while time.time() < (start + 5):
        for stat in self.statsd.stats:
            # partition() tolerates entries without exactly one ':',
            # which would break a two-way unpack of split(':').
            k, _sep, v = stat.partition(':')
            if key != k:
                continue
            if value is None and kind is None:
                return
            elif value:
                if value == v:
                    return
            elif kind:
                if v.endswith('|' + kind):
                    return
        time.sleep(0.1)

    pprint.pprint(self.statsd.stats)
    raise Exception("Key %s not found in reported stats" % key)