blob: 2559eb444e9b291e93b5caa7222d9b6876ee8d15 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Morgan Fainberg293f7f82016-05-30 14:01:22 -070025from six.moves import urllib
Clark Boylanb640e052014-04-03 16:41:46 -070026import random
27import re
28import select
29import shutil
Monty Taylor74fa3862016-06-02 07:39:49 +030030from six.moves import reload_module
Clark Boylanb640e052014-04-03 16:41:46 -070031import socket
32import string
33import subprocess
34import swiftclient
35import threading
36import time
Clark Boylanb640e052014-04-03 16:41:46 -070037
38import git
39import gear
40import fixtures
Clark Boylanb640e052014-04-03 16:41:46 -070041import statsd
42import testtools
Mike Heald8225f522014-11-21 09:52:33 +000043from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070044
Joshua Hesketh352264b2015-08-11 23:42:08 +100045import zuul.connection.gerrit
46import zuul.connection.smtp
Clark Boylanb640e052014-04-03 16:41:46 -070047import zuul.scheduler
48import zuul.webapp
49import zuul.rpclistener
50import zuul.launcher.gearman
51import zuul.lib.swift
Clark Boylanb640e052014-04-03 16:41:46 -070052import zuul.merger.client
James E. Blair879dafb2015-07-17 14:04:49 -070053import zuul.merger.merger
54import zuul.merger.server
Clark Boylanb640e052014-04-03 16:41:46 -070055import zuul.reporter.gerrit
56import zuul.reporter.smtp
Joshua Hesketh850ccb62014-11-27 11:31:02 +110057import zuul.source.gerrit
Clark Boylanb640e052014-04-03 16:41:46 -070058import zuul.trigger.gerrit
59import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070060import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070061
# Directory holding the static fixtures that ship next to this module.
_HERE = os.path.dirname(__file__)
FIXTURE_DIR = os.path.join(_HERE, 'fixtures')

# When true, each test creates its working tree in a fresh temporary directory.
USE_TEMPDIR = True

# Verbose root logging so test failures come with full debug output.
_LOG_FORMAT = ('%(asctime)s %(name)-32s '
               '%(levelname)-8s %(message)s')
