blob: 3c28a726f7ca2dba1cf906e3dfa282bc31bad5a2 [file] [log] [blame]
Clark Boylanb640e052014-04-03 16:41:46 -07001#!/usr/bin/env python
2
3# Copyright 2012 Hewlett-Packard Development Company, L.P.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Christian Berendtffba5df2014-06-07 21:30:22 +020017from six.moves import configparser as ConfigParser
Clark Boylanb640e052014-04-03 16:41:46 -070018import gc
19import hashlib
20import json
21import logging
22import os
23import pprint
Christian Berendt12d4d722014-06-07 21:03:45 +020024from six.moves import queue as Queue
Morgan Fainberg293f7f82016-05-30 14:01:22 -070025from six.moves import urllib
Clark Boylanb640e052014-04-03 16:41:46 -070026import random
27import re
28import select
29import shutil
30import socket
31import string
32import subprocess
33import swiftclient
34import threading
35import time
Clark Boylanb640e052014-04-03 16:41:46 -070036
37import git
38import gear
39import fixtures
Clark Boylanb640e052014-04-03 16:41:46 -070040import statsd
41import testtools
Mike Heald8225f522014-11-21 09:52:33 +000042from git import GitCommandError
Clark Boylanb640e052014-04-03 16:41:46 -070043
Joshua Hesketh352264b2015-08-11 23:42:08 +100044import zuul.connection.gerrit
45import zuul.connection.smtp
Clark Boylanb640e052014-04-03 16:41:46 -070046import zuul.scheduler
47import zuul.webapp
48import zuul.rpclistener
49import zuul.launcher.gearman
50import zuul.lib.swift
Clark Boylanb640e052014-04-03 16:41:46 -070051import zuul.merger.client
James E. Blair879dafb2015-07-17 14:04:49 -070052import zuul.merger.merger
53import zuul.merger.server
Clark Boylanb640e052014-04-03 16:41:46 -070054import zuul.reporter.gerrit
55import zuul.reporter.smtp
Joshua Hesketh850ccb62014-11-27 11:31:02 +110056import zuul.source.gerrit
Clark Boylanb640e052014-04-03 16:41:46 -070057import zuul.trigger.gerrit
58import zuul.trigger.timer
James E. Blairc494d542014-08-06 09:23:52 -070059import zuul.trigger.zuultrigger
Clark Boylanb640e052014-04-03 16:41:46 -070060
61FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
62 'fixtures')
James E. Blair97d902e2014-08-21 13:25:56 -070063USE_TEMPDIR = True
Clark Boylanb640e052014-04-03 16:41:46 -070064
65logging.basicConfig(level=logging.DEBUG,
66 format='%(asctime)s %(name)-32s '
67 '%(levelname)-8s %(message)s')
68
69
70def repack_repo(path):
71 cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
72 output = subprocess.Popen(cmd, close_fds=True,
73 stdout=subprocess.PIPE,
74 stderr=subprocess.PIPE)
75 out = output.communicate()
76 if output.returncode:
77 raise Exception("git repack returned %d" % output.returncode)
78 return out
79
80
81def random_sha1():
82 return hashlib.sha1(str(random.random())).hexdigest()
83
84
James E. Blaira190f3b2015-01-05 14:56:54 -080085def iterate_timeout(max_seconds, purpose):
86 start = time.time()
87 count = 0
88 while (time.time() < start + max_seconds):
89 count += 1
90 yield count
91 time.sleep(0)
92 raise Exception("Timeout waiting for %s" % purpose)
93
94
Clark Boylanb640e052014-04-03 16:41:46 -070095class ChangeReference(git.Reference):
96 _common_path_default = "refs/changes"
97 _points_to_commits_only = True
98
99
100class FakeChange(object):
101 categories = {'APRV': ('Approved', -1, 1),
102 'CRVW': ('Code-Review', -2, 2),
103 'VRFY': ('Verified', -2, 2)}
104
105 def __init__(self, gerrit, number, project, branch, subject,
106 status='NEW', upstream_root=None):
107 self.gerrit = gerrit
108 self.reported = 0
109 self.queried = 0
110 self.patchsets = []
111 self.number = number
112 self.project = project
113 self.branch = branch
114 self.subject = subject
115 self.latest_patchset = 0
116 self.depends_on_change = None
117 self.needed_by_changes = []
118 self.fail_merge = False
119 self.messages = []
120 self.data = {
121 'branch': branch,
122 'comments': [],
123 'commitMessage': subject,
124 'createdOn': time.time(),
125 'id': 'I' + random_sha1(),
126 'lastUpdated': time.time(),
127 'number': str(number),
128 'open': status == 'NEW',
129 'owner': {'email': 'user@example.com',
130 'name': 'User Name',
131 'username': 'username'},
132 'patchSets': self.patchsets,
133 'project': project,
134 'status': status,
135 'subject': subject,
136 'submitRecords': [],
137 'url': 'https://hostname/%s' % number}
138
139 self.upstream_root = upstream_root
140 self.addPatchset()
141 self.data['submitRecords'] = self.getSubmitRecords()
142 self.open = status == 'NEW'
143
144 def add_fake_change_to_repo(self, msg, fn, large):
145 path = os.path.join(self.upstream_root, self.project)
146 repo = git.Repo(path)
147 ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
148 self.latest_patchset),
149 'refs/tags/init')
150 repo.head.reference = ref
James E. Blair879dafb2015-07-17 14:04:49 -0700151 zuul.merger.merger.reset_repo_to_head(repo)
Clark Boylanb640e052014-04-03 16:41:46 -0700152 repo.git.clean('-x', '-f', '-d')
153
154 path = os.path.join(self.upstream_root, self.project)
155 if not large:
156 fn = os.path.join(path, fn)
157 f = open(fn, 'w')
158 f.write("test %s %s %s\n" %
159 (self.branch, self.number, self.latest_patchset))
160 f.close()
161 repo.index.add([fn])
162 else:
163 for fni in range(100):
164 fn = os.path.join(path, str(fni))
165 f = open(fn, 'w')
166 for ci in range(4096):
167 f.write(random.choice(string.printable))
168 f.close()
169 repo.index.add([fn])
170
171 r = repo.index.commit(msg)
172 repo.head.reference = 'master'
James E. Blair879dafb2015-07-17 14:04:49 -0700173 zuul.merger.merger.reset_repo_to_head(repo)
Clark Boylanb640e052014-04-03 16:41:46 -0700174 repo.git.clean('-x', '-f', '-d')
175 repo.heads['master'].checkout()
176 return r
177
178 def addPatchset(self, files=[], large=False):
179 self.latest_patchset += 1
180 if files:
181 fn = files[0]
182 else:
James E. Blair97d902e2014-08-21 13:25:56 -0700183 fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
Clark Boylanb640e052014-04-03 16:41:46 -0700184 msg = self.subject + '-' + str(self.latest_patchset)
185 c = self.add_fake_change_to_repo(msg, fn, large)
186 ps_files = [{'file': '/COMMIT_MSG',
187 'type': 'ADDED'},
188 {'file': 'README',
189 'type': 'MODIFIED'}]
190 for f in files:
191 ps_files.append({'file': f, 'type': 'ADDED'})
192 d = {'approvals': [],
193 'createdOn': time.time(),
194 'files': ps_files,
195 'number': str(self.latest_patchset),
196 'ref': 'refs/changes/1/%s/%s' % (self.number,
197 self.latest_patchset),
198 'revision': c.hexsha,
199 'uploader': {'email': 'user@example.com',
200 'name': 'User name',
201 'username': 'user'}}
202 self.data['currentPatchSet'] = d
203 self.patchsets.append(d)
204 self.data['submitRecords'] = self.getSubmitRecords()
205
206 def getPatchsetCreatedEvent(self, patchset):
207 event = {"type": "patchset-created",
208 "change": {"project": self.project,
209 "branch": self.branch,
210 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
211 "number": str(self.number),
212 "subject": self.subject,
213 "owner": {"name": "User Name"},
214 "url": "https://hostname/3"},
215 "patchSet": self.patchsets[patchset - 1],
216 "uploader": {"name": "User Name"}}
217 return event
218
219 def getChangeRestoredEvent(self):
220 event = {"type": "change-restored",
221 "change": {"project": self.project,
222 "branch": self.branch,
223 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
224 "number": str(self.number),
225 "subject": self.subject,
226 "owner": {"name": "User Name"},
227 "url": "https://hostname/3"},
228 "restorer": {"name": "User Name"},
Antoine Mussobd86a312014-01-08 14:51:33 +0100229 "patchSet": self.patchsets[-1],
230 "reason": ""}
231 return event
232
233 def getChangeAbandonedEvent(self):
234 event = {"type": "change-abandoned",
235 "change": {"project": self.project,
236 "branch": self.branch,
237 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
238 "number": str(self.number),
239 "subject": self.subject,
240 "owner": {"name": "User Name"},
241 "url": "https://hostname/3"},
242 "abandoner": {"name": "User Name"},
243 "patchSet": self.patchsets[-1],
Clark Boylanb640e052014-04-03 16:41:46 -0700244 "reason": ""}
245 return event
246
247 def getChangeCommentEvent(self, patchset):
248 event = {"type": "comment-added",
249 "change": {"project": self.project,
250 "branch": self.branch,
251 "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
252 "number": str(self.number),
253 "subject": self.subject,
254 "owner": {"name": "User Name"},
255 "url": "https://hostname/3"},
256 "patchSet": self.patchsets[patchset - 1],
257 "author": {"name": "User Name"},
258 "approvals": [{"type": "Code-Review",
259 "description": "Code-Review",
260 "value": "0"}],
261 "comment": "This is a comment"}
262 return event
263
Joshua Hesketh642824b2014-07-01 17:54:59 +1000264 def addApproval(self, category, value, username='reviewer_john',
265 granted_on=None, message=''):
Clark Boylanb640e052014-04-03 16:41:46 -0700266 if not granted_on:
267 granted_on = time.time()
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000268 approval = {
269 'description': self.categories[category][0],
270 'type': category,
271 'value': str(value),
272 'by': {
273 'username': username,
274 'email': username + '@example.com',
275 },
276 'grantedOn': int(granted_on)
277 }
Clark Boylanb640e052014-04-03 16:41:46 -0700278 for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
279 if x['by']['username'] == username and x['type'] == category:
280 del self.patchsets[-1]['approvals'][i]
281 self.patchsets[-1]['approvals'].append(approval)
282 event = {'approvals': [approval],
Joshua Hesketh642824b2014-07-01 17:54:59 +1000283 'author': {'email': 'author@example.com',
284 'name': 'Patchset Author',
285 'username': 'author_phil'},
Clark Boylanb640e052014-04-03 16:41:46 -0700286 'change': {'branch': self.branch,
287 'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
288 'number': str(self.number),
Joshua Hesketh642824b2014-07-01 17:54:59 +1000289 'owner': {'email': 'owner@example.com',
290 'name': 'Change Owner',
291 'username': 'owner_jane'},
Clark Boylanb640e052014-04-03 16:41:46 -0700292 'project': self.project,
293 'subject': self.subject,
294 'topic': 'master',
295 'url': 'https://hostname/459'},
Joshua Hesketh642824b2014-07-01 17:54:59 +1000296 'comment': message,
Clark Boylanb640e052014-04-03 16:41:46 -0700297 'patchSet': self.patchsets[-1],
298 'type': 'comment-added'}
299 self.data['submitRecords'] = self.getSubmitRecords()
300 return json.loads(json.dumps(event))
301
302 def getSubmitRecords(self):
303 status = {}
304 for cat in self.categories.keys():
305 status[cat] = 0
306
307 for a in self.patchsets[-1]['approvals']:
308 cur = status[a['type']]
309 cat_min, cat_max = self.categories[a['type']][1:]
310 new = int(a['value'])
311 if new == cat_min:
312 cur = new
313 elif abs(new) > abs(cur):
314 cur = new
315 status[a['type']] = cur
316
317 labels = []
318 ok = True
319 for typ, cat in self.categories.items():
320 cur = status[typ]
321 cat_min, cat_max = cat[1:]
322 if cur == cat_min:
323 value = 'REJECT'
324 ok = False
325 elif cur == cat_max:
326 value = 'OK'
327 else:
328 value = 'NEED'
329 ok = False
330 labels.append({'label': cat[0], 'status': value})
331 if ok:
332 return [{'status': 'OK'}]
333 return [{'status': 'NOT_READY',
334 'labels': labels}]
335
336 def setDependsOn(self, other, patchset):
337 self.depends_on_change = other
338 d = {'id': other.data['id'],
339 'number': other.data['number'],
340 'ref': other.patchsets[patchset - 1]['ref']
341 }
342 self.data['dependsOn'] = [d]
343
344 other.needed_by_changes.append(self)
345 needed = other.data.get('neededBy', [])
346 d = {'id': self.data['id'],
347 'number': self.data['number'],
348 'ref': self.patchsets[patchset - 1]['ref'],
349 'revision': self.patchsets[patchset - 1]['revision']
350 }
351 needed.append(d)
352 other.data['neededBy'] = needed
353
354 def query(self):
355 self.queried += 1
356 d = self.data.get('dependsOn')
357 if d:
358 d = d[0]
359 if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
360 d['isCurrentPatchSet'] = True
361 else:
362 d['isCurrentPatchSet'] = False
363 return json.loads(json.dumps(self.data))
364
365 def setMerged(self):
366 if (self.depends_on_change and
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000367 self.depends_on_change.data['status'] != 'MERGED'):
Clark Boylanb640e052014-04-03 16:41:46 -0700368 return
369 if self.fail_merge:
370 return
371 self.data['status'] = 'MERGED'
372 self.open = False
373
374 path = os.path.join(self.upstream_root, self.project)
375 repo = git.Repo(path)
376 repo.heads[self.branch].commit = \
377 repo.commit(self.patchsets[-1]['revision'])
378
379 def setReported(self):
380 self.reported += 1
381
382
Joshua Hesketh352264b2015-08-11 23:42:08 +1000383class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
384 log = logging.getLogger("zuul.test.FakeGerritConnection")
James E. Blair96698e22015-04-02 07:48:21 -0700385
Joshua Hesketh352264b2015-08-11 23:42:08 +1000386 def __init__(self, connection_name, connection_config,
Jan Hruban6b71aff2015-10-22 16:58:08 +0200387 changes_db=None, queues_db=None, upstream_root=None):
Joshua Hesketh352264b2015-08-11 23:42:08 +1000388 super(FakeGerritConnection, self).__init__(connection_name,
389 connection_config)
390
391 self.event_queue = queues_db
Clark Boylanb640e052014-04-03 16:41:46 -0700392 self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
393 self.change_number = 0
Joshua Hesketh352264b2015-08-11 23:42:08 +1000394 self.changes = changes_db
James E. Blairf8ff9932014-08-15 15:24:24 -0700395 self.queries = []
Jan Hruban6b71aff2015-10-22 16:58:08 +0200396 self.upstream_root = upstream_root
Clark Boylanb640e052014-04-03 16:41:46 -0700397
398 def addFakeChange(self, project, branch, subject, status='NEW'):
399 self.change_number += 1
400 c = FakeChange(self, self.change_number, project, branch, subject,
401 upstream_root=self.upstream_root,
402 status=status)
403 self.changes[self.change_number] = c
404 return c
405
Clark Boylanb640e052014-04-03 16:41:46 -0700406 def review(self, project, changeid, message, action):
407 number, ps = changeid.split(',')
408 change = self.changes[int(number)]
Joshua Hesketh642824b2014-07-01 17:54:59 +1000409
410 # Add the approval back onto the change (ie simulate what gerrit would
411 # do).
412 # Usually when zuul leaves a review it'll create a feedback loop where
413 # zuul's review enters another gerrit event (which is then picked up by
414 # zuul). However, we can't mimic this behaviour (by adding this
415 # approval event into the queue) as it stops jobs from checking what
416 # happens before this event is triggered. If a job needs to see what
417 # happens they can add their own verified event into the queue.
418 # Nevertheless, we can update change with the new review in gerrit.
419
420 for cat in ['CRVW', 'VRFY', 'APRV']:
421 if cat in action:
Joshua Hesketh352264b2015-08-11 23:42:08 +1000422 change.addApproval(cat, action[cat], username=self.user)
Joshua Hesketh642824b2014-07-01 17:54:59 +1000423
424 if 'label' in action:
425 parts = action['label'].split('=')
Joshua Hesketh352264b2015-08-11 23:42:08 +1000426 change.addApproval(parts[0], parts[2], username=self.user)
Joshua Hesketh642824b2014-07-01 17:54:59 +1000427
Clark Boylanb640e052014-04-03 16:41:46 -0700428 change.messages.append(message)
Joshua Hesketh642824b2014-07-01 17:54:59 +1000429
Clark Boylanb640e052014-04-03 16:41:46 -0700430 if 'submit' in action:
431 change.setMerged()
432 if message:
433 change.setReported()
434
435 def query(self, number):
436 change = self.changes.get(int(number))
437 if change:
438 return change.query()
439 return {}
440
James E. Blairc494d542014-08-06 09:23:52 -0700441 def simpleQuery(self, query):
James E. Blair96698e22015-04-02 07:48:21 -0700442 self.log.debug("simpleQuery: %s" % query)
James E. Blairf8ff9932014-08-15 15:24:24 -0700443 self.queries.append(query)
James E. Blair5ee24252014-12-30 10:12:29 -0800444 if query.startswith('change:'):
445 # Query a specific changeid
446 changeid = query[len('change:'):]
447 l = [change.query() for change in self.changes.values()
448 if change.data['id'] == changeid]
James E. Blair96698e22015-04-02 07:48:21 -0700449 elif query.startswith('message:'):
450 # Query the content of a commit message
451 msg = query[len('message:'):].strip()
452 l = [change.query() for change in self.changes.values()
453 if msg in change.data['commitMessage']]
James E. Blair5ee24252014-12-30 10:12:29 -0800454 else:
455 # Query all open changes
456 l = [change.query() for change in self.changes.values()]
James E. Blairf8ff9932014-08-15 15:24:24 -0700457 return l
James E. Blairc494d542014-08-06 09:23:52 -0700458
Joshua Hesketh352264b2015-08-11 23:42:08 +1000459 def _start_watcher_thread(self, *args, **kw):
Clark Boylanb640e052014-04-03 16:41:46 -0700460 pass
461
Joshua Hesketh352264b2015-08-11 23:42:08 +1000462 def getGitUrl(self, project):
463 return os.path.join(self.upstream_root, project.name)
464
Clark Boylanb640e052014-04-03 16:41:46 -0700465
466class BuildHistory(object):
467 def __init__(self, **kw):
468 self.__dict__.update(kw)
469
470 def __repr__(self):
471 return ("<Completed build, result: %s name: %s #%s changes: %s>" %
472 (self.result, self.name, self.number, self.changes))
473
474
475class FakeURLOpener(object):
Jan Hruban6b71aff2015-10-22 16:58:08 +0200476 def __init__(self, upstream_root, url):
Clark Boylanb640e052014-04-03 16:41:46 -0700477 self.upstream_root = upstream_root
Clark Boylanb640e052014-04-03 16:41:46 -0700478 self.url = url
479
480 def read(self):
Morgan Fainberg293f7f82016-05-30 14:01:22 -0700481 res = urllib.parse.urlparse(self.url)
Clark Boylanb640e052014-04-03 16:41:46 -0700482 path = res.path
483 project = '/'.join(path.split('/')[2:-2])
484 ret = '001e# service=git-upload-pack\n'
485 ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
486 'multi_ack thin-pack side-band side-band-64k ofs-delta '
487 'shallow no-progress include-tag multi_ack_detailed no-done\n')
488 path = os.path.join(self.upstream_root, project)
489 repo = git.Repo(path)
490 for ref in repo.refs:
491 r = ref.object.hexsha + ' ' + ref.path + '\n'
492 ret += '%04x%s' % (len(r) + 4, r)
493 ret += '0000'
494 return ret
495
496
Clark Boylanb640e052014-04-03 16:41:46 -0700497class FakeStatsd(threading.Thread):
498 def __init__(self):
499 threading.Thread.__init__(self)
500 self.daemon = True
501 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
502 self.sock.bind(('', 0))
503 self.port = self.sock.getsockname()[1]
504 self.wake_read, self.wake_write = os.pipe()
505 self.stats = []
506
507 def run(self):
508 while True:
509 poll = select.poll()
510 poll.register(self.sock, select.POLLIN)
511 poll.register(self.wake_read, select.POLLIN)
512 ret = poll.poll()
513 for (fd, event) in ret:
514 if fd == self.sock.fileno():
515 data = self.sock.recvfrom(1024)
516 if not data:
517 return
518 self.stats.append(data[0])
519 if fd == self.wake_read:
520 return
521
522 def stop(self):
523 os.write(self.wake_write, '1\n')
524
525
526class FakeBuild(threading.Thread):
527 log = logging.getLogger("zuul.test")
528
529 def __init__(self, worker, job, number, node):
530 threading.Thread.__init__(self)
531 self.daemon = True
532 self.worker = worker
533 self.job = job
534 self.name = job.name.split(':')[1]
535 self.number = number
536 self.node = node
537 self.parameters = json.loads(job.arguments)
538 self.unique = self.parameters['ZUUL_UUID']
539 self.wait_condition = threading.Condition()
540 self.waiting = False
541 self.aborted = False
542 self.created = time.time()
543 self.description = ''
544 self.run_error = False
545
546 def release(self):
547 self.wait_condition.acquire()
548 self.wait_condition.notify()
549 self.waiting = False
550 self.log.debug("Build %s released" % self.unique)
551 self.wait_condition.release()
552
553 def isWaiting(self):
554 self.wait_condition.acquire()
555 if self.waiting:
556 ret = True
557 else:
558 ret = False
559 self.wait_condition.release()
560 return ret
561
562 def _wait(self):
563 self.wait_condition.acquire()
564 self.waiting = True
565 self.log.debug("Build %s waiting" % self.unique)
566 self.wait_condition.wait()
567 self.wait_condition.release()
568
569 def run(self):
570 data = {
571 'url': 'https://server/job/%s/%s/' % (self.name, self.number),
572 'name': self.name,
573 'number': self.number,
574 'manager': self.worker.worker_id,
575 'worker_name': 'My Worker',
576 'worker_hostname': 'localhost',
577 'worker_ips': ['127.0.0.1', '192.168.1.1'],
578 'worker_fqdn': 'zuul.example.org',
579 'worker_program': 'FakeBuilder',
580 'worker_version': 'v1.1',
581 'worker_extra': {'something': 'else'}
582 }
583
584 self.log.debug('Running build %s' % self.unique)
585
586 self.job.sendWorkData(json.dumps(data))
587 self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
588 self.job.sendWorkStatus(0, 100)
589
590 if self.worker.hold_jobs_in_build:
591 self.log.debug('Holding build %s' % self.unique)
592 self._wait()
593 self.log.debug("Build %s continuing" % self.unique)
594
595 self.worker.lock.acquire()
596
597 result = 'SUCCESS'
598 if (('ZUUL_REF' in self.parameters) and
599 self.worker.shouldFailTest(self.name,
600 self.parameters['ZUUL_REF'])):
601 result = 'FAILURE'
602 if self.aborted:
603 result = 'ABORTED'
604
605 if self.run_error:
606 work_fail = True
607 result = 'RUN_ERROR'
608 else:
609 data['result'] = result
Timothy Chavezb2332082015-08-07 20:08:04 -0500610 data['node_labels'] = ['bare-necessities']
611 data['node_name'] = 'foo'
Clark Boylanb640e052014-04-03 16:41:46 -0700612 work_fail = False
613
614 changes = None
615 if 'ZUUL_CHANGE_IDS' in self.parameters:
616 changes = self.parameters['ZUUL_CHANGE_IDS']
617
618 self.worker.build_history.append(
619 BuildHistory(name=self.name, number=self.number,
620 result=result, changes=changes, node=self.node,
621 uuid=self.unique, description=self.description,
James E. Blair456f2fb2016-02-09 09:29:33 -0800622 parameters=self.parameters,
Clark Boylanb640e052014-04-03 16:41:46 -0700623 pipeline=self.parameters['ZUUL_PIPELINE'])
624 )
625
626 self.job.sendWorkData(json.dumps(data))
627 if work_fail:
628 self.job.sendWorkFail()
629 else:
630 self.job.sendWorkComplete(json.dumps(data))
631 del self.worker.gearman_jobs[self.job.unique]
632 self.worker.running_builds.remove(self)
633 self.worker.lock.release()
634
635
636class FakeWorker(gear.Worker):
637 def __init__(self, worker_id, test):
638 super(FakeWorker, self).__init__(worker_id)
639 self.gearman_jobs = {}
640 self.build_history = []
641 self.running_builds = []
642 self.build_counter = 0
643 self.fail_tests = {}
644 self.test = test
645
646 self.hold_jobs_in_build = False
647 self.lock = threading.Lock()
648 self.__work_thread = threading.Thread(target=self.work)
649 self.__work_thread.daemon = True
650 self.__work_thread.start()
651
652 def handleJob(self, job):
653 parts = job.name.split(":")
654 cmd = parts[0]
655 name = parts[1]
656 if len(parts) > 2:
657 node = parts[2]
658 else:
659 node = None
660 if cmd == 'build':
661 self.handleBuild(job, name, node)
662 elif cmd == 'stop':
663 self.handleStop(job, name)
664 elif cmd == 'set_description':
665 self.handleSetDescription(job, name)
666
667 def handleBuild(self, job, name, node):
668 build = FakeBuild(self, job, self.build_counter, node)
669 job.build = build
670 self.gearman_jobs[job.unique] = job
671 self.build_counter += 1
672
673 self.running_builds.append(build)
674 build.start()
675
676 def handleStop(self, job, name):
677 self.log.debug("handle stop")
678 parameters = json.loads(job.arguments)
679 name = parameters['name']
680 number = parameters['number']
681 for build in self.running_builds:
682 if build.name == name and build.number == number:
683 build.aborted = True
684 build.release()
685 job.sendWorkComplete()
686 return
687 job.sendWorkFail()
688
689 def handleSetDescription(self, job, name):
690 self.log.debug("handle set description")
691 parameters = json.loads(job.arguments)
692 name = parameters['name']
693 number = parameters['number']
694 descr = parameters['html_description']
695 for build in self.running_builds:
696 if build.name == name and build.number == number:
697 build.description = descr
698 job.sendWorkComplete()
699 return
700 for build in self.build_history:
701 if build.name == name and build.number == number:
702 build.description = descr
703 job.sendWorkComplete()
704 return
705 job.sendWorkFail()
706
707 def work(self):
708 while self.running:
709 try:
710 job = self.getJob()
711 except gear.InterruptedError:
712 continue
713 try:
714 self.handleJob(job)
715 except:
716 self.log.exception("Worker exception:")
717
718 def addFailTest(self, name, change):
719 l = self.fail_tests.get(name, [])
720 l.append(change)
721 self.fail_tests[name] = l
722
723 def shouldFailTest(self, name, ref):
724 l = self.fail_tests.get(name, [])
725 for change in l:
726 if self.test.ref_has_change(ref, change):
727 return True
728 return False
729
730 def release(self, regex=None):
731 builds = self.running_builds[:]
732 self.log.debug("releasing build %s (%s)" % (regex,
733 len(self.running_builds)))
734 for build in builds:
735 if not regex or re.match(regex, build.name):
736 self.log.debug("releasing build %s" %
737 (build.parameters['ZUUL_UUID']))
738 build.release()
739 else:
740 self.log.debug("not releasing build %s" %
741 (build.parameters['ZUUL_UUID']))
742 self.log.debug("done releasing builds %s (%s)" %
743 (regex, len(self.running_builds)))
744
745
746class FakeGearmanServer(gear.Server):
747 def __init__(self):
748 self.hold_jobs_in_queue = False
749 super(FakeGearmanServer, self).__init__(0)
750
751 def getJobForConnection(self, connection, peek=False):
752 for queue in [self.high_queue, self.normal_queue, self.low_queue]:
753 for job in queue:
754 if not hasattr(job, 'waiting'):
755 if job.name.startswith('build:'):
756 job.waiting = self.hold_jobs_in_queue
757 else:
758 job.waiting = False
759 if job.waiting:
760 continue
761 if job.name in connection.functions:
762 if not peek:
763 queue.remove(job)
764 connection.related_jobs[job.handle] = job
765 job.worker_connection = connection
766 job.running = True
767 return job
768 return None
769
770 def release(self, regex=None):
771 released = False
772 qlen = (len(self.high_queue) + len(self.normal_queue) +
773 len(self.low_queue))
774 self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
775 for job in self.getQueue():
776 cmd, name = job.name.split(':')
777 if cmd != 'build':
778 continue
779 if not regex or re.match(regex, name):
780 self.log.debug("releasing queued job %s" %
781 job.unique)
782 job.waiting = False
783 released = True
784 else:
785 self.log.debug("not releasing queued job %s" %
786 job.unique)
787 if released:
788 self.wakeConnections()
789 qlen = (len(self.high_queue) + len(self.normal_queue) +
790 len(self.low_queue))
791 self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
792
793
794class FakeSMTP(object):
795 log = logging.getLogger('zuul.FakeSMTP')
796
797 def __init__(self, messages, server, port):
798 self.server = server
799 self.port = port
800 self.messages = messages
801
802 def sendmail(self, from_email, to_email, msg):
803 self.log.info("Sending email from %s, to %s, with msg %s" % (
804 from_email, to_email, msg))
805
806 headers = msg.split('\n\n', 1)[0]
807 body = msg.split('\n\n', 1)[1]
808
809 self.messages.append(dict(
810 from_email=from_email,
811 to_email=to_email,
812 msg=msg,
813 headers=headers,
814 body=body,
815 ))
816
817 return True
818
819 def quit(self):
820 return True
821
822
823class FakeSwiftClientConnection(swiftclient.client.Connection):
824 def post_account(self, headers):
825 # Do nothing
826 pass
827
828 def get_auth(self):
829 # Returns endpoint and (unused) auth token
830 endpoint = os.path.join('https://storage.example.org', 'V1',
831 'AUTH_account')
832 return endpoint, ''
833
834
Maru Newby3fe5f852015-01-13 04:22:14 +0000835class BaseTestCase(testtools.TestCase):
Clark Boylanb640e052014-04-03 16:41:46 -0700836 log = logging.getLogger("zuul.test")
837
838 def setUp(self):
Maru Newby3fe5f852015-01-13 04:22:14 +0000839 super(BaseTestCase, self).setUp()
Clark Boylanb640e052014-04-03 16:41:46 -0700840 test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
841 try:
842 test_timeout = int(test_timeout)
843 except ValueError:
844 # If timeout value is invalid do not set a timeout.
845 test_timeout = 0
846 if test_timeout > 0:
847 self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
848
849 if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
850 os.environ.get('OS_STDOUT_CAPTURE') == '1'):
851 stdout = self.useFixture(fixtures.StringStream('stdout')).stream
852 self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
853 if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
854 os.environ.get('OS_STDERR_CAPTURE') == '1'):
855 stderr = self.useFixture(fixtures.StringStream('stderr')).stream
856 self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
857 if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
858 os.environ.get('OS_LOG_CAPTURE') == '1'):
859 self.useFixture(fixtures.FakeLogger(
860 level=logging.DEBUG,
861 format='%(asctime)s %(name)-32s '
862 '%(levelname)-8s %(message)s'))
Maru Newby3fe5f852015-01-13 04:22:14 +0000863
864
865class ZuulTestCase(BaseTestCase):
866
867 def setUp(self):
868 super(ZuulTestCase, self).setUp()
James E. Blair97d902e2014-08-21 13:25:56 -0700869 if USE_TEMPDIR:
870 tmp_root = self.useFixture(fixtures.TempDir(
Joshua Hesketh29d99b72014-08-19 16:27:42 +1000871 rootdir=os.environ.get("ZUUL_TEST_ROOT"))
872 ).path
James E. Blair97d902e2014-08-21 13:25:56 -0700873 else:
874 tmp_root = os.environ.get("ZUUL_TEST_ROOT")
Clark Boylanb640e052014-04-03 16:41:46 -0700875 self.test_root = os.path.join(tmp_root, "zuul-test")
876 self.upstream_root = os.path.join(self.test_root, "upstream")
877 self.git_root = os.path.join(self.test_root, "git")
James E. Blairce8a2132016-05-19 15:21:52 -0700878 self.state_root = os.path.join(self.test_root, "lib")
Clark Boylanb640e052014-04-03 16:41:46 -0700879
880 if os.path.exists(self.test_root):
881 shutil.rmtree(self.test_root)
882 os.makedirs(self.test_root)
883 os.makedirs(self.upstream_root)
James E. Blairce8a2132016-05-19 15:21:52 -0700884 os.makedirs(self.state_root)
Clark Boylanb640e052014-04-03 16:41:46 -0700885
886 # Make per test copy of Configuration.
887 self.setup_config()
888 self.config.set('zuul', 'layout_config',
Joshua Heskethacccffc2015-03-31 23:38:17 +1100889 os.path.join(FIXTURE_DIR,
890 self.config.get('zuul', 'layout_config')))
Clark Boylanb640e052014-04-03 16:41:46 -0700891 self.config.set('merger', 'git_dir', self.git_root)
James E. Blairce8a2132016-05-19 15:21:52 -0700892 self.config.set('zuul', 'state_dir', self.state_root)
Clark Boylanb640e052014-04-03 16:41:46 -0700893
894 # For each project in config:
895 self.init_repo("org/project")
896 self.init_repo("org/project1")
897 self.init_repo("org/project2")
898 self.init_repo("org/project3")
James E. Blair97d902e2014-08-21 13:25:56 -0700899 self.init_repo("org/project4")
James E. Blairbce35e12014-08-21 14:31:17 -0700900 self.init_repo("org/project5")
901 self.init_repo("org/project6")
Clark Boylanb640e052014-04-03 16:41:46 -0700902 self.init_repo("org/one-job-project")
903 self.init_repo("org/nonvoting-project")
904 self.init_repo("org/templated-project")
905 self.init_repo("org/layered-project")
906 self.init_repo("org/node-project")
907 self.init_repo("org/conflict-project")
908 self.init_repo("org/noop-project")
909 self.init_repo("org/experimental-project")
Evgeny Antyshevd6e546c2015-06-11 15:13:57 +0000910 self.init_repo("org/no-jobs-project")
Clark Boylanb640e052014-04-03 16:41:46 -0700911
912 self.statsd = FakeStatsd()
Ian Wienandff977bf2015-09-30 15:38:47 +1000913 # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
914 # see: https://github.com/jsocol/pystatsd/issues/61
915 os.environ['STATSD_HOST'] = '127.0.0.1'
Clark Boylanb640e052014-04-03 16:41:46 -0700916 os.environ['STATSD_PORT'] = str(self.statsd.port)
917 self.statsd.start()
918 # the statsd client object is configured in the statsd module import
919 reload(statsd)
920 reload(zuul.scheduler)
921
922 self.gearman_server = FakeGearmanServer()
923
924 self.config.set('gearman', 'port', str(self.gearman_server.port))
925
926 self.worker = FakeWorker('fake_worker', self)
927 self.worker.addServer('127.0.0.1', self.gearman_server.port)
928 self.gearman_server.worker = self.worker
929
Joshua Hesketh352264b2015-08-11 23:42:08 +1000930 zuul.source.gerrit.GerritSource.replication_timeout = 1.5
931 zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
932 zuul.connection.gerrit.GerritEventConnector.delay = 0.0
Clark Boylanb640e052014-04-03 16:41:46 -0700933
Joshua Hesketh352264b2015-08-11 23:42:08 +1000934 self.sched = zuul.scheduler.Scheduler(self.config)
Clark Boylanb640e052014-04-03 16:41:46 -0700935
936 self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
937 FakeSwiftClientConnection))
938 self.swift = zuul.lib.swift.Swift(self.config)
939
Jan Hruban6b71aff2015-10-22 16:58:08 +0200940 self.event_queues = [
941 self.sched.result_event_queue,
942 self.sched.trigger_event_queue
943 ]
944
Joshua Hesketh352264b2015-08-11 23:42:08 +1000945 self.configure_connections()
946 self.sched.registerConnections(self.connections)
Joshua Hesketh352264b2015-08-11 23:42:08 +1000947
Clark Boylanb640e052014-04-03 16:41:46 -0700948 def URLOpenerFactory(*args, **kw):
Morgan Fainberg293f7f82016-05-30 14:01:22 -0700949 if isinstance(args[0], urllib.request.Request):
Clark Boylanb640e052014-04-03 16:41:46 -0700950 return old_urlopen(*args, **kw)
Clark Boylanb640e052014-04-03 16:41:46 -0700951 return FakeURLOpener(self.upstream_root, *args, **kw)
952
Morgan Fainberg293f7f82016-05-30 14:01:22 -0700953 old_urlopen = urllib.request.urlopen
954 urllib.request.urlopen = URLOpenerFactory
Clark Boylanb640e052014-04-03 16:41:46 -0700955
Joshua Hesketh352264b2015-08-11 23:42:08 +1000956 self.merge_server = zuul.merger.server.MergeServer(self.config,
957 self.connections)
958 self.merge_server.start()
Clark Boylanb640e052014-04-03 16:41:46 -0700959
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100960 self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
961 self.swift)
962 self.merge_client = zuul.merger.client.MergeClient(
963 self.config, self.sched)
Clark Boylanb640e052014-04-03 16:41:46 -0700964
965 self.sched.setLauncher(self.launcher)
966 self.sched.setMerger(self.merge_client)
Clark Boylanb640e052014-04-03 16:41:46 -0700967
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100968 self.webapp = zuul.webapp.WebApp(self.sched, port=0)
969 self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
Clark Boylanb640e052014-04-03 16:41:46 -0700970
971 self.sched.start()
972 self.sched.reconfigure(self.config)
973 self.sched.resume()
974 self.webapp.start()
975 self.rpc.start()
976 self.launcher.gearman.waitForServer()
977 self.registerJobs()
978 self.builds = self.worker.running_builds
979 self.history = self.worker.build_history
980
981 self.addCleanup(self.assertFinalState)
982 self.addCleanup(self.shutdown)
983
Joshua Hesketh352264b2015-08-11 23:42:08 +1000984 def configure_connections(self):
985 # Register connections from the config
986 self.smtp_messages = []
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100987
Joshua Hesketh352264b2015-08-11 23:42:08 +1000988 def FakeSMTPFactory(*args, **kw):
989 args = [self.smtp_messages] + list(args)
990 return FakeSMTP(*args, **kw)
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100991
Joshua Hesketh352264b2015-08-11 23:42:08 +1000992 self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100993
Joshua Hesketh352264b2015-08-11 23:42:08 +1000994 # Set a changes database so multiple FakeGerrit's can report back to
995 # a virtual canonical database given by the configured hostname
996 self.gerrit_changes_dbs = {}
997 self.gerrit_queues_dbs = {}
998 self.connections = {}
Joshua Hesketh850ccb62014-11-27 11:31:02 +1100999
Joshua Hesketh352264b2015-08-11 23:42:08 +10001000 for section_name in self.config.sections():
1001 con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
1002 section_name, re.I)
1003 if not con_match:
1004 continue
1005 con_name = con_match.group(2)
1006 con_config = dict(self.config.items(section_name))
1007
1008 if 'driver' not in con_config:
1009 raise Exception("No driver specified for connection %s."
1010 % con_name)
1011
1012 con_driver = con_config['driver']
1013
1014 # TODO(jhesketh): load the required class automatically
1015 if con_driver == 'gerrit':
Joshua Heskethacccffc2015-03-31 23:38:17 +11001016 if con_config['server'] not in self.gerrit_changes_dbs.keys():
1017 self.gerrit_changes_dbs[con_config['server']] = {}
1018 if con_config['server'] not in self.gerrit_queues_dbs.keys():
1019 self.gerrit_queues_dbs[con_config['server']] = \
1020 Queue.Queue()
1021 self.event_queues.append(
1022 self.gerrit_queues_dbs[con_config['server']])
Joshua Hesketh352264b2015-08-11 23:42:08 +10001023 self.connections[con_name] = FakeGerritConnection(
1024 con_name, con_config,
Joshua Heskethacccffc2015-03-31 23:38:17 +11001025 changes_db=self.gerrit_changes_dbs[con_config['server']],
1026 queues_db=self.gerrit_queues_dbs[con_config['server']],
Jan Hruban6b71aff2015-10-22 16:58:08 +02001027 upstream_root=self.upstream_root
Joshua Hesketh352264b2015-08-11 23:42:08 +10001028 )
Joshua Heskethacccffc2015-03-31 23:38:17 +11001029 setattr(self, 'fake_' + con_name, self.connections[con_name])
Joshua Hesketh352264b2015-08-11 23:42:08 +10001030 elif con_driver == 'smtp':
1031 self.connections[con_name] = \
1032 zuul.connection.smtp.SMTPConnection(con_name, con_config)
1033 else:
1034 raise Exception("Unknown driver, %s, for connection %s"
1035 % (con_config['driver'], con_name))
1036
1037 # If the [gerrit] or [smtp] sections still exist, load them in as a
1038 # connection named 'gerrit' or 'smtp' respectfully
1039
1040 if 'gerrit' in self.config.sections():
1041 self.gerrit_changes_dbs['gerrit'] = {}
1042 self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
Jan Hruban6b71aff2015-10-22 16:58:08 +02001043 self.event_queues.append(self.gerrit_queues_dbs['gerrit'])
Joshua Hesketh352264b2015-08-11 23:42:08 +10001044 self.connections['gerrit'] = FakeGerritConnection(
1045 '_legacy_gerrit', dict(self.config.items('gerrit')),
1046 changes_db=self.gerrit_changes_dbs['gerrit'],
1047 queues_db=self.gerrit_queues_dbs['gerrit'])
1048
1049 if 'smtp' in self.config.sections():
1050 self.connections['smtp'] = \
1051 zuul.connection.smtp.SMTPConnection(
1052 '_legacy_smtp', dict(self.config.items('smtp')))
Joshua Hesketh850ccb62014-11-27 11:31:02 +11001053
Joshua Heskethacccffc2015-03-31 23:38:17 +11001054 def setup_config(self, config_file='zuul.conf'):
Clark Boylanb640e052014-04-03 16:41:46 -07001055 """Per test config object. Override to set different config."""
1056 self.config = ConfigParser.ConfigParser()
Joshua Heskethacccffc2015-03-31 23:38:17 +11001057 self.config.read(os.path.join(FIXTURE_DIR, config_file))
Clark Boylanb640e052014-04-03 16:41:46 -07001058
1059 def assertFinalState(self):
Clark Boylanb640e052014-04-03 16:41:46 -07001060 # Make sure that git.Repo objects have been garbage collected.
1061 repos = []
1062 gc.collect()
1063 for obj in gc.get_objects():
1064 if isinstance(obj, git.Repo):
1065 repos.append(obj)
1066 self.assertEqual(len(repos), 0)
1067 self.assertEmptyQueues()
James E. Blair0577cd62015-02-07 11:42:12 -08001068 for pipeline in self.sched.layout.pipelines.values():
1069 if isinstance(pipeline.manager,
1070 zuul.scheduler.IndependentPipelineManager):
1071 self.assertEqual(len(pipeline.queues), 0)
Clark Boylanb640e052014-04-03 16:41:46 -07001072
1073 def shutdown(self):
1074 self.log.debug("Shutting down after tests")
1075 self.launcher.stop()
1076 self.merge_server.stop()
1077 self.merge_server.join()
1078 self.merge_client.stop()
1079 self.worker.shutdown()
Clark Boylanb640e052014-04-03 16:41:46 -07001080 self.sched.stop()
1081 self.sched.join()
1082 self.statsd.stop()
1083 self.statsd.join()
1084 self.webapp.stop()
1085 self.webapp.join()
1086 self.rpc.stop()
1087 self.rpc.join()
1088 self.gearman_server.shutdown()
1089 threads = threading.enumerate()
1090 if len(threads) > 1:
1091 self.log.error("More than one thread is running: %s" % threads)
Clark Boylanb640e052014-04-03 16:41:46 -07001092
1093 def init_repo(self, project):
1094 parts = project.split('/')
1095 path = os.path.join(self.upstream_root, *parts[:-1])
1096 if not os.path.exists(path):
1097 os.makedirs(path)
1098 path = os.path.join(self.upstream_root, project)
1099 repo = git.Repo.init(path)
1100
1101 repo.config_writer().set_value('user', 'email', 'user@example.com')
1102 repo.config_writer().set_value('user', 'name', 'User Name')
1103 repo.config_writer().write()
1104
1105 fn = os.path.join(path, 'README')
1106 f = open(fn, 'w')
1107 f.write("test\n")
1108 f.close()
1109 repo.index.add([fn])
1110 repo.index.commit('initial commit')
1111 master = repo.create_head('master')
1112 repo.create_tag('init')
1113
James E. Blair97d902e2014-08-21 13:25:56 -07001114 repo.head.reference = master
James E. Blair879dafb2015-07-17 14:04:49 -07001115 zuul.merger.merger.reset_repo_to_head(repo)
James E. Blair97d902e2014-08-21 13:25:56 -07001116 repo.git.clean('-x', '-f', '-d')
1117
1118 self.create_branch(project, 'mp')
1119
1120 def create_branch(self, project, branch):
1121 path = os.path.join(self.upstream_root, project)
1122 repo = git.Repo.init(path)
1123 fn = os.path.join(path, 'README')
1124
1125 branch_head = repo.create_head(branch)
1126 repo.head.reference = branch_head
Clark Boylanb640e052014-04-03 16:41:46 -07001127 f = open(fn, 'a')
James E. Blair97d902e2014-08-21 13:25:56 -07001128 f.write("test %s\n" % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001129 f.close()
1130 repo.index.add([fn])
James E. Blair97d902e2014-08-21 13:25:56 -07001131 repo.index.commit('%s commit' % branch)
Clark Boylanb640e052014-04-03 16:41:46 -07001132
James E. Blair97d902e2014-08-21 13:25:56 -07001133 repo.head.reference = repo.heads['master']
James E. Blair879dafb2015-07-17 14:04:49 -07001134 zuul.merger.merger.reset_repo_to_head(repo)
Clark Boylanb640e052014-04-03 16:41:46 -07001135 repo.git.clean('-x', '-f', '-d')
1136
1137 def ref_has_change(self, ref, change):
1138 path = os.path.join(self.git_root, change.project)
1139 repo = git.Repo(path)
Mike Heald8225f522014-11-21 09:52:33 +00001140 try:
1141 for commit in repo.iter_commits(ref):
1142 if commit.message.strip() == ('%s-1' % change.subject):
1143 return True
1144 except GitCommandError:
1145 pass
Clark Boylanb640e052014-04-03 16:41:46 -07001146 return False
1147
1148 def job_has_changes(self, *args):
1149 job = args[0]
1150 commits = args[1:]
1151 if isinstance(job, FakeBuild):
1152 parameters = job.parameters
1153 else:
1154 parameters = json.loads(job.arguments)
1155 project = parameters['ZUUL_PROJECT']
1156 path = os.path.join(self.git_root, project)
1157 repo = git.Repo(path)
1158 ref = parameters['ZUUL_REF']
1159 sha = parameters['ZUUL_COMMIT']
1160 repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
1161 repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
1162 commit_messages = ['%s-1' % commit.subject for commit in commits]
1163 self.log.debug("Checking if job %s has changes; commit_messages %s;"
1164 " repo_messages %s; sha %s" % (job, commit_messages,
1165 repo_messages, sha))
1166 for msg in commit_messages:
1167 if msg not in repo_messages:
1168 self.log.debug(" messages do not match")
1169 return False
1170 if repo_shas[0] != sha:
1171 self.log.debug(" sha does not match")
1172 return False
1173 self.log.debug(" OK")
1174 return True
1175
1176 def registerJobs(self):
1177 count = 0
1178 for job in self.sched.layout.jobs.keys():
1179 self.worker.registerFunction('build:' + job)
1180 count += 1
1181 self.worker.registerFunction('stop:' + self.worker.worker_id)
1182 count += 1
1183
1184 while len(self.gearman_server.functions) < count:
1185 time.sleep(0)
1186
James E. Blairb8c16472015-05-05 14:55:26 -07001187 def orderedRelease(self):
1188 # Run one build at a time to ensure non-race order:
1189 while len(self.builds):
1190 self.release(self.builds[0])
1191 self.waitUntilSettled()
1192
Clark Boylanb640e052014-04-03 16:41:46 -07001193 def release(self, job):
1194 if isinstance(job, FakeBuild):
1195 job.release()
1196 else:
1197 job.waiting = False
1198 self.log.debug("Queued job %s released" % job.unique)
1199 self.gearman_server.wakeConnections()
1200
1201 def getParameter(self, job, name):
1202 if isinstance(job, FakeBuild):
1203 return job.parameters[name]
1204 else:
1205 parameters = json.loads(job.arguments)
1206 return parameters[name]
1207
1208 def resetGearmanServer(self):
1209 self.worker.setFunctions([])
1210 while True:
1211 done = True
1212 for connection in self.gearman_server.active_connections:
1213 if (connection.functions and
1214 connection.client_id not in ['Zuul RPC Listener',
1215 'Zuul Merger']):
1216 done = False
1217 if done:
1218 break
1219 time.sleep(0)
1220 self.gearman_server.functions = set()
1221 self.rpc.register()
1222 self.merge_server.register()
1223
1224 def haveAllBuildsReported(self):
1225 # See if Zuul is waiting on a meta job to complete
1226 if self.launcher.meta_jobs:
1227 return False
1228 # Find out if every build that the worker has completed has been
1229 # reported back to Zuul. If it hasn't then that means a Gearman
1230 # event is still in transit and the system is not stable.
1231 for build in self.worker.build_history:
1232 zbuild = self.launcher.builds.get(build.uuid)
1233 if not zbuild:
1234 # It has already been reported
1235 continue
1236 # It hasn't been reported yet.
1237 return False
1238 # Make sure that none of the worker connections are in GRAB_WAIT
1239 for connection in self.worker.active_connections:
1240 if connection.state == 'GRAB_WAIT':
1241 return False
1242 return True
1243
1244 def areAllBuildsWaiting(self):
Clark Boylanb640e052014-04-03 16:41:46 -07001245 builds = self.launcher.builds.values()
1246 for build in builds:
1247 client_job = None
1248 for conn in self.launcher.gearman.active_connections:
1249 for j in conn.related_jobs.values():
1250 if j.unique == build.uuid:
1251 client_job = j
1252 break
1253 if not client_job:
1254 self.log.debug("%s is not known to the gearman client" %
1255 build)
James E. Blairf15139b2015-04-02 16:37:15 -07001256 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001257 if not client_job.handle:
1258 self.log.debug("%s has no handle" % client_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001259 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001260 server_job = self.gearman_server.jobs.get(client_job.handle)
1261 if not server_job:
1262 self.log.debug("%s is not known to the gearman server" %
1263 client_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001264 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001265 if not hasattr(server_job, 'waiting'):
1266 self.log.debug("%s is being enqueued" % server_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001267 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001268 if server_job.waiting:
1269 continue
1270 worker_job = self.worker.gearman_jobs.get(server_job.unique)
1271 if worker_job:
James E. Blairf15139b2015-04-02 16:37:15 -07001272 if build.number is None:
1273 self.log.debug("%s has not reported start" % worker_job)
1274 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001275 if worker_job.build.isWaiting():
1276 continue
1277 else:
1278 self.log.debug("%s is running" % worker_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001279 return False
Clark Boylanb640e052014-04-03 16:41:46 -07001280 else:
1281 self.log.debug("%s is unassigned" % server_job)
James E. Blairf15139b2015-04-02 16:37:15 -07001282 return False
1283 return True
Clark Boylanb640e052014-04-03 16:41:46 -07001284
Jan Hruban6b71aff2015-10-22 16:58:08 +02001285 def eventQueuesEmpty(self):
1286 for queue in self.event_queues:
1287 yield queue.empty()
1288
1289 def eventQueuesJoin(self):
1290 for queue in self.event_queues:
1291 queue.join()
1292
Clark Boylanb640e052014-04-03 16:41:46 -07001293 def waitUntilSettled(self):
1294 self.log.debug("Waiting until settled...")
1295 start = time.time()
1296 while True:
1297 if time.time() - start > 10:
Morgan Fainberg4c6a7742016-05-27 08:42:17 -07001298 print('queue status:', ''.join(self.eventQueuesEmpty()))
1299 print(self.areAllBuildsWaiting())
Clark Boylanb640e052014-04-03 16:41:46 -07001300 raise Exception("Timeout waiting for Zuul to settle")
1301 # Make sure no new events show up while we're checking
1302 self.worker.lock.acquire()
1303 # have all build states propogated to zuul?
1304 if self.haveAllBuildsReported():
1305 # Join ensures that the queue is empty _and_ events have been
1306 # processed
Jan Hruban6b71aff2015-10-22 16:58:08 +02001307 self.eventQueuesJoin()
Clark Boylanb640e052014-04-03 16:41:46 -07001308 self.sched.run_handler_lock.acquire()
James E. Blairae1b2d12015-02-07 08:01:21 -08001309 if (not self.merge_client.build_sets and
Jan Hruban6b71aff2015-10-22 16:58:08 +02001310 all(self.eventQueuesEmpty()) and
Clark Boylanb640e052014-04-03 16:41:46 -07001311 self.haveAllBuildsReported() and
1312 self.areAllBuildsWaiting()):
1313 self.sched.run_handler_lock.release()
1314 self.worker.lock.release()
1315 self.log.debug("...settled.")
1316 return
1317 self.sched.run_handler_lock.release()
1318 self.worker.lock.release()
1319 self.sched.wake_event.wait(0.1)
1320
1321 def countJobResults(self, jobs, result):
1322 jobs = filter(lambda x: x.result == result, jobs)
1323 return len(jobs)
1324
1325 def getJobFromHistory(self, name):
1326 history = self.worker.build_history
1327 for job in history:
1328 if job.name == name:
1329 return job
1330 raise Exception("Unable to find job %s in history" % name)
1331
1332 def assertEmptyQueues(self):
1333 # Make sure there are no orphaned jobs
1334 for pipeline in self.sched.layout.pipelines.values():
1335 for queue in pipeline.queues:
1336 if len(queue.queue) != 0:
Morgan Fainberg4c6a7742016-05-27 08:42:17 -07001337 print('pipeline %s queue %s contents %s' % (
1338 pipeline.name, queue.name, queue.queue))
Antoine Mussobd86a312014-01-08 14:51:33 +01001339 self.assertEqual(len(queue.queue), 0,
1340 "Pipelines queues should be empty")
Clark Boylanb640e052014-04-03 16:41:46 -07001341
1342 def assertReportedStat(self, key, value=None, kind=None):
1343 start = time.time()
1344 while time.time() < (start + 5):
1345 for stat in self.statsd.stats:
1346 pprint.pprint(self.statsd.stats)
1347 k, v = stat.split(':')
1348 if key == k:
1349 if value is None and kind is None:
1350 return
1351 elif value:
1352 if value == v:
1353 return
1354 elif kind:
1355 if v.endswith('|' + kind):
1356 return
1357 time.sleep(0.1)
1358
1359 pprint.pprint(self.statsd.stats)
1360 raise Exception("Key %s not found in reported stats" % key)