#!/usr/bin/env python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
import random
import re
import select
import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib2

import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError

import zuul.connection.gerrit
import zuul.connection.smtp
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.source.gerrit
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger

FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                           'fixtures')
USE_TEMPDIR = True

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-32s '
                    '%(levelname)-8s %(message)s')


def repack_repo(path):
    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
    output = subprocess.Popen(cmd, close_fds=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out = output.communicate()
    if output.returncode:
        raise Exception("git repack returned %d" % output.returncode)
    return out


def random_sha1():
    return hashlib.sha1(str(random.random())).hexdigest()


def iterate_timeout(max_seconds, purpose):
    start = time.time()
    count = 0
    while (time.time() < start + max_seconds):
        count += 1
        yield count
        time.sleep(0)
    raise Exception("Timeout waiting for %s" % purpose)

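
# A typical (hypothetical) use of iterate_timeout() above is to poll for a
# condition until it holds, letting the generator raise if it never does:
#
#     for _ in iterate_timeout(30, 'builds to start'):
#         if len(self.builds) == expected:
#             break
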

class ChangeReference(git.Reference):
    _common_path_default = "refs/changes"
    _points_to_commits_only = True


class FakeChange(object):
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        path = os.path.join(self.upstream_root, self.project)
        if not large:
            fn = os.path.join(path, fn)
            f = open(fn, 'w')
            f.write("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            f.close()
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                f = open(fn, 'w')
                for ci in range(4096):
                    f.write(random.choice(string.printable))
                f.close()
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=[], large=False):
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def getPatchsetCreatedEvent(self, patchset):
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
            if x['by']['username'] == username and x['type'] == category:
                del self.patchsets[-1]['approvals'][i]
        self.patchsets[-1]['approvals'].append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        status = {}
        for cat in self.categories.keys():
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

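    # Note on getSubmitRecords() above: it mirrors the shape of Gerrit's
    # submitRecords output. A change only reports {'status': 'OK'} when every
    # category is at its maximum value and none sits at its minimum (a
    # minimum value acts as a veto and yields REJECT).
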
    def setDependsOn(self, other, patchset):
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        self.reported += 1


class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
    log = logging.getLogger("zuul.test.FakeGerritConnection")

    def __init__(self, connection_name, connection_config,
                 changes_db=None, queues_db=None):
        super(FakeGerritConnection, self).__init__(connection_name,
                                                   connection_config)

        self.event_queue = queues_db
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.change_number = 0
        self.changes = changes_db
        self.queries = []
        self.upstream_root = None

    def addFakeChange(self, project, branch, subject, status='NEW'):
        self.change_number += 1
        c = FakeChange(self, self.change_number, project, branch, subject,
                       upstream_root=self.upstream_root,
                       status=status)
        self.changes[self.change_number] = c
        return c

    def review(self, project, changeid, message, action):
        number, ps = changeid.split(',')
        change = self.changes[int(number)]

        # Add the approval back onto the change (i.e. simulate what gerrit
        # would do).
        # Usually when zuul leaves a review it creates a feedback loop:
        # zuul's review becomes another gerrit event, which is then picked up
        # by zuul. We can't mimic that behaviour here (by adding this approval
        # event into the queue) because it would stop tests from checking what
        # happens before the event is triggered. If a test needs to see what
        # happens, it can add its own verified event into the queue.
        # Nevertheless, we can update the change with the new review in
        # gerrit.

        for cat in ['CRVW', 'VRFY', 'APRV']:
            if cat in action:
                change.addApproval(cat, action[cat], username=self.user)

        if 'label' in action:
            parts = action['label'].split('=')
            change.addApproval(parts[0], parts[2], username=self.user)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        change = self.changes.get(int(number))
        if change:
            return change.query()
        return {}

    def simpleQuery(self, query):
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        if query.startswith('change:'):
            # Query a specific changeid
            changeid = query[len('change:'):]
            l = [change.query() for change in self.changes.values()
                 if change.data['id'] == changeid]
        elif query.startswith('message:'):
            # Query the content of a commit message
            msg = query[len('message:'):].strip()
            l = [change.query() for change in self.changes.values()
                 if msg in change.data['commitMessage']]
        else:
            # Query all open changes
            l = [change.query() for change in self.changes.values()]
        return l

    def _start_watcher_thread(self, *args, **kw):
        pass

    def getGitUrl(self, project):
        return os.path.join(self.upstream_root, project.name)

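
# Sketch of how tests usually exercise the fakes above (hypothetical test
# body; 'self' is a ZuulTestCase, and addEvent() is assumed to be inherited
# from the real GerritConnection):
#
#     A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
#     self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
#     self.waitUntilSettled()
#     self.assertEqual(self.getJobFromHistory('project-test1').result,
#                      'SUCCESS')
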

class BuildHistory(object):
    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __repr__(self):
        return ("<Completed build, result: %s name: %s #%s changes: %s>" %
                (self.result, self.name, self.number, self.changes))


class FakeURLOpener(object):
    def __init__(self, upstream_root, fake_gerrit, url):
        self.upstream_root = upstream_root
        self.fake_gerrit = fake_gerrit
        self.url = url

    def read(self):
        res = urlparse.urlparse(self.url)
        path = res.path
        project = '/'.join(path.split('/')[2:-2])
        ret = '001e# service=git-upload-pack\n'
        ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                'multi_ack thin-pack side-band side-band-64k ofs-delta '
                'shallow no-progress include-tag multi_ack_detailed no-done\n')
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo(path)
        for ref in repo.refs:
            r = ref.object.hexsha + ' ' + ref.path + '\n'
            ret += '%04x%s' % (len(r) + 4, r)
        ret += '0000'
        return ret


class FakeStatsd(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        while True:
            poll = select.poll()
            poll.register(self.sock, select.POLLIN)
            poll.register(self.wake_read, select.POLLIN)
            ret = poll.poll()
            for (fd, event) in ret:
                if fd == self.sock.fileno():
                    data = self.sock.recvfrom(1024)
                    if not data:
                        return
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        os.write(self.wake_write, '1\n')


class FakeBuild(threading.Thread):
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        self.wait_condition.acquire()
        self.wait_condition.notify()
        self.waiting = False
        self.log.debug("Build %s released" % self.unique)
        self.wait_condition.release()

    def isWaiting(self):
        self.wait_condition.acquire()
        if self.waiting:
            ret = True
        else:
            ret = False
        self.wait_condition.release()
        return ret

    def _wait(self):
        self.wait_condition.acquire()
        self.waiting = True
        self.log.debug("Build %s waiting" % self.unique)
        self.wait_condition.wait()
        self.wait_condition.release()

    def run(self):
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        self.worker.lock.acquire()

        result = 'SUCCESS'
        if (('ZUUL_REF' in self.parameters) and
            self.worker.shouldFailTest(self.name,
                                       self.parameters['ZUUL_REF'])):
            result = 'FAILURE'
        if self.aborted:
            result = 'ABORTED'

        if self.run_error:
            work_fail = True
            result = 'RUN_ERROR'
        else:
            data['result'] = result
            data['node_labels'] = ['bare-necessities']
            data['node_name'] = 'foo'
            work_fail = False

        changes = None
        if 'ZUUL_CHANGE_IDS' in self.parameters:
            changes = self.parameters['ZUUL_CHANGE_IDS']

        self.worker.build_history.append(
            BuildHistory(name=self.name, number=self.number,
                         result=result, changes=changes, node=self.node,
                         uuid=self.unique, description=self.description,
                         pipeline=self.parameters['ZUUL_PIPELINE'])
        )

        self.job.sendWorkData(json.dumps(data))
        if work_fail:
            self.job.sendWorkFail()
        else:
            self.job.sendWorkComplete(json.dumps(data))
        del self.worker.gearman_jobs[self.job.unique]
        self.worker.running_builds.remove(self)
        self.worker.lock.release()


class FakeWorker(gear.Worker):
    def __init__(self, worker_id, test):
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        if len(parts) > 2:
            node = parts[2]
        else:
            node = None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except:
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        l = self.fail_tests.get(name, [])
        l.append(change)
        self.fail_tests[name] = l

    def shouldFailTest(self, name, ref):
        l = self.fail_tests.get(name, [])
        for change in l:
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))


class FakeGearmanServer(gear.Server):
    def __init__(self):
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            cmd, name = job.name.split(':')
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))

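
# The usual hold/release pattern with these fakes (sketch; names as defined
# above, job name regex purely illustrative):
#
#     self.worker.hold_jobs_in_build = True    # builds start but block
#     ...inject an event...
#     self.worker.release('.*-merge')          # release matching builds
#     self.worker.hold_jobs_in_build = False
#     self.worker.release()                    # release everything else
#     self.waitUntilSettled()
#
# FakeGearmanServer.hold_jobs_in_queue works the same way but holds jobs
# before they reach the worker; gearman_server.release() lets them go.
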

class FakeSMTP(object):
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.server = server
        self.port = port
        self.messages = messages

    def sendmail(self, from_email, to_email, msg):
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        headers = msg.split('\n\n', 1)[0]
        body = msg.split('\n\n', 1)[1]

        self.messages.append(dict(
            from_email=from_email,
            to_email=to_email,
            msg=msg,
            headers=headers,
            body=body,
        ))

        return True

    def quit(self):
        return True


class FakeSwiftClientConnection(swiftclient.client.Connection):
    def post_account(self, headers):
        # Do nothing
        pass

    def get_auth(self):
        # Returns endpoint and (unused) auth token
        endpoint = os.path.join('https://storage.example.org', 'V1',
                                'AUTH_account')
        return endpoint, ''


class BaseTestCase(testtools.TestCase):
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If the timeout value is invalid do not set a timeout.
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
            os.environ.get('OS_STDOUT_CAPTURE') == '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
            os.environ.get('OS_STDERR_CAPTURE') == '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
            os.environ.get('OS_LOG_CAPTURE') == '1'):
            self.useFixture(fixtures.FakeLogger(
                level=logging.DEBUG,
                format='%(asctime)s %(name)-32s '
                       '%(levelname)-8s %(message)s'))

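
# BaseTestCase's capture behaviour is controlled entirely by environment
# variables: OS_TEST_TIMEOUT sets a per-test timeout, while OS_STDOUT_CAPTURE,
# OS_STDERR_CAPTURE and OS_LOG_CAPTURE (set to 'True' or '1') route stdout,
# stderr and logging into fixtures instead of the console.
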

class ZuulTestCase(BaseTestCase):

    def setUp(self):
        super(ZuulTestCase, self).setUp()
        if USE_TEMPDIR:
            tmp_root = self.useFixture(fixtures.TempDir(
                rootdir=os.environ.get("ZUUL_TEST_ROOT"))
            ).path
        else:
            tmp_root = os.environ.get("ZUUL_TEST_ROOT")
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")

        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)

        # Make a per-test copy of the configuration.
        self.setup_config()
        self.config.set('zuul', 'layout_config',
                        os.path.join(FIXTURE_DIR, "layout.yaml"))
        self.config.set('merger', 'git_dir', self.git_root)

        # For each project in config:
        self.init_repo("org/project")
        self.init_repo("org/project1")
        self.init_repo("org/project2")
        self.init_repo("org/project3")
        self.init_repo("org/project4")
        self.init_repo("org/project5")
        self.init_repo("org/project6")
        self.init_repo("org/one-job-project")
        self.init_repo("org/nonvoting-project")
        self.init_repo("org/templated-project")
        self.init_repo("org/layered-project")
        self.init_repo("org/node-project")
        self.init_repo("org/conflict-project")
        self.init_repo("org/noop-project")
        self.init_repo("org/experimental-project")
        self.init_repo("org/no-jobs-project")

        self.statsd = FakeStatsd()
        # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
        # see: https://github.com/jsocol/pystatsd/issues/61
        os.environ['STATSD_HOST'] = '127.0.0.1'
        os.environ['STATSD_PORT'] = str(self.statsd.port)
        self.statsd.start()
        # the statsd client object is configured in the statsd module import
        reload(statsd)
        reload(zuul.scheduler)

        self.gearman_server = FakeGearmanServer()

        self.config.set('gearman', 'port', str(self.gearman_server.port))

        self.worker = FakeWorker('fake_worker', self)
        self.worker.addServer('127.0.0.1', self.gearman_server.port)
        self.gearman_server.worker = self.worker

        zuul.source.gerrit.GerritSource.replication_timeout = 1.5
        zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
        zuul.connection.gerrit.GerritEventConnector.delay = 0.0

        self.sched = zuul.scheduler.Scheduler(self.config)

        self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                             FakeSwiftClientConnection))
        self.swift = zuul.lib.swift.Swift(self.config)

        # Set up connections and give out the default gerrit for testing
        self.configure_connections()
        self.sched.registerConnections(self.connections)
        self.fake_gerrit = self.connections['gerrit']
        self.fake_gerrit.upstream_root = self.upstream_root

        def URLOpenerFactory(*args, **kw):
            if isinstance(args[0], urllib2.Request):
                return old_urlopen(*args, **kw)
            args = [self.fake_gerrit] + list(args)
            return FakeURLOpener(self.upstream_root, *args, **kw)

        old_urlopen = urllib2.urlopen
        urllib2.urlopen = URLOpenerFactory

        self.merge_server = zuul.merger.server.MergeServer(self.config,
                                                           self.connections)
        self.merge_server.start()

        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                      self.swift)
        self.merge_client = zuul.merger.client.MergeClient(
            self.config, self.sched)

        self.sched.setLauncher(self.launcher)
        self.sched.setMerger(self.merge_client)

        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

        self.sched.start()
        self.sched.reconfigure(self.config)
        self.sched.resume()
        self.webapp.start()
        self.rpc.start()
        self.launcher.gearman.waitForServer()
        self.registerJobs()
        self.builds = self.worker.running_builds
        self.history = self.worker.build_history

        self.addCleanup(self.assertFinalState)
        self.addCleanup(self.shutdown)

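    # setUp() above stands up a complete, self-contained Zuul: a real
    # scheduler, merger and gearman launcher wired to a FakeGearmanServer and
    # FakeWorker, plus FakeGerritConnection/FakeSMTP connections, a FakeStatsd
    # receiver, and temporary git repositories under test_root.
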
    def configure_connections(self):
        # Register connections from the config
        self.smtp_messages = []

        def FakeSMTPFactory(*args, **kw):
            args = [self.smtp_messages] + list(args)
            return FakeSMTP(*args, **kw)

        self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

        # Set a changes database so multiple FakeGerrits can report back to
        # a virtual canonical database given by the configured hostname
        self.gerrit_changes_dbs = {}
        self.gerrit_queues_dbs = {}
        self.connections = {}

        for section_name in self.config.sections():
            con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
                                 section_name, re.I)
            if not con_match:
                continue
            con_name = con_match.group(2)
            con_config = dict(self.config.items(section_name))

            if 'driver' not in con_config:
                raise Exception("No driver specified for connection %s."
                                % con_name)

            con_driver = con_config['driver']

            # TODO(jhesketh): load the required class automatically
            if con_driver == 'gerrit':
                self.gerrit_changes_dbs[con_name] = {}
                self.gerrit_queues_dbs[con_name] = Queue.Queue()
                self.connections[con_name] = FakeGerritConnection(
                    con_name, con_config,
                    changes_db=self.gerrit_changes_dbs[con_name],
                    queues_db=self.gerrit_queues_dbs[con_name]
                )
            elif con_driver == 'smtp':
                self.connections[con_name] = \
                    zuul.connection.smtp.SMTPConnection(con_name, con_config)
            else:
                raise Exception("Unknown driver, %s, for connection %s"
                                % (con_config['driver'], con_name))

        # If the [gerrit] or [smtp] sections still exist, load them in as a
        # connection named 'gerrit' or 'smtp' respectively

        if 'gerrit' in self.config.sections():
            self.gerrit_changes_dbs['gerrit'] = {}
            self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
            self.connections['gerrit'] = FakeGerritConnection(
                '_legacy_gerrit', dict(self.config.items('gerrit')),
                changes_db=self.gerrit_changes_dbs['gerrit'],
                queues_db=self.gerrit_queues_dbs['gerrit'])

        if 'smtp' in self.config.sections():
            self.connections['smtp'] = \
                zuul.connection.smtp.SMTPConnection(
                    '_legacy_smtp', dict(self.config.items('smtp')))

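    # A connection section in the fixture zuul.conf that this parser picks up
    # looks roughly like this (sketch; keys other than 'driver' depend on the
    # connection type):
    #
    #     [connection gerrit]
    #     driver=gerrit
    #     server=review.example.com
    #     user=jenkins
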
    def setup_config(self):
        """Per-test config object. Override to set a different config."""
        self.config = ConfigParser.ConfigParser()
        self.config.read(os.path.join(FIXTURE_DIR, "zuul.conf"))

    def assertFinalState(self):
        # Make sure that git.Repo objects have been garbage collected.
        repos = []
        gc.collect()
        for obj in gc.get_objects():
            if isinstance(obj, git.Repo):
                repos.append(obj)
        self.assertEqual(len(repos), 0)
        self.assertEmptyQueues()
        for pipeline in self.sched.layout.pipelines.values():
            if isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
                self.assertEqual(len(pipeline.queues), 0)

    def shutdown(self):
        self.log.debug("Shutting down after tests")
        self.launcher.stop()
        self.merge_server.stop()
        self.merge_server.join()
        self.merge_client.stop()
        self.worker.shutdown()
        self.sched.stop()
        self.sched.join()
        self.statsd.stop()
        self.statsd.join()
        self.webapp.stop()
        self.webapp.join()
        self.rpc.stop()
        self.rpc.join()
        self.gearman_server.shutdown()
        threads = threading.enumerate()
        if len(threads) > 1:
            self.log.error("More than one thread is running: %s" % threads)

    def init_repo(self, project):
        parts = project.split('/')
        path = os.path.join(self.upstream_root, *parts[:-1])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)

        repo.config_writer().set_value('user', 'email', 'user@example.com')
        repo.config_writer().set_value('user', 'name', 'User Name')
        repo.config_writer().write()

        fn = os.path.join(path, 'README')
        f = open(fn, 'w')
        f.write("test\n")
        f.close()
        repo.index.add([fn])
        repo.index.commit('initial commit')
        master = repo.create_head('master')
        repo.create_tag('init')

        repo.head.reference = master
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        self.create_branch(project, 'mp')

    def create_branch(self, project, branch):
        path = os.path.join(self.upstream_root, project)
        repo = git.Repo.init(path)
        fn = os.path.join(path, 'README')

        branch_head = repo.create_head(branch)
        repo.head.reference = branch_head
        f = open(fn, 'a')
        f.write("test %s\n" % branch)
        f.close()
        repo.index.add([fn])
        repo.index.commit('%s commit' % branch)

        repo.head.reference = repo.heads['master']
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

    def ref_has_change(self, ref, change):
        path = os.path.join(self.git_root, change.project)
        repo = git.Repo(path)
        try:
            for commit in repo.iter_commits(ref):
                if commit.message.strip() == ('%s-1' % change.subject):
                    return True
        except GitCommandError:
            pass
        return False

    def job_has_changes(self, *args):
        job = args[0]
        commits = args[1:]
        if isinstance(job, FakeBuild):
            parameters = job.parameters
        else:
            parameters = json.loads(job.arguments)
        project = parameters['ZUUL_PROJECT']
        path = os.path.join(self.git_root, project)
        repo = git.Repo(path)
        ref = parameters['ZUUL_REF']
        sha = parameters['ZUUL_COMMIT']
        repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
        repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
        commit_messages = ['%s-1' % commit.subject for commit in commits]
        self.log.debug("Checking if job %s has changes; commit_messages %s;"
                       " repo_messages %s; sha %s" % (job, commit_messages,
                                                      repo_messages, sha))
        for msg in commit_messages:
            if msg not in repo_messages:
                self.log.debug("  messages do not match")
                return False
        if repo_shas[0] != sha:
            self.log.debug("  sha does not match")
            return False
        self.log.debug("  OK")
        return True

    def registerJobs(self):
        count = 0
        for job in self.sched.layout.jobs.keys():
            self.worker.registerFunction('build:' + job)
            count += 1
        self.worker.registerFunction('stop:' + self.worker.worker_id)
        count += 1

        while len(self.gearman_server.functions) < count:
            time.sleep(0)

    def orderedRelease(self):
        # Run one build at a time to ensure non-race order:
        while len(self.builds):
            self.release(self.builds[0])
            self.waitUntilSettled()

    def release(self, job):
        if isinstance(job, FakeBuild):
            job.release()
        else:
            job.waiting = False
            self.log.debug("Queued job %s released" % job.unique)
            self.gearman_server.wakeConnections()

    def getParameter(self, job, name):
        if isinstance(job, FakeBuild):
            return job.parameters[name]
        else:
            parameters = json.loads(job.arguments)
            return parameters[name]

    def resetGearmanServer(self):
        self.worker.setFunctions([])
        while True:
            done = True
            for connection in self.gearman_server.active_connections:
                if (connection.functions and
                    connection.client_id not in ['Zuul RPC Listener',
                                                 'Zuul Merger']):
                    done = False
            if done:
                break
            time.sleep(0)
        self.gearman_server.functions = set()
        self.rpc.register()
        self.merge_server.register()

    def haveAllBuildsReported(self):
        # See if Zuul is waiting on a meta job to complete
        if self.launcher.meta_jobs:
            return False
        # Find out if every build that the worker has completed has been
        # reported back to Zuul. If it hasn't then that means a Gearman
        # event is still in transit and the system is not stable.
        for build in self.worker.build_history:
            zbuild = self.launcher.builds.get(build.uuid)
            if not zbuild:
                # It has already been reported
                continue
            # It hasn't been reported yet.
            return False
        # Make sure that none of the worker connections are in GRAB_WAIT
        for connection in self.worker.active_connections:
            if connection.state == 'GRAB_WAIT':
                return False
        return True

    def areAllBuildsWaiting(self):
        builds = self.launcher.builds.values()
        for build in builds:
            client_job = None
            for conn in self.launcher.gearman.active_connections:
                for j in conn.related_jobs.values():
                    if j.unique == build.uuid:
                        client_job = j
                        break
            if not client_job:
                self.log.debug("%s is not known to the gearman client" %
                               build)
                return False
            if not client_job.handle:
                self.log.debug("%s has no handle" % client_job)
                return False
            server_job = self.gearman_server.jobs.get(client_job.handle)
            if not server_job:
                self.log.debug("%s is not known to the gearman server" %
                               client_job)
                return False
            if not hasattr(server_job, 'waiting'):
                self.log.debug("%s is being enqueued" % server_job)
                return False
            if server_job.waiting:
                continue
            worker_job = self.worker.gearman_jobs.get(server_job.unique)
            if worker_job:
                if build.number is None:
                    self.log.debug("%s has not reported start" % worker_job)
                    return False
                if worker_job.build.isWaiting():
                    continue
                else:
                    self.log.debug("%s is running" % worker_job)
                    return False
            else:
                self.log.debug("%s is unassigned" % server_job)
                return False
        return True

    def waitUntilSettled(self):
        self.log.debug("Waiting until settled...")
        start = time.time()
        while True:
            if time.time() - start > 10:
                print 'queue status:',
                print self.sched.trigger_event_queue.empty(),
                print self.sched.result_event_queue.empty(),
                print self.fake_gerrit.event_queue.empty(),
                print self.areAllBuildsWaiting()
                raise Exception("Timeout waiting for Zuul to settle")
            # Make sure no new events show up while we're checking
            self.worker.lock.acquire()
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have been
                # processed
                self.fake_gerrit.event_queue.join()
                self.sched.trigger_event_queue.join()
                self.sched.result_event_queue.join()
                self.sched.run_handler_lock.acquire()
                if (not self.merge_client.build_sets and
                    self.sched.trigger_event_queue.empty() and
                    self.sched.result_event_queue.empty() and
                    self.fake_gerrit.event_queue.empty() and
                    self.haveAllBuildsReported() and
                    self.areAllBuildsWaiting()):
                    self.sched.run_handler_lock.release()
                    self.worker.lock.release()
                    self.log.debug("...settled.")
                    return
                self.sched.run_handler_lock.release()
            self.worker.lock.release()
            self.sched.wake_event.wait(0.1)

    def countJobResults(self, jobs, result):
        jobs = filter(lambda x: x.result == result, jobs)
        return len(jobs)

    def getJobFromHistory(self, name):
        history = self.worker.build_history
        for job in history:
            if job.name == name:
                return job
        raise Exception("Unable to find job %s in history" % name)

    def assertEmptyQueues(self):
        # Make sure there are no orphaned jobs
        for pipeline in self.sched.layout.pipelines.values():
            for queue in pipeline.queues:
                if len(queue.queue) != 0:
                    print 'pipeline %s queue %s contents %s' % (
                        pipeline.name, queue.name, queue.queue)
                self.assertEqual(len(queue.queue), 0,
                                 "Pipeline queues should be empty")

    def assertReportedStat(self, key, value=None, kind=None):
        start = time.time()
        while time.time() < (start + 5):
            for stat in self.statsd.stats:
                pprint.pprint(self.statsd.stats)
                k, v = stat.split(':')
                if key == k:
                    if value is None and kind is None:
                        return
                    elif value:
                        if value == v:
                            return
                    elif kind:
                        if v.endswith('|' + kind):
                            return
            time.sleep(0.1)

        pprint.pprint(self.statsd.stats)
        raise Exception("Key %s not found in reported stats" % key)
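
    # Example assertion against FakeStatsd (hypothetical stat key; the value
    # string includes the statsd type suffix, e.g. '|g' for a gauge):
    #
    #     self.assertReportedStat('zuul.pipeline.gate.current_changes',
    #                             value='1|g')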