logging.basicConfig(level=logging.DEBUG, format=_LOG_FORMAT)
69
70
def repack_repo(path):
    """Run ``git repack -afd`` on the repository checked out at *path*.

    :param path: working-tree directory; its ``.git`` sub-directory is
        passed to git as ``--git-dir``.
    :returns: the ``(stdout, stderr)`` tuple from ``communicate()``.
    :raises Exception: if git exits with a non-zero status.
    """
    git_dir_arg = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(['git', git_dir_arg, 'repack', '-afd'],
                            close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    streams = proc.communicate()
    if proc.returncode:
        raise Exception("git repack returned %d" % proc.returncode)
    return streams
80
81
def random_sha1():
    """Return a random 40-character hexadecimal SHA-1 digest.

    The digest is derived from ``random.random()``, so it is suitable for
    fake change ids only, never for anything security-sensitive.
    """
    # hashlib only accepts bytes on Python 3; encoding is a no-op for the
    # ASCII text produced by str(float) on Python 2.
    seed = str(random.random()).encode('utf-8')
    return hashlib.sha1(seed).hexdigest()
84
85
def iterate_timeout(max_seconds, purpose):
    """Yield an increasing attempt counter until *max_seconds* have passed.

    Intended for polling loops: the caller breaks out of the ``for`` loop
    once its condition holds.  If the caller keeps iterating past the
    deadline the generator raises instead of finishing normally.

    :param max_seconds: how long, in seconds, iteration may continue.
    :param purpose: text inserted into the timeout error message.
    :raises Exception: "Timeout waiting for <purpose>" once time is up.
    """
    began = time.time()
    attempt = 0
    while True:
        if not (time.time() < began + max_seconds):
            raise Exception("Timeout waiting for %s" % purpose)
        attempt += 1
        yield attempt
        # Give other threads a chance to run between polls.
        time.sleep(0)
94
95
class ChangeReference(git.Reference):
    """Git reference type rooted at ``refs/changes``.

    Used by the fake Gerrit to create per-patchset refs in the upstream
    test repositories.
    """
    _points_to_commits_only = True
    _common_path_default = "refs/changes"
99
100
class FakeChange(object):
    """In-memory stand-in for a Gerrit change, backed by a real git repo.

    Every patchset is committed to the upstream test repository under
    ``refs/changes/1/<number>/<patchset>`` and mirrored in ``self.data``,
    a dict shaped like the JSON the fake Gerrit returns from queries.
    """

    # category key -> (label description, minimum vote, maximum vote)
    categories = {'APRV': ('Approved', -1, 1),
                  'CRVW': ('Code-Review', -2, 2),
                  'VRFY': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None):
        """Create the change and immediately upload its first patchset.

        :param gerrit: the fake Gerrit connection owning this change.
        :param number: change number.
        :param project: project name, relative to *upstream_root*.
        :param branch: target branch name.
        :param subject: commit subject, also used as the commit message.
        :param status: Gerrit status string; ``'NEW'`` means open.
        :param upstream_root: directory containing the upstream repos.
        """
        self.gerrit = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.data = {
            'branch': branch,
            'comments': [],
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            # Shared with self.patchsets so new patchsets show up here too.
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': 'https://hostname/%s' % number}

        # Must be set before addPatchset(), which writes to the repo.
        self.upstream_root = upstream_root
        self.addPatchset()
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def add_fake_change_to_repo(self, msg, fn, large):
        """Commit a new patchset to the upstream repo and return the commit.

        The commit is created on the patchset's change ref (branched from
        ``refs/tags/init``); afterwards the repo is put back on ``master``.

        :param msg: commit message.
        :param fn: file name to write when *large* is false.
        :param large: when true, write 100 files of 4096 random printable
            characters each instead of the single small file.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
                                                        self.latest_patchset),
                                     'refs/tags/init')
        repo.head.reference = ref
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')

        if not large:
            fn = os.path.join(path, fn)
            with open(fn, 'w') as f:
                f.write("test %s %s %s\n" %
                        (self.branch, self.number, self.latest_patchset))
            repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                with open(fn, 'w') as f:
                    for _ in range(4096):
                        f.write(random.choice(string.printable))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        zuul.merger.merger.reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False):
        """Upload a new patchset and make it the current one.

        :param files: optional sequence of file names; the first one is the
            file actually written, and all of them are listed as ADDED in
            the patchset metadata.
        :param large: passed through to add_fake_change_to_repo().
        """
        # None instead of a shared mutable [] default.
        files = [] if files is None else files
        self.latest_patchset += 1
        if files:
            fn = files[0]
        else:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.add_fake_change_to_repo(msg, fn, large)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/1/%s/%s' % (self.number,
                                              self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def _change_event_info(self):
        """Return the ``change`` section shared by the simple event types."""
        return {"project": self.project,
                "branch": self.branch,
                "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                "number": str(self.number),
                "subject": self.subject,
                "owner": {"name": "User Name"},
                "url": "https://hostname/3"}

    def getPatchsetCreatedEvent(self, patchset):
        """Return a ``patchset-created`` event for 1-based *patchset*."""
        event = {"type": "patchset-created",
                 "change": self._change_event_info(),
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a ``change-restored`` event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": self._change_event_info(),
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a ``change-abandoned`` event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": self._change_event_info(),
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset):
        """Return a ``comment-added`` event (Code-Review 0) for *patchset*."""
        event = {"type": "comment-added",
                 "change": self._change_event_info(),
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}],
                 "comment": "This is a comment"}
        return event

    def getRefUpdatedEvent(self):
        """Return a ``ref-updated`` event moving the branch to this change.

        ``oldRev`` is the branch head currently in the upstream repo and
        ``newRev`` is the revision of the latest patchset.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        oldrev = repo.heads[self.branch].commit.hexsha

        event = {
            "type": "ref-updated",
            "submitter": {
                "name": "User Name",
            },
            "refUpdate": {
                "oldRev": oldrev,
                "newRev": self.patchsets[-1]['revision'],
                "refName": self.branch,
                "project": self.project,
            }
        }
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message=''):
        """Record a vote on the latest patchset and return its event.

        Any earlier vote by *username* in the same *category* is replaced.

        :param category: key of ``categories`` (e.g. ``'CRVW'``).
        :param value: vote value; stored as a string.
        :param username: voter; the e-mail is ``<username>@example.com``.
        :param granted_on: timestamp of the vote; defaults to now.
        :param message: comment text carried in the event.
        :returns: a ``comment-added`` event as a detached (JSON round-tripped)
            dict.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on)
        }
        approvals = self.patchsets[-1]['approvals']
        # Filter in place (keeping the list object shared with self.data)
        # rather than deleting by index while iterating, which removed the
        # wrong entries once more than one vote matched.
        approvals[:] = [x for x in approvals
                        if not (x['by']['username'] == username and
                                x['type'] == category)]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def getSubmitRecords(self):
        """Compute Gerrit-style submit records from the latest approvals.

        :returns: ``[{'status': 'OK'}]`` when every category sits at its
            maximum vote, otherwise a single ``NOT_READY`` record listing
            each label as ``OK``, ``NEED`` or ``REJECT``.
        """
        status = {}
        for cat in self.categories:
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            # A minimum (veto) vote always wins; otherwise the vote with
            # the largest magnitude is kept.
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on *other* and record the reverse link.

        :param other: the FakeChange this one depends on.
        :param patchset: 1-based patchset number; it indexes *other*'s
            patchsets for ``dependsOn`` and this change's own patchsets
            for the ``neededBy`` entry.
        """
        self.depends_on_change = other
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append(self)
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[patchset - 1]['ref'],
             'revision': self.patchsets[patchset - 1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a detached copy of the change data, as a query would.

        Increments ``queried`` and refreshes ``isCurrentPatchSet`` on the
        first ``dependsOn`` entry before copying.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def setMerged(self):
        """Mark the change MERGED and move the upstream branch to it.

        Does nothing when the change it depends on is not merged yet or
        when ``fail_merge`` is set.
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        repo.heads[self.branch].commit = \
            repo.commit(self.patchsets[-1]['revision'])

    def setReported(self):
        """Count one more report left on this change."""
        self.reported += 1
401
402
class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
    """Gerrit connection whose changes live in an in-memory dictionary.

    Reviews and queries operate on FakeChange objects instead of talking
    to a server, and no event watcher thread is started.
    """
    log = logging.getLogger("zuul.test.FakeGerritConnection")

    def __init__(self, connection_name, connection_config,
                 changes_db=None, queues_db=None, upstream_root=None):
        """Set up the fake connection.

        :param changes_db: mapping of change number to FakeChange.
        :param queues_db: object stored as this connection's event queue.
        :param upstream_root: directory containing the upstream git repos.
        """
        super(FakeGerritConnection, self).__init__(connection_name,
                                                   connection_config)

        self.upstream_root = upstream_root
        self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
        self.event_queue = queues_db
        self.changes = changes_db
        self.change_number = 0
        self.queries = []

    def addFakeChange(self, project, branch, subject, status='NEW'):
        """Create, register and return a new FakeChange."""
        self.change_number += 1
        number = self.change_number
        change = FakeChange(self, number, project, branch, subject,
                            upstream_root=self.upstream_root,
                            status=status)
        self.changes[number] = change
        return change

    def review(self, project, changeid, message, action):
        """Apply a review to the change named by ``"<number>,<patchset>"``.

        The approval is written back onto the change (as Gerrit would do),
        but no resulting event is queued: doing so would stop tests from
        observing the state before that event.  A test that needs the
        follow-up event can enqueue its own.
        """
        number, _patchset = changeid.split(',')
        change = self.changes[int(number)]

        for cat in ('CRVW', 'VRFY', 'APRV'):
            if cat in action:
                change.addApproval(cat, action[cat], username=self.user)

        if 'label' in action:
            parts = action['label'].split('=')
            # NOTE(review): index 2 needs a label containing two '=' signs;
            # a plain "name=value" would raise IndexError here -- confirm
            # the expected label format against the callers.
            change.addApproval(parts[0], parts[2], username=self.user)

        change.messages.append(message)

        if 'submit' in action:
            change.setMerged()
        if message:
            change.setReported()

    def query(self, number):
        """Return the queried data of change *number*, or ``{}`` if unknown."""
        change = self.changes.get(int(number))
        if not change:
            return {}
        return change.query()

    def simpleQuery(self, query):
        """Return query results for the changes selected by *query*.

        ``change:<id>`` selects by change id, ``message:<text>`` selects
        changes whose commit message contains the text, and anything else
        selects every known change (open or not).  Each query string is
        also recorded in ``self.queries``.
        """
        self.log.debug("simpleQuery: %s" % query)
        self.queries.append(query)
        candidates = self.changes.values()
        if query.startswith('change:'):
            changeid = query[len('change:'):]
            return [change.query() for change in candidates
                    if change.data['id'] == changeid]
        if query.startswith('message:'):
            msg = query[len('message:'):].strip()
            return [change.query() for change in candidates
                    if msg in change.data['commitMessage']]
        return [change.query() for change in candidates]

    def _start_watcher_thread(self, *args, **kw):
        """Do nothing: the fake connection has no event stream to watch."""
        pass

    def getGitUrl(self, project):
        """Return the local path of *project*'s upstream repository."""
        return os.path.join(self.upstream_root, project.name)
484
Clark Boylanb640e052014-04-03 16:41:46 -0700485
class BuildHistory(object):
    """Plain record of a finished build; fields come from keyword args."""

    def __init__(self, **kw):
        # Every keyword becomes an instance attribute of the same name.
        vars(self).update(kw)

    def __repr__(self):
        fields = (self.result, self.name, self.number, self.changes)
        template = "<Completed build, result: %s name: %s #%s changes: %s>"
        return template % fields
493
494
class FakeURLOpener(object):
    """File-like object answering a git smart-HTTP ref advertisement.

    ``read()`` builds the response from the refs of the matching upstream
    test repository instead of contacting a server.
    """

    def __init__(self, upstream_root, url):
        self.url = url
        self.upstream_root = upstream_root

    def read(self):
        """Return the ``git-upload-pack`` advertisement for ``self.url``.

        The project name is taken from the URL path with its first two
        and last two ``/``-separated components stripped.
        """
        parsed = urllib.parse.urlparse(self.url)
        project = '/'.join(parsed.path.split('/')[2:-2])
        repo = git.Repo(os.path.join(self.upstream_root, project))

        chunks = ['001e# service=git-upload-pack\n',
                  ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
                   'multi_ack thin-pack side-band side-band-64k ofs-delta '
                   'shallow no-progress include-tag multi_ack_detailed '
                   'no-done\n')]
        for ref in repo.refs:
            line = ref.object.hexsha + ' ' + ref.path + '\n'
            # pkt-line: four hex digits of length (which count themselves).
            chunks.append('%04x%s' % (len(line) + 4, line))
        chunks.append('0000')
        return ''.join(chunks)
515
516
class FakeStatsd(threading.Thread):
    """Daemon thread that records every UDP datagram sent to it.

    The socket is bound to an ephemeral port, available as ``self.port``;
    received payloads are appended to ``self.stats``.  A pipe is used to
    wake the thread up when ``stop()`` is called.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []

    def run(self):
        """Collect datagrams until the wake pipe becomes readable."""
        # The set of descriptors never changes, so register them once.
        poll = select.poll()
        poll.register(self.sock, select.POLLIN)
        poll.register(self.wake_read, select.POLLIN)
        sock_fd = self.sock.fileno()
        while True:
            for fd, _event in poll.poll():
                if fd == sock_fd:
                    # recvfrom() returns a (payload, address) tuple.
                    data = self.sock.recvfrom(1024)
                    self.stats.append(data[0])
                if fd == self.wake_read:
                    return

    def stop(self):
        """Ask the thread to exit by writing to the wake pipe."""
        # os.write() requires bytes on Python 3.
        os.write(self.wake_write, b'1\n')
544
545
class FakeBuild(threading.Thread):
    """Thread simulating one build executed by a FakeWorker.

    The build reports its data over the gearman job, optionally blocks
    until released, then records a BuildHistory entry on the worker and
    completes (or fails) the job.
    """
    log = logging.getLogger("zuul.test")

    def __init__(self, worker, job, number, node):
        """Prepare the build.

        :param worker: the FakeWorker running this build.
        :param job: gearman job; its name is ``build:<name>[:<node>]`` and
            its arguments are a JSON object containing ``ZUUL_UUID``.
        :param number: build number.
        :param node: node name from the job name, or None.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.requeue = False
        self.created = time.time()
        self.description = ''
        self.run_error = False

    def release(self):
        """Wake the build up if it is being held."""
        with self.wait_condition:
            self.wait_condition.notify()
            self.waiting = False
            self.log.debug("Build %s released" % self.unique)

    def isWaiting(self):
        """Return True while the build is held waiting for release()."""
        with self.wait_condition:
            return bool(self.waiting)

    def _wait(self):
        """Block until release() notifies the condition."""
        with self.wait_condition:
            self.waiting = True
            self.log.debug("Build %s waiting" % self.unique)
            self.wait_condition.wait()

    def run(self):
        """Execute the fake build and report its result over gearman.

        The result is SUCCESS unless the worker says this build should
        fail (FAILURE), the build was aborted (ABORTED), it is flagged for
        requeue (None) or ``run_error`` is set (RUN_ERROR plus a work-fail
        packet instead of work-complete).
        """
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
            'worker_name': 'My Worker',
            'worker_hostname': 'localhost',
            'worker_ips': ['127.0.0.1', '192.168.1.1'],
            'worker_fqdn': 'zuul.example.org',
            'worker_program': 'FakeBuilder',
            'worker_version': 'v1.1',
            'worker_extra': {'something': 'else'}
        }

        self.log.debug('Running build %s' % self.unique)

        self.job.sendWorkData(json.dumps(data))
        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self.log.debug('Holding build %s' % self.unique)
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # Use the lock as a context manager so that an exception while
        # finishing the build cannot leave the worker lock held forever.
        with self.worker.lock:
            result = 'SUCCESS'
            if (('ZUUL_REF' in self.parameters) and
                    self.worker.shouldFailTest(self.name,
                                               self.parameters['ZUUL_REF'])):
                result = 'FAILURE'
            if self.aborted:
                result = 'ABORTED'
            if self.requeue:
                result = None

            if self.run_error:
                work_fail = True
                result = 'RUN_ERROR'
            else:
                data['result'] = result
                data['node_labels'] = ['bare-necessities']
                data['node_name'] = 'foo'
                work_fail = False

            changes = None
            if 'ZUUL_CHANGE_IDS' in self.parameters:
                changes = self.parameters['ZUUL_CHANGE_IDS']

            self.worker.build_history.append(
                BuildHistory(name=self.name, number=self.number,
                             result=result, changes=changes, node=self.node,
                             uuid=self.unique, description=self.description,
                             parameters=self.parameters,
                             pipeline=self.parameters['ZUUL_PIPELINE'])
            )

            self.job.sendWorkData(json.dumps(data))
            if work_fail:
                self.job.sendWorkFail()
            else:
                self.job.sendWorkComplete(json.dumps(data))
            del self.worker.gearman_jobs[self.job.unique]
            self.worker.running_builds.remove(self)
657
658
class FakeWorker(gear.Worker):
    """Gearman worker that runs FakeBuild threads instead of real jobs.

    Job names have the form ``<cmd>:<name>[:<node>]`` where ``cmd`` is
    ``build``, ``stop`` or ``set_description``.
    """

    def __init__(self, worker_id, test):
        """Create the worker and start its job-processing thread.

        :param worker_id: identifier passed to ``gear.Worker``.
        :param test: the test case; its ``ref_has_change`` decides which
            builds should fail.
        """
        super(FakeWorker, self).__init__(worker_id)
        self.gearman_jobs = {}
        self.build_history = []
        self.running_builds = []
        self.build_counter = 0
        self.fail_tests = {}
        self.test = test

        self.hold_jobs_in_build = False
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch *job* to the handler selected by its name prefix."""
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        node = parts[2] if len(parts) > 2 else None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for *job* and track it as running."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1

        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build named in the job arguments.

        Completes *job* when a matching build is found, fails it otherwise.
        """
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Set the description of a running or already finished build.

        Completes *job* when a matching build is found, fails it otherwise.
        """
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        # Running builds take precedence over the history.
        for builds in (self.running_builds, self.build_history):
            for build in builds:
                if build.name == name and build.number == number:
                    build.description = descr
                    job.sendWorkComplete()
                    return
        job.sendWorkFail()

    def work(self):
        """Fetch and handle jobs for as long as the worker is running."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Log and keep serving jobs; a bare ``except`` would also
                # swallow SystemExit and KeyboardInterrupt.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Make builds of job *name* fail when they include *change*."""
        self.fail_tests.setdefault(name, []).append(change)

    def shouldFailTest(self, name, ref):
        """Return True if *ref* contains a change registered to fail *name*."""
        for change in self.fail_tests.get(name, []):
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release held builds whose name matches *regex* (all if None)."""
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                                                    len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
767
768
class FakeGearmanServer(gear.Server):
    """Gearman server that can hold ``build:`` jobs in its queue.

    While ``hold_jobs_in_queue`` is true, newly seen build jobs are marked
    as waiting and are not handed to workers until ``release()`` is called.
    """

    def __init__(self):
        # Set before the base constructor runs, since the server may start
        # looking at its queues as soon as it is initialised.
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(0)

    def getJobForConnection(self, connection, peek=False):
        """Return the next non-waiting job *connection* can run, or None.

        Queues are searched in high, normal, low priority order.  Unless
        *peek* is true the job is removed from its queue and bound to the
        connection.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                if not hasattr(job, 'waiting'):
                    # First time this job is seen: only build jobs are
                    # subject to holding.
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release held build jobs whose name matches *regex* (all if None)."""
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            # Job names may carry a third ":<node>" component, so do not
            # unpack the split directly into two names.
            cmd, _sep, rest = job.name.partition(':')
            name = rest.split(':')[0]
            if cmd != 'build':
                continue
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
814 self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
815
816
class FakeSMTP(object):
    """SMTP client double that stores outgoing mail in a caller's list."""
    log = logging.getLogger('zuul.FakeSMTP')

    def __init__(self, messages, server, port):
        self.messages = messages
        self.server = server
        self.port = port

    def sendmail(self, from_email, to_email, msg):
        """Record *msg*, split into headers and body, and return True.

        Headers and body are separated at the first blank line.
        """
        self.log.info("Sending email from %s, to %s, with msg %s" % (
                      from_email, to_email, msg))

        pieces = msg.split('\n\n', 1)
        record = {
            'from_email': from_email,
            'to_email': to_email,
            'msg': msg,
            'headers': pieces[0],
            'body': pieces[1],
        }
        self.messages.append(record)
        return True

    def quit(self):
        """Pretend to close the session; always succeeds."""
        return True
844
845
class FakeSwiftClientConnection(swiftclient.client.Connection):
    """Swift connection that never contacts a real storage service."""

    def post_account(self, headers):
        """Accept the account update and discard it."""
        pass

    def get_auth(self):
        """Return a fixed storage endpoint and an empty, unused auth token."""
        parts = ('https://storage.example.org', 'V1', 'AUTH_account')
        return os.path.join(*parts), ''
856
857
class BaseTestCase(testtools.TestCase):
    """Base test case wiring up timeout and output-capture fixtures.

    Behaviour is driven by environment variables: ``OS_TEST_TIMEOUT``,
    ``OS_STDOUT_CAPTURE``, ``OS_STDERR_CAPTURE``, ``OS_LOG_CAPTURE``,
    ``OS_LOG_LEVEL`` and ``OS_LOG_DEFAULTS``.
    """
    log = logging.getLogger("zuul.test")

    def setUp(self):
        super(BaseTestCase, self).setUp()
        env = os.environ
        enabled = ('True', '1')

        try:
            test_timeout = int(env.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # An unparsable timeout means "no timeout".
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=False))

        if env.get('OS_STDOUT_CAPTURE') in enabled:
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if env.get('OS_STDERR_CAPTURE') in enabled:
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        if env.get('OS_LOG_CAPTURE') in enabled:
            # Only these five level names are recognised; anything else
            # (including an unset variable) means DEBUG.
            known_levels = {
                'DEBUG': logging.DEBUG,
                'INFO': logging.INFO,
                'WARNING': logging.WARNING,
                'ERROR': logging.ERROR,
                'CRITICAL': logging.CRITICAL,
            }
            log_level = known_levels.get(env.get('OS_LOG_LEVEL'),
                                         logging.DEBUG)
            self.useFixture(fixtures.FakeLogger(
                level=log_level,
                format='%(asctime)s %(name)-32s '
                '%(levelname)-8s %(message)s'))

            # NOTE(notmorgan): Per-library overrides come from
            # OS_LOG_DEFAULTS as comma separated "logger=LEVEL" pairs, each
            # getting its own FakeLogger.  This keeps chatty dependencies
            # such as gear quiet during test runs.
            log_defaults_from_env = env.get('OS_LOG_DEFAULTS')

            if log_defaults_from_env:
                for default in log_defaults_from_env.split(','):
                    try:
                        name, level_str = default.split('=', 1)
                        level = getattr(logging, level_str, logging.DEBUG)
                        self.useFixture(fixtures.FakeLogger(
                            name=name,
                            level=level,
                            format='%(asctime)s %(name)-32s '
                            '%(levelname)-8s %(message)s'))
                    except ValueError:
                        # NOTE(notmorgan): Malformed entry (no "="); skip
                        # it rather than configuring a logger for it.
                        pass
919
Maru Newby3fe5f852015-01-13 04:22:14 +0000920
921class ZuulTestCase(BaseTestCase):
922
def setUp(self):
    """Assemble a self-contained Zuul instance for one test.

    Creates the on-disk test root, loads the per-test configuration,
    initialises the upstream git repositories, starts the fake statsd
    and gearman servers plus the fake worker, then builds and starts the
    scheduler, merger, launcher, webapp and RPC listener.

    Cleanups are registered for the final-state assertions, for
    ``shutdown`` and for restoring the patched ``urlopen``.
    """
    super(ZuulTestCase, self).setUp()
    if USE_TEMPDIR:
        tmp_root = self.useFixture(fixtures.TempDir(
            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
        ).path
    else:
        tmp_root = os.environ.get("ZUUL_TEST_ROOT")
    self.test_root = os.path.join(tmp_root, "zuul-test")
    self.upstream_root = os.path.join(self.test_root, "upstream")
    self.git_root = os.path.join(self.test_root, "git")
    self.state_root = os.path.join(self.test_root, "lib")

    # Always start from an empty test root.
    if os.path.exists(self.test_root):
        shutil.rmtree(self.test_root)
    os.makedirs(self.test_root)
    os.makedirs(self.upstream_root)
    os.makedirs(self.state_root)

    # Make per test copy of Configuration.
    self.setup_config()
    self.config.set('zuul', 'layout_config',
                    os.path.join(FIXTURE_DIR,
                                 self.config.get('zuul', 'layout_config')))
    self.config.set('merger', 'git_dir', self.git_root)
    self.config.set('zuul', 'state_dir', self.state_root)

    # For each project in config:
    for project in ("org/project",
                    "org/project1",
                    "org/project2",
                    "org/project3",
                    "org/project4",
                    "org/project5",
                    "org/project6",
                    "org/one-job-project",
                    "org/nonvoting-project",
                    "org/templated-project",
                    "org/layered-project",
                    "org/node-project",
                    "org/conflict-project",
                    "org/noop-project",
                    "org/experimental-project",
                    "org/no-jobs-project"):
        self.init_repo(project)

    self.statsd = FakeStatsd()
    # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
    # see: https://github.com/jsocol/pystatsd/issues/61
    os.environ['STATSD_HOST'] = '127.0.0.1'
    os.environ['STATSD_PORT'] = str(self.statsd.port)
    self.statsd.start()
    # the statsd client object is configured in the statsd module import
    reload_module(statsd)
    reload_module(zuul.scheduler)

    self.gearman_server = FakeGearmanServer()

    self.config.set('gearman', 'port', str(self.gearman_server.port))

    self.worker = FakeWorker('fake_worker', self)
    self.worker.addServer('127.0.0.1', self.gearman_server.port)
    self.gearman_server.worker = self.worker

    # Shorten the Gerrit timing knobs for the test run.
    zuul.source.gerrit.GerritSource.replication_timeout = 1.5
    zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
    zuul.connection.gerrit.GerritEventConnector.delay = 0.0

    self.sched = zuul.scheduler.Scheduler(self.config)

    self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
                                         FakeSwiftClientConnection))
    self.swift = zuul.lib.swift.Swift(self.config)

    # configure_connections() appends one queue per Gerrit server here.
    self.event_queues = [
        self.sched.result_event_queue,
        self.sched.trigger_event_queue
    ]

    self.configure_connections()
    self.sched.registerConnections(self.connections)

    def URLOpenerFactory(*args, **kw):
        # Request objects go to the previous opener; everything else is
        # served by the fake opener rooted at the upstream repos.
        if isinstance(args[0], urllib.request.Request):
            return old_urlopen(*args, **kw)
        return FakeURLOpener(self.upstream_root, *args, **kw)

    old_urlopen = urllib.request.urlopen
    urllib.request.urlopen = URLOpenerFactory
    # Undo the global patch when the test ends.  Otherwise each test's
    # factory wraps the previous one and keeps that test case (and its
    # upstream_root) reachable for the rest of the run.
    self.addCleanup(setattr, urllib.request, 'urlopen', old_urlopen)

    self.merge_server = zuul.merger.server.MergeServer(self.config,
                                                       self.connections)
    self.merge_server.start()

    self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                  self.swift)
    self.merge_client = zuul.merger.client.MergeClient(
        self.config, self.sched)

    self.sched.setLauncher(self.launcher)
    self.sched.setMerger(self.merge_client)

    self.webapp = zuul.webapp.WebApp(
        self.sched, port=0, listen_address='127.0.0.1')
    self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

    self.sched.start()
    self.sched.reconfigure(self.config)
    self.sched.resume()
    self.webapp.start()
    self.rpc.start()
    self.launcher.gearman.waitForServer()
    self.registerJobs()
    self.builds = self.worker.running_builds
    self.history = self.worker.build_history

    # Cleanups run in reverse registration order: shutdown first, then
    # the final-state assertions, then the urlopen restore above.
    self.addCleanup(self.assertFinalState)
    self.addCleanup(self.shutdown)
1040
def configure_connections(self):
    """Build ``self.connections`` from the loaded test configuration.

    Every ``[connection <name>]`` section becomes either a
    ``FakeGerritConnection`` (driver ``gerrit``) or a real
    ``SMTPConnection`` (driver ``smtp``).  Legacy ``[gerrit]`` and
    ``[smtp]`` sections are registered under the names ``gerrit`` and
    ``smtp``.  ``smtplib.SMTP`` is patched so sent mail lands in
    ``self.smtp_messages``.

    Raises:
        Exception: if a connection section has no driver or names an
            unknown driver.
    """
    # Register connections from the config
    self.smtp_messages = []

    def FakeSMTPFactory(*args, **kw):
        # Hand the shared message list to every fake SMTP session.
        return FakeSMTP(self.smtp_messages, *args, **kw)

    self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))

    # Set a changes database so multiple FakeGerrit's can report back to
    # a virtual canonical database given by the configured hostname
    self.gerrit_changes_dbs = {}
    self.gerrit_queues_dbs = {}
    self.connections = {}

    for section_name in self.config.sections():
        con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
                             section_name, re.I)
        if con_match is None:
            continue
        con_name = con_match.group(2)
        con_config = dict(self.config.items(section_name))

        if 'driver' not in con_config:
            raise Exception("No driver specified for connection %s."
                            % con_name)

        con_driver = con_config['driver']

        # TODO(jhesketh): load the required class automatically
        if con_driver == 'gerrit':
            server = con_config['server']
            if server not in self.gerrit_changes_dbs:
                self.gerrit_changes_dbs[server] = {}
            if server not in self.gerrit_queues_dbs:
                # A new server also contributes a new event queue.
                server_queue = Queue.Queue()
                self.gerrit_queues_dbs[server] = server_queue
                self.event_queues.append(server_queue)
            connection = FakeGerritConnection(
                con_name, con_config,
                changes_db=self.gerrit_changes_dbs[server],
                queues_db=self.gerrit_queues_dbs[server],
                upstream_root=self.upstream_root
            )
            self.connections[con_name] = connection
            # Expose the fake as self.fake_<connection name>.
            setattr(self, 'fake_' + con_name, connection)
        elif con_driver == 'smtp':
            self.connections[con_name] = \
                zuul.connection.smtp.SMTPConnection(con_name, con_config)
        else:
            raise Exception("Unknown driver, %s, for connection %s"
                            % (con_config['driver'], con_name))

    # If the [gerrit] or [smtp] sections still exist, load them in as a
    # connection named 'gerrit' or 'smtp' respectively
    legacy_sections = self.config.sections()

    if 'gerrit' in legacy_sections:
        self.gerrit_changes_dbs['gerrit'] = {}
        self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
        self.event_queues.append(self.gerrit_queues_dbs['gerrit'])
        self.connections['gerrit'] = FakeGerritConnection(
            '_legacy_gerrit', dict(self.config.items('gerrit')),
            changes_db=self.gerrit_changes_dbs['gerrit'],
            queues_db=self.gerrit_queues_dbs['gerrit'])

    if 'smtp' in legacy_sections:
        self.connections['smtp'] = \
            zuul.connection.smtp.SMTPConnection(
                '_legacy_smtp', dict(self.config.items('smtp')))
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001110
def setup_config(self, config_file='zuul.conf'):
    """Load a fresh per-test config object into ``self.config``.

    ``config_file`` is resolved relative to ``FIXTURE_DIR``.  Override
    this method to use a different configuration.
    """
    config_path = os.path.join(FIXTURE_DIR, config_file)
    parser = ConfigParser.ConfigParser()
    self.config = parser
    parser.read(config_path)
Clark Boylanb640e052014-04-03 16:41:46 -07001115
def assertFinalState(self):
    """Assert that the test left no repos or queued items behind.

    Checks that no ``git.Repo`` objects are still alive, that every
    pipeline queue is empty, and that pipelines driven by an
    ``IndependentPipelineManager`` have no queues left at all.
    """
    # Make sure that git.Repo objects have been garbage collected.
    gc.collect()
    live_repos = [candidate for candidate in gc.get_objects()
                  if isinstance(candidate, git.Repo)]
    self.assertEqual(len(live_repos), 0)
    self.assertEmptyQueues()
    for pipeline in self.sched.layout.pipelines.values():
        if not isinstance(pipeline.manager,
                          zuul.scheduler.IndependentPipelineManager):
            continue
        self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001129
def shutdown(self):
    """Stop every component started by ``setUp``, in a fixed order.

    After everything is stopped, logs an error if more than one thread
    is still alive.
    """
    self.log.debug("Shutting down after tests")
    self.launcher.stop()
    self.merge_server.stop()
    self.merge_server.join()
    self.merge_client.stop()
    self.worker.shutdown()
    # These services are each stopped and then joined, one at a time.
    for service in (self.sched, self.statsd, self.webapp, self.rpc):
        service.stop()
        service.join()
    self.gearman_server.shutdown()
    remaining = threading.enumerate()
    if len(remaining) > 1:
        self.log.error("More than one thread is running: %s" % remaining)
Clark Boylanb640e052014-04-03 16:41:46 -07001149
def init_repo(self, project):
    """Create the upstream git repository for ``project``.

    The repo is created under ``self.upstream_root`` with a test user
    identity, one initial commit containing a README, a ``master``
    head, an ``init`` tag and an additional ``mp`` branch.

    Args:
        project: slash-separated project name, e.g. ``org/project``.
    """
    parts = project.split('/')
    # Create the parent directories (everything but the last component).
    path = os.path.join(self.upstream_root, *parts[:-1])
    if not os.path.exists(path):
        os.makedirs(path)
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)

    repo.config_writer().set_value('user', 'email', 'user@example.com')
    repo.config_writer().set_value('user', 'name', 'User Name')
    repo.config_writer().write()

    fn = os.path.join(path, 'README')
    # Context manager so the handle is closed even if the write fails.
    with open(fn, 'w') as f:
        f.write("test\n")
    repo.index.add([fn])
    repo.index.commit('initial commit')
    master = repo.create_head('master')
    repo.create_tag('init')

    # Leave the working tree clean and checked out on master.
    repo.head.reference = master
    zuul.merger.merger.reset_repo_to_head(repo)
    repo.git.clean('-x', '-f', '-d')

    self.create_branch(project, 'mp')
1176
def create_branch(self, project, branch):
    """Add ``branch`` with one extra commit to an upstream repo.

    The new branch gets a commit that appends a line to README; the
    repo is then switched back to ``master`` and cleaned.

    Args:
        project: project name relative to ``self.upstream_root``.
        branch: name of the branch to create.
    """
    path = os.path.join(self.upstream_root, project)
    repo = git.Repo.init(path)
    fn = os.path.join(path, 'README')

    branch_head = repo.create_head(branch)
    repo.head.reference = branch_head
    # Context manager so the handle is closed even if the write fails.
    with open(fn, 'a') as f:
        f.write("test %s\n" % branch)
    repo.index.add([fn])
    repo.index.commit('%s commit' % branch)

    # Return to a clean checkout of master.
    repo.head.reference = repo.heads['master']
    zuul.merger.merger.reset_repo_to_head(repo)
    repo.git.clean('-x', '-f', '-d')
1193
def create_commit(self, project):
    """Commit a README change on ``master`` of an upstream repo.

    Args:
        project: project name relative to ``self.upstream_root``.

    Returns:
        The hex SHA of the new commit.
    """
    repo_path = os.path.join(self.upstream_root, project)
    repo = git.Repo(repo_path)
    repo.head.reference = repo.heads['master']
    readme = os.path.join(repo_path, 'README')
    with open(readme, 'a') as handle:
        handle.write('creating fake commit\n')
    repo.index.add([readme])
    return repo.index.commit('Creating a fake commit').hexsha
1204
def ref_has_change(self, ref, change):
    """Return True if ``ref`` in the merger repo contains ``change``.

    A commit matches when its stripped message equals
    ``'<change.subject>-1'``.  A ``GitCommandError`` while walking the
    history counts as "not found".
    """
    repo = git.Repo(os.path.join(self.git_root, change.project))
    try:
        return any(
            commit.message.strip() == ('%s-1' % change.subject)
            for commit in repo.iter_commits(ref))
    except GitCommandError:
        return False
1215
def job_has_changes(self, *args):
    """Check that a job's ref contains the given changes.

    Args:
        *args: the job (a ``FakeBuild`` or an object whose
            ``arguments`` attribute is a JSON string), followed by the
            changes expected on its ``ZUUL_REF``.

    Returns:
        True when every ``'<subject>-1'`` message is present on the ref
        and the newest commit on the ref equals ``ZUUL_COMMIT``;
        False otherwise.
    """
    job = args[0]
    commits = args[1:]
    if isinstance(job, FakeBuild):
        parameters = job.parameters
    else:
        parameters = json.loads(job.arguments)
    project = parameters['ZUUL_PROJECT']
    path = os.path.join(self.git_root, project)
    repo = git.Repo(path)
    ref = parameters['ZUUL_REF']
    sha = parameters['ZUUL_COMMIT']
    # Walk the history once so messages and shas describe the same
    # snapshot of the ref.
    ref_commits = list(repo.iter_commits(ref))
    repo_messages = [c.message.strip() for c in ref_commits]
    repo_shas = [c.hexsha for c in ref_commits]
    commit_messages = ['%s-1' % commit.subject for commit in commits]
    self.log.debug("Checking if job %s has changes; commit_messages %s;"
                   " repo_messages %s; sha %s" % (job, commit_messages,
                                                  repo_messages, sha))
    for msg in commit_messages:
        if msg not in repo_messages:
            self.log.debug(" messages do not match")
            return False
    if repo_shas[0] != sha:
        self.log.debug(" sha does not match")
        return False
    self.log.debug(" OK")
    return True
1243
def registerJobs(self):
    """Register worker functions and wait for the server to list them.

    Registers one ``build:<job>`` function per job in the scheduler
    layout plus a ``stop:<worker_id>`` function, then spins until the
    gearman server reports at least that many functions.
    """
    expected = 0
    for expected, job_name in enumerate(
            self.sched.layout.jobs.keys(), 1):
        self.worker.registerFunction('build:' + job_name)
    self.worker.registerFunction('stop:' + self.worker.worker_id)
    expected += 1

    # Yield to other threads until the server has caught up.
    while len(self.gearman_server.functions) < expected:
        time.sleep(0)
1254
def orderedRelease(self):
    """Release builds one at a time, settling after each release."""
    # Run one build at a time to ensure non-race order:
    while len(self.builds) > 0:
        next_build = self.builds[0]
        self.release(next_build)
        self.waitUntilSettled()
1260
def release(self, job):
    """Release a held job.

    A ``FakeBuild`` is released through its own ``release`` method.
    Any other job has its ``waiting`` flag cleared and the gearman
    server's connections are woken.
    """
    if isinstance(job, FakeBuild):
        job.release()
        return
    job.waiting = False
    self.log.debug("Queued job %s released" % job.unique)
    self.gearman_server.wakeConnections()
1268
def getParameter(self, job, name):
    """Return parameter ``name`` of ``job``.

    A ``FakeBuild`` exposes its parameters directly; for any other job
    they are decoded from the JSON string in ``job.arguments``.
    """
    if isinstance(job, FakeBuild):
        parameters = job.parameters
    else:
        parameters = json.loads(job.arguments)
    return parameters[name]
1275
def resetGearmanServer(self):
    """Clear the gearman server's function registry and re-register.

    Removes the worker's functions, spins until no connection other
    than the RPC listener and the merger still has functions, empties
    the server's function set, then re-registers the RPC listener and
    the merge server.
    """
    exempt_clients = ['Zuul RPC Listener', 'Zuul Merger']

    def functions_pending():
        # True while some non-exempt connection still has functions.
        return any(
            connection.functions and
            connection.client_id not in exempt_clients
            for connection in self.gearman_server.active_connections)

    self.worker.setFunctions([])
    while functions_pending():
        time.sleep(0)
    self.gearman_server.functions = set()
    self.rpc.register()
    self.merge_server.register()
1291
def haveAllBuildsReported(self):
    """Return True when no build result is still in flight.

    False is returned if the launcher has outstanding meta jobs, if a
    build in the worker's history is still present in the launcher's
    build table, or if a worker connection is in ``GRAB_WAIT``.
    """
    # See if Zuul is waiting on a meta job to complete
    if self.launcher.meta_jobs:
        return False
    # Find out if every build that the worker has completed has been
    # reported back to Zuul.  A build that the launcher still tracks has
    # not been reported yet, which means a Gearman event is still in
    # transit and the system is not stable.
    if any(self.launcher.builds.get(finished.uuid)
           for finished in self.worker.build_history):
        return False
    # Make sure that none of the worker connections are in GRAB_WAIT
    return not any(connection.state == 'GRAB_WAIT'
                   for connection in self.worker.active_connections)
1311
def areAllBuildsWaiting(self):
    """Return True when every launcher build is parked.

    Each build in the launcher's table is traced from the gearman
    client job, to the server job, to the worker job.  A build counts
    as parked when its server job is flagged ``waiting`` or its worker
    build reports ``isWaiting()``.  Any build that cannot be traced, is
    still being enqueued, is unassigned, has not reported its start, or
    is running makes the result False.
    """
    for build in self.launcher.builds.values():
        client_job = None
        for conn in self.launcher.gearman.active_connections:
            for candidate in conn.related_jobs.values():
                if candidate.unique == build.uuid:
                    client_job = candidate
                    break
        if not client_job:
            self.log.debug("%s is not known to the gearman client" %
                           build)
            return False
        if not client_job.handle:
            self.log.debug("%s has no handle" % client_job)
            return False
        server_job = self.gearman_server.jobs.get(client_job.handle)
        if not server_job:
            self.log.debug("%s is not known to the gearman server" %
                           client_job)
            return False
        if not hasattr(server_job, 'waiting'):
            self.log.debug("%s is being enqueued" % server_job)
            return False
        if server_job.waiting:
            # Held on the server; nothing more to check for this build.
            continue
        worker_job = self.worker.gearman_jobs.get(server_job.unique)
        if not worker_job:
            self.log.debug("%s is unassigned" % server_job)
            return False
        if build.number is None:
            self.log.debug("%s has not reported start" % worker_job)
            return False
        if not worker_job.build.isWaiting():
            self.log.debug("%s is running" % worker_job)
            return False
    return True
Clark Boylanb640e052014-04-03 16:41:46 -07001352
def eventQueuesEmpty(self):
    """Yield ``empty()`` for each queue in ``self.event_queues``."""
    for event_queue in self.event_queues:
        yield event_queue.empty()
1356
def eventQueuesJoin(self):
    """Call ``join()`` on each queue in ``self.event_queues``, in order."""
    for event_queue in self.event_queues:
        event_queue.join()
1360
def waitUntilSettled(self):
    """Block until Zuul has no work in flight.

    The system counts as settled when all builds have reported, the
    merge client has no outstanding build sets, every event queue is
    empty and all remaining builds are waiting.

    Raises:
        Exception: if the system has not settled after 10 seconds.
    """
    self.log.debug("Waiting until settled...")
    start = time.time()
    while True:
        if time.time() - start > 10:
            self.log.debug("Queue status:")
            for queue in self.event_queues:
                self.log.debug(" %s: %s" % (queue, queue.empty()))
            self.log.debug("All builds waiting: %s" %
                           (self.areAllBuildsWaiting(),))
            raise Exception("Timeout waiting for Zuul to settle")
        settled = False
        # Make sure no new events show up while we're checking
        self.worker.lock.acquire()
        try:
            # have all build states propagated to zuul?
            if self.haveAllBuildsReported():
                # Join ensures that the queue is empty _and_ events have
                # been processed
                self.eventQueuesJoin()
                self.sched.run_handler_lock.acquire()
                try:
                    settled = (not self.merge_client.build_sets and
                               all(self.eventQueuesEmpty()) and
                               self.haveAllBuildsReported() and
                               self.areAllBuildsWaiting())
                finally:
                    self.sched.run_handler_lock.release()
        finally:
            # Release on every path so a failing check cannot leave the
            # worker or scheduler locked.
            self.worker.lock.release()
        if settled:
            self.log.debug("...settled.")
            return
        self.sched.wake_event.wait(0.1)
1391
def countJobResults(self, jobs, result):
    """Return how many of ``jobs`` finished with ``result``.

    Args:
        jobs: iterable of objects with a ``result`` attribute.
        result: the result value to count.

    Returns:
        The number of matching jobs.
    """
    # Count while iterating: on Python 3 filter() returns an iterator,
    # which has no len().
    return sum(1 for job in jobs if job.result == result)
1395
def getJobFromHistory(self, name):
    """Return the first build named ``name`` in the worker's history.

    Raises:
        Exception: if no build in the history has that name.
    """
    not_found = object()
    match = next(
        (build for build in self.worker.build_history
         if build.name == name),
        not_found)
    if match is not_found:
        raise Exception("Unable to find job %s in history" % name)
    return match
1402
def assertEmptyQueues(self):
    """Assert that every queue of every pipeline is empty.

    The contents of a non-empty queue are printed before the assertion
    fails.
    """
    # Make sure there are no orphaned jobs
    for pipeline in self.sched.layout.pipelines.values():
        for change_queue in pipeline.queues:
            if len(change_queue.queue) != 0:
                print('pipeline %s queue %s contents %s' % (
                    pipeline.name, change_queue.name, change_queue.queue))
            self.assertEqual(len(change_queue.queue), 0,
                             "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001412
def assertReportedStat(self, key, value=None, kind=None):
    """Wait up to five seconds for a stat to reach the fake statsd.

    Each reported stat is split on ``':'`` into a key and a value.
    With neither ``value`` nor ``kind`` given, any stat with a matching
    key passes.  A truthy ``value`` must equal the stat's value part;
    otherwise a truthy ``kind`` must match the ``'|<kind>'`` suffix.

    Raises:
        Exception: if no matching stat shows up in time.
    """
    deadline = time.time() + 5
    while time.time() < deadline:
        for stat in self.statsd.stats:
            pprint.pprint(self.statsd.stats)
            k, v = stat.split(':')
            if key != k:
                continue
            if value is None and kind is None:
                return
            if value:
                if value == v:
                    return
            elif kind:
                if v.endswith('|' + kind):
                    return
        time.sleep(0.1)

    pprint.pprint(self.statsd.stats)
    raise Exception("Key %s not found in reported stats" % key)