blob: c5ea32074dc6e0444ce48452833373c50904531e [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import configparser as ConfigParser
import gc
import hashlib
import json
import logging
import os
import pprint
from six.moves import queue as Queue
from six.moves import urllib
import random
import re
import select
import shutil
from six.moves import reload_module
import socket
import string
import subprocess
import swiftclient
import tempfile
import threading
import time
import git
import gear
import fixtures
import statsd
import testtools
from git import GitCommandError
import zuul.connection.gerrit
import zuul.connection.smtp
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.server
import zuul.launcher.client
import zuul.lib.swift
import zuul.lib.connections
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.source.gerrit
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
USE_TEMPDIR = True
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s')
def repack_repo(path):
cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
output = subprocess.Popen(cmd, close_fds=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = output.communicate()
if output.returncode:
raise Exception("git repack returned %d" % output.returncode)
return out
def random_sha1():
return hashlib.sha1(str(random.random())).hexdigest()
def iterate_timeout(max_seconds, purpose):
start = time.time()
count = 0
while (time.time() < start + max_seconds):
count += 1
yield count
time.sleep(0)
raise Exception("Timeout waiting for %s" % purpose)
class ChangeReference(git.Reference):
_common_path_default = "refs/changes"
_points_to_commits_only = True
class FakeChange(object):
categories = {'APRV': ('Approved', -1, 1),
'CRVW': ('Code-Review', -2, 2),
'VRFY': ('Verified', -2, 2)}
def __init__(self, gerrit, number, project, branch, subject,
status='NEW', upstream_root=None):
self.gerrit = gerrit
self.reported = 0
self.queried = 0
self.patchsets = []
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.latest_patchset = 0
self.depends_on_change = None
self.needed_by_changes = []
self.fail_merge = False
self.messages = []
self.data = {
'branch': branch,
'comments': [],
'commitMessage': subject,
'createdOn': time.time(),
'id': 'I' + random_sha1(),
'lastUpdated': time.time(),
'number': str(number),
'open': status == 'NEW',
'owner': {'email': 'user@example.com',
'name': 'User Name',
'username': 'username'},
'patchSets': self.patchsets,
'project': project,
'status': status,
'subject': subject,
'submitRecords': [],
'url': 'https://hostname/%s' % number}
self.upstream_root = upstream_root
self.addPatchset()
self.data['submitRecords'] = self.getSubmitRecords()
self.open = status == 'NEW'
def add_fake_change_to_repo(self, msg, fn, large):
path = os.path.join(self.upstream_root, self.project)
repo = git.Repo(path)
ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
self.latest_patchset),
'refs/tags/init')
repo.head.reference = ref
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
path = os.path.join(self.upstream_root, self.project)
if not large:
fn = os.path.join(path, fn)
f = open(fn, 'w')
f.write("test %s %s %s\n" %
(self.branch, self.number, self.latest_patchset))
f.close()
repo.index.add([fn])
else:
for fni in range(100):
fn = os.path.join(path, str(fni))
f = open(fn, 'w')
for ci in range(4096):
f.write(random.choice(string.printable))
f.close()
repo.index.add([fn])
r = repo.index.commit(msg)
repo.head.reference = 'master'
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
return r
def addPatchset(self, files=[], large=False):
self.latest_patchset += 1
if files:
fn = files[0]
else:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
msg = self.subject + '-' + str(self.latest_patchset)
c = self.add_fake_change_to_repo(msg, fn, large)
ps_files = [{'file': '/COMMIT_MSG',
'type': 'ADDED'},
{'file': 'README',
'type': 'MODIFIED'}]
for f in files:
ps_files.append({'file': f, 'type': 'ADDED'})
d = {'approvals': [],
'createdOn': time.time(),
'files': ps_files,
'number': str(self.latest_patchset),
'ref': 'refs/changes/1/%s/%s' % (self.number,
self.latest_patchset),
'revision': c.hexsha,
'uploader': {'email': 'user@example.com',
'name': 'User name',
'username': 'user'}}
self.data['currentPatchSet'] = d
self.patchsets.append(d)
self.data['submitRecords'] = self.getSubmitRecords()
def getPatchsetCreatedEvent(self, patchset):
event = {"type": "patchset-created",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"patchSet": self.patchsets[patchset - 1],
"uploader": {"name": "User Name"}}
return event
def getChangeRestoredEvent(self):
event = {"type": "change-restored",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"restorer": {"name": "User Name"},
"patchSet": self.patchsets[-1],
"reason": ""}
return event
def getChangeAbandonedEvent(self):
event = {"type": "change-abandoned",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"abandoner": {"name": "User Name"},
"patchSet": self.patchsets[-1],
"reason": ""}
return event
def getChangeCommentEvent(self, patchset):
event = {"type": "comment-added",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"patchSet": self.patchsets[patchset - 1],
"author": {"name": "User Name"},
"approvals": [{"type": "Code-Review",
"description": "Code-Review",
"value": "0"}],
"comment": "This is a comment"}
return event
def addApproval(self, category, value, username='reviewer_john',
granted_on=None, message=''):
if not granted_on:
granted_on = time.time()
approval = {
'description': self.categories[category][0],
'type': category,
'value': str(value),
'by': {
'username': username,
'email': username + '@example.com',
},
'grantedOn': int(granted_on)
}
for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
if x['by']['username'] == username and x['type'] == category:
del self.patchsets[-1]['approvals'][i]
self.patchsets[-1]['approvals'].append(approval)
event = {'approvals': [approval],
'author': {'email': 'author@example.com',
'name': 'Patchset Author',
'username': 'author_phil'},
'change': {'branch': self.branch,
'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
'number': str(self.number),
'owner': {'email': 'owner@example.com',
'name': 'Change Owner',
'username': 'owner_jane'},
'project': self.project,
'subject': self.subject,
'topic': 'master',
'url': 'https://hostname/459'},
'comment': message,
'patchSet': self.patchsets[-1],
'type': 'comment-added'}
self.data['submitRecords'] = self.getSubmitRecords()
return json.loads(json.dumps(event))
def getSubmitRecords(self):
status = {}
for cat in self.categories.keys():
status[cat] = 0
for a in self.patchsets[-1]['approvals']:
cur = status[a['type']]
cat_min, cat_max = self.categories[a['type']][1:]
new = int(a['value'])
if new == cat_min:
cur = new
elif abs(new) > abs(cur):
cur = new
status[a['type']] = cur
labels = []
ok = True
for typ, cat in self.categories.items():
cur = status[typ]
cat_min, cat_max = cat[1:]
if cur == cat_min:
value = 'REJECT'
ok = False
elif cur == cat_max:
value = 'OK'
else:
value = 'NEED'
ok = False
labels.append({'label': cat[0], 'status': value})
if ok:
return [{'status': 'OK'}]
return [{'status': 'NOT_READY',
'labels': labels}]
def setDependsOn(self, other, patchset):
self.depends_on_change = other
d = {'id': other.data['id'],
'number': other.data['number'],
'ref': other.patchsets[patchset - 1]['ref']
}
self.data['dependsOn'] = [d]
other.needed_by_changes.append(self)
needed = other.data.get('neededBy', [])
d = {'id': self.data['id'],
'number': self.data['number'],
'ref': self.patchsets[patchset - 1]['ref'],
'revision': self.patchsets[patchset - 1]['revision']
}
needed.append(d)
other.data['neededBy'] = needed
def query(self):
self.queried += 1
d = self.data.get('dependsOn')
if d:
d = d[0]
if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
d['isCurrentPatchSet'] = True
else:
d['isCurrentPatchSet'] = False
return json.loads(json.dumps(self.data))
def setMerged(self):
if (self.depends_on_change and
self.depends_on_change.data['status'] != 'MERGED'):
return
if self.fail_merge:
return
self.data['status'] = 'MERGED'
self.open = False
path = os.path.join(self.upstream_root, self.project)
repo = git.Repo(path)
repo.heads[self.branch].commit = \
repo.commit(self.patchsets[-1]['revision'])
def setReported(self):
self.reported += 1
class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
log = logging.getLogger("zuul.test.FakeGerritConnection")
def __init__(self, connection_name, connection_config,
changes_db=None, queues_db=None, upstream_root=None):
super(FakeGerritConnection, self).__init__(connection_name,
connection_config)
self.event_queue = queues_db
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0
self.changes = changes_db
self.queries = []
self.upstream_root = upstream_root
def addFakeChange(self, project, branch, subject, status='NEW'):
self.change_number += 1
c = FakeChange(self, self.change_number, project, branch, subject,
upstream_root=self.upstream_root,
status=status)
self.changes[self.change_number] = c
return c
def review(self, project, changeid, message, action):
number, ps = changeid.split(',')
change = self.changes[int(number)]
# Add the approval back onto the change (ie simulate what gerrit would
# do).
# Usually when zuul leaves a review it'll create a feedback loop where
# zuul's review enters another gerrit event (which is then picked up by
# zuul). However, we can't mimic this behaviour (by adding this
# approval event into the queue) as it stops jobs from checking what
# happens before this event is triggered. If a job needs to see what
# happens they can add their own verified event into the queue.
# Nevertheless, we can update change with the new review in gerrit.
for cat in ['CRVW', 'VRFY', 'APRV']:
if cat in action:
change.addApproval(cat, action[cat], username=self.user)
if 'label' in action:
parts = action['label'].split('=')
change.addApproval(parts[0], parts[2], username=self.user)
change.messages.append(message)
if 'submit' in action:
change.setMerged()
if message:
change.setReported()
def query(self, number):
change = self.changes.get(int(number))
if change:
return change.query()
return {}
def simpleQuery(self, query):
self.log.debug("simpleQuery: %s" % query)
self.queries.append(query)
if query.startswith('change:'):
# Query a specific changeid
changeid = query[len('change:'):]
l = [change.query() for change in self.changes.values()
if change.data['id'] == changeid]
elif query.startswith('message:'):
# Query the content of a commit message
msg = query[len('message:'):].strip()
l = [change.query() for change in self.changes.values()
if msg in change.data['commitMessage']]
else:
# Query all open changes
l = [change.query() for change in self.changes.values()]
return l
def _start_watcher_thread(self, *args, **kw):
pass
def getGitUrl(self, project):
return os.path.join(self.upstream_root, project.name)
class BuildHistory(object):
def __init__(self, **kw):
self.__dict__.update(kw)
def __repr__(self):
return ("<Completed build, result: %s name: %s #%s changes: %s>" %
(self.result, self.name, self.number, self.changes))
class FakeURLOpener(object):
def __init__(self, upstream_root, url):
self.upstream_root = upstream_root
self.url = url
def read(self):
res = urllib.parse.urlparse(self.url)
path = res.path
project = '/'.join(path.split('/')[2:-2])
ret = '001e# service=git-upload-pack\n'
ret += ('000000a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
'multi_ack thin-pack side-band side-band-64k ofs-delta '
'shallow no-progress include-tag multi_ack_detailed no-done\n')
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
for ref in repo.refs:
r = ref.object.hexsha + ' ' + ref.path + '\n'
ret += '%04x%s' % (len(r) + 4, r)
ret += '0000'
return ret
class FakeStatsd(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', 0))
self.port = self.sock.getsockname()[1]
self.wake_read, self.wake_write = os.pipe()
self.stats = []
def run(self):
while True:
poll = select.poll()
poll.register(self.sock, select.POLLIN)
poll.register(self.wake_read, select.POLLIN)
ret = poll.poll()
for (fd, event) in ret:
if fd == self.sock.fileno():
data = self.sock.recvfrom(1024)
if not data:
return
self.stats.append(data[0])
if fd == self.wake_read:
return
def stop(self):
os.write(self.wake_write, '1\n')
class FakeBuild(threading.Thread):
log = logging.getLogger("zuul.test")
def __init__(self, worker, job, number, node):
threading.Thread.__init__(self)
self.daemon = True
self.worker = worker
self.job = job
self.name = job.name.split(':')[1]
self.number = number
self.node = node
self.parameters = json.loads(job.arguments)
self.unique = self.parameters['ZUUL_UUID']
self.wait_condition = threading.Condition()
self.waiting = False
self.aborted = False
self.created = time.time()
self.description = ''
self.run_error = False
def release(self):
self.wait_condition.acquire()
self.wait_condition.notify()
self.waiting = False
self.log.debug("Build %s released" % self.unique)
self.wait_condition.release()
def isWaiting(self):
self.wait_condition.acquire()
if self.waiting:
ret = True
else:
ret = False
self.wait_condition.release()
return ret
def _wait(self):
self.wait_condition.acquire()
self.waiting = True
self.log.debug("Build %s waiting" % self.unique)
self.wait_condition.wait()
self.wait_condition.release()
def run(self):
data = {
'url': 'https://server/job/%s/%s/' % (self.name, self.number),
'name': self.name,
'number': self.number,
'manager': self.worker.worker_id,
'worker_name': 'My Worker',
'worker_hostname': 'localhost',
'worker_ips': ['127.0.0.1', '192.168.1.1'],
'worker_fqdn': 'zuul.example.org',
'worker_program': 'FakeBuilder',
'worker_version': 'v1.1',
'worker_extra': {'something': 'else'}
}
self.log.debug('Running build %s' % self.unique)
self.job.sendWorkData(json.dumps(data))
self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
self.job.sendWorkStatus(0, 100)
if self.worker.hold_jobs_in_build:
self.log.debug('Holding build %s' % self.unique)
self._wait()
self.log.debug("Build %s continuing" % self.unique)
self.worker.lock.acquire()
result = 'SUCCESS'
if (('ZUUL_REF' in self.parameters) and
self.worker.shouldFailTest(self.name,
self.parameters['ZUUL_REF'])):
result = 'FAILURE'
if self.aborted:
result = 'ABORTED'
if self.run_error:
work_fail = True
result = 'RUN_ERROR'
else:
data['result'] = result
data['node_labels'] = ['bare-necessities']
data['node_name'] = 'foo'
work_fail = False
changes = None
if 'ZUUL_CHANGE_IDS' in self.parameters:
changes = self.parameters['ZUUL_CHANGE_IDS']
self.worker.build_history.append(
BuildHistory(name=self.name, number=self.number,
result=result, changes=changes, node=self.node,
uuid=self.unique, description=self.description,
parameters=self.parameters,
pipeline=self.parameters['ZUUL_PIPELINE'])
)
self.job.sendWorkData(json.dumps(data))
if work_fail:
self.job.sendWorkFail()
else:
self.job.sendWorkComplete(json.dumps(data))
del self.worker.gearman_jobs[self.job.unique]
self.worker.running_builds.remove(self)
self.worker.lock.release()
class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
def __init__(self, *args, **kw):
super(RecordingLaunchServer, self).__init__(*args, **kw)
self.job_history = []
def launch(self, job):
self.job_history.append(job)
job.data = []
def sendWorkComplete(data=b''):
job.data.append(data)
gear.WorkerJob.sendWorkComplete(job, data)
job.sendWorkComplete = sendWorkComplete
super(RecordingLaunchServer, self).launch(job)
class FakeWorker(gear.Worker):
def __init__(self, worker_id, test):
super(FakeWorker, self).__init__(worker_id)
self.gearman_jobs = {}
self.build_history = []
self.running_builds = []
self.build_counter = 0
self.fail_tests = {}
self.test = test
self.hold_jobs_in_build = False
self.lock = threading.Lock()
self.__work_thread = threading.Thread(target=self.work)
self.__work_thread.daemon = True
self.__work_thread.start()
def handleJob(self, job):
parts = job.name.split(":")
cmd = parts[0]
name = parts[1]
if len(parts) > 2:
node = parts[2]
else:
node = None
if cmd == 'build':
self.handleBuild(job, name, node)
elif cmd == 'stop':
self.handleStop(job, name)
elif cmd == 'set_description':
self.handleSetDescription(job, name)
def handleBuild(self, job, name, node):
build = FakeBuild(self, job, self.build_counter, node)
job.build = build
self.gearman_jobs[job.unique] = job
self.build_counter += 1
self.running_builds.append(build)
build.start()
def handleStop(self, job, name):
self.log.debug("handle stop")
parameters = json.loads(job.arguments)
name = parameters['name']
number = parameters['number']
for build in self.running_builds:
if build.name == name and build.number == number:
build.aborted = True
build.release()
job.sendWorkComplete()
return
job.sendWorkFail()
def handleSetDescription(self, job, name):
self.log.debug("handle set description")
parameters = json.loads(job.arguments)
name = parameters['name']
number = parameters['number']
descr = parameters['html_description']
for build in self.running_builds:
if build.name == name and build.number == number:
build.description = descr
job.sendWorkComplete()
return
for build in self.build_history:
if build.name == name and build.number == number:
build.description = descr
job.sendWorkComplete()
return
job.sendWorkFail()
def work(self):
while self.running:
try:
job = self.getJob()
except gear.InterruptedError:
continue
try:
self.handleJob(job)
except:
self.log.exception("Worker exception:")
def addFailTest(self, name, change):
l = self.fail_tests.get(name, [])
l.append(change)
self.fail_tests[name] = l
def shouldFailTest(self, name, ref):
l = self.fail_tests.get(name, [])
for change in l:
if self.test.ref_has_change(ref, change):
return True
return False
def release(self, regex=None):
builds = self.running_builds[:]
self.log.debug("releasing build %s (%s)" % (regex,
len(self.running_builds)))
for build in builds:
if not regex or re.match(regex, build.name):
self.log.debug("releasing build %s" %
(build.parameters['ZUUL_UUID']))
build.release()
else:
self.log.debug("not releasing build %s" %
(build.parameters['ZUUL_UUID']))
self.log.debug("done releasing builds %s (%s)" %
(regex, len(self.running_builds)))
class FakeGearmanServer(gear.Server):
def __init__(self):
self.hold_jobs_in_queue = False
super(FakeGearmanServer, self).__init__(0)
def getJobForConnection(self, connection, peek=False):
for queue in [self.high_queue, self.normal_queue, self.low_queue]:
for job in queue:
if not hasattr(job, 'waiting'):
if job.name.startswith('build:'):
job.waiting = self.hold_jobs_in_queue
else:
job.waiting = False
if job.waiting:
continue
if job.name in connection.functions:
if not peek:
queue.remove(job)
connection.related_jobs[job.handle] = job
job.worker_connection = connection
job.running = True
return job
return None
def release(self, regex=None):
released = False
qlen = (len(self.high_queue) + len(self.normal_queue) +
len(self.low_queue))
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
cmd, name = job.name.split(':')
if cmd != 'build':
continue
if not regex or re.match(regex, name):
self.log.debug("releasing queued job %s" %
job.unique)
job.waiting = False
released = True
else:
self.log.debug("not releasing queued job %s" %
job.unique)
if released:
self.wakeConnections()
qlen = (len(self.high_queue) + len(self.normal_queue) +
len(self.low_queue))
self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
class FakeSMTP(object):
log = logging.getLogger('zuul.FakeSMTP')
def __init__(self, messages, server, port):
self.server = server
self.port = port
self.messages = messages
def sendmail(self, from_email, to_email, msg):
self.log.info("Sending email from %s, to %s, with msg %s" % (
from_email, to_email, msg))
headers = msg.split('\n\n', 1)[0]
body = msg.split('\n\n', 1)[1]
self.messages.append(dict(
from_email=from_email,
to_email=to_email,
msg=msg,
headers=headers,
body=body,
))
return True
def quit(self):
return True
class FakeSwiftClientConnection(swiftclient.client.Connection):
def post_account(self, headers):
# Do nothing
pass
def get_auth(self):
# Returns endpoint and (unused) auth token
endpoint = os.path.join('https://storage.example.org', 'V1',
'AUTH_account')
return endpoint, ''
class BaseTestCase(testtools.TestCase):
log = logging.getLogger("zuul.test")
def setUp(self):
super(BaseTestCase, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=False))
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
os.environ.get('OS_STDERR_CAPTURE') == '1'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
if (os.environ.get('OS_LOG_CAPTURE') == 'True' or
os.environ.get('OS_LOG_CAPTURE') == '1'):
self.useFixture(fixtures.FakeLogger(
level=logging.DEBUG,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
# NOTE(notmorgan): Extract logging overrides for specific libraries
# from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
# each. This is used to limit the output during test runs from
# libraries that zuul depends on such as gear.
log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
if log_defaults_from_env:
for default in log_defaults_from_env.split(','):
try:
name, level_str = default.split('=', 1)
level = getattr(logging, level_str, logging.DEBUG)
self.useFixture(fixtures.FakeLogger(
name=name,
level=level,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
except ValueError:
# NOTE(notmorgan): Invalid format of the log default,
# skip and don't try and apply a logger for the
# specified module
pass
class ZuulTestCase(BaseTestCase):
config_file = 'zuul.conf'
def setUp(self):
super(ZuulTestCase, self).setUp()
if USE_TEMPDIR:
tmp_root = self.useFixture(fixtures.TempDir(
rootdir=os.environ.get("ZUUL_TEST_ROOT"))
).path
else:
tmp_root = os.environ.get("ZUUL_TEST_ROOT")
self.test_root = os.path.join(tmp_root, "zuul-test")
self.upstream_root = os.path.join(self.test_root, "upstream")
self.git_root = os.path.join(self.test_root, "git")
self.state_root = os.path.join(self.test_root, "lib")
if os.path.exists(self.test_root):
shutil.rmtree(self.test_root)
os.makedirs(self.test_root)
os.makedirs(self.upstream_root)
os.makedirs(self.state_root)
# Make per test copy of Configuration.
self.setup_config()
self.config.set('zuul', 'tenant_config',
os.path.join(FIXTURE_DIR,
self.config.get('zuul', 'tenant_config')))
self.config.set('merger', 'git_dir', self.git_root)
self.config.set('zuul', 'state_dir', self.state_root)
# For each project in config:
self.init_repo("org/project")
self.init_repo("org/project1")
self.init_repo("org/project2")
self.init_repo("org/project3")
self.init_repo("org/project4")
self.init_repo("org/project5")
self.init_repo("org/project6")
self.init_repo("org/one-job-project")
self.init_repo("org/nonvoting-project")
self.init_repo("org/templated-project")
self.init_repo("org/layered-project")
self.init_repo("org/node-project")
self.init_repo("org/conflict-project")
self.init_repo("org/noop-project")
self.init_repo("org/experimental-project")
self.init_repo("org/no-jobs-project")
self.setup_repos()
self.statsd = FakeStatsd()
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
os.environ['STATSD_HOST'] = '127.0.0.1'
os.environ['STATSD_PORT'] = str(self.statsd.port)
self.statsd.start()
# the statsd client object is configured in the statsd module import
reload_module(statsd)
reload_module(zuul.scheduler)
self.gearman_server = FakeGearmanServer()
self.config.set('gearman', 'port', str(self.gearman_server.port))
zuul.source.gerrit.GerritSource.replication_timeout = 1.5
zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
zuul.connection.gerrit.GerritEventConnector.delay = 0.0
self.sched = zuul.scheduler.Scheduler(self.config)
self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
FakeSwiftClientConnection))
self.swift = zuul.lib.swift.Swift(self.config)
self.event_queues = [
self.sched.result_event_queue,
self.sched.trigger_event_queue
]
self.configure_connections()
self.sched.registerConnections(self.connections)
self.ansible_server = RecordingLaunchServer(
self.config, self.connections)
self.ansible_server.start()
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib.request.Request):
return old_urlopen(*args, **kw)
return FakeURLOpener(self.upstream_root, *args, **kw)
old_urlopen = urllib.request.urlopen
urllib.request.urlopen = URLOpenerFactory
self.launcher = zuul.launcher.client.LaunchClient(
self.config, self.sched, self.swift)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.start()
self.sched.reconfigure(self.config)
self.sched.resume()
self.webapp.start()
self.rpc.start()
self.launcher.gearman.waitForServer()
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
def configure_connections(self):
# Register connections from the config
self.smtp_messages = []
def FakeSMTPFactory(*args, **kw):
args = [self.smtp_messages] + list(args)
return FakeSMTP(*args, **kw)
self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
# Set a changes database so multiple FakeGerrit's can report back to
# a virtual canonical database given by the configured hostname
self.gerrit_changes_dbs = {}
self.gerrit_queues_dbs = {}
self.connections = zuul.lib.connections.ConnectionRegistry()
for section_name in self.config.sections():
con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
section_name, re.I)
if not con_match:
continue
con_name = con_match.group(2)
con_config = dict(self.config.items(section_name))
if 'driver' not in con_config:
raise Exception("No driver specified for connection %s."
% con_name)
con_driver = con_config['driver']
# TODO(jhesketh): load the required class automatically
if con_driver == 'gerrit':
if con_config['server'] not in self.gerrit_changes_dbs.keys():
self.gerrit_changes_dbs[con_config['server']] = {}
if con_config['server'] not in self.gerrit_queues_dbs.keys():
self.gerrit_queues_dbs[con_config['server']] = \
Queue.Queue()
self.event_queues.append(
self.gerrit_queues_dbs[con_config['server']])
self.connections.connections[con_name] = FakeGerritConnection(
con_name, con_config,
changes_db=self.gerrit_changes_dbs[con_config['server']],
queues_db=self.gerrit_queues_dbs[con_config['server']],
upstream_root=self.upstream_root
)
setattr(self, 'fake_' + con_name,
self.connections.connections[con_name])
elif con_driver == 'smtp':
self.connections.connections[con_name] = \
zuul.connection.smtp.SMTPConnection(con_name, con_config)
else:
raise Exception("Unknown driver, %s, for connection %s"
% (con_config['driver'], con_name))
# If the [gerrit] or [smtp] sections still exist, load them in as a
# connection named 'gerrit' or 'smtp' respectfully
if 'gerrit' in self.config.sections():
self.gerrit_changes_dbs['gerrit'] = {}
self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
self.event_queues.append(self.gerrit_queues_dbs['gerrit'])
self.connections.connections['gerrit'] = FakeGerritConnection(
'_legacy_gerrit', dict(self.config.items('gerrit')),
changes_db=self.gerrit_changes_dbs['gerrit'],
queues_db=self.gerrit_queues_dbs['gerrit'])
if 'smtp' in self.config.sections():
self.connections.connections['smtp'] = \
zuul.connection.smtp.SMTPConnection(
'_legacy_smtp', dict(self.config.items('smtp')))
def setup_config(self):
"""Per test config object. Override to set different config."""
self.config = ConfigParser.ConfigParser()
self.config.read(os.path.join(FIXTURE_DIR, self.config_file))
if hasattr(self, 'tenant_config_file'):
self.config.set('zuul', 'tenant_config', self.tenant_config_file)
git_path = os.path.join(
os.path.dirname(
os.path.join(FIXTURE_DIR, self.tenant_config_file)),
'git')
if os.path.exists(git_path):
for reponame in os.listdir(git_path):
self.copyDirToRepo(reponame,
os.path.join(git_path, reponame))
def copyDirToRepo(self, project, source_path):
repo_path = os.path.join(self.upstream_root, project)
if not os.path.exists(repo_path):
self.init_repo(project)
files = {}
for (dirpath, dirnames, filenames) in os.walk(source_path):
for filename in filenames:
test_tree_filepath = os.path.join(dirpath, filename)
common_path = os.path.commonprefix([test_tree_filepath,
source_path])
relative_filepath = test_tree_filepath[len(common_path) + 1:]
with open(test_tree_filepath, 'r') as f:
content = f.read()
files[relative_filepath] = content
self.addCommitToRepo(project, 'add content from fixture',
files, branch='master')
def setup_repos(self):
"""Subclasses can override to manipulate repos before tests"""
pass
def assertFinalState(self):
# Make sure that git.Repo objects have been garbage collected.
repos = []
gc.collect()
for obj in gc.get_objects():
if isinstance(obj, git.Repo):
repos.append(obj)
self.assertEqual(len(repos), 0)
self.assertEmptyQueues()
ipm = zuul.manager.independent.IndependentPipelineManager
for tenant in self.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
if isinstance(pipeline.manager, ipm):
self.assertEqual(len(pipeline.queues), 0)
def shutdown(self):
self.log.debug("Shutting down after tests")
self.launcher.stop()
self.merge_client.stop()
self.sched.stop()
self.sched.join()
self.statsd.stop()
self.statsd.join()
self.webapp.stop()
self.webapp.join()
self.rpc.stop()
self.rpc.join()
self.gearman_server.shutdown()
threads = threading.enumerate()
if len(threads) > 1:
self.log.error("More than one thread is running: %s" % threads)
def init_repo(self, project):
parts = project.split('/')
path = os.path.join(self.upstream_root, *parts[:-1])
if not os.path.exists(path):
os.makedirs(path)
path = os.path.join(self.upstream_root, project)
repo = git.Repo.init(path)
with repo.config_writer() as config_writer:
config_writer.set_value('user', 'email', 'user@example.com')
config_writer.set_value('user', 'name', 'User Name')
fn = os.path.join(path, 'README')
f = open(fn, 'w')
f.write("test\n")
f.close()
repo.index.add([fn])
repo.index.commit('initial commit')
master = repo.create_head('master')
repo.create_tag('init')
repo.head.reference = master
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
self.create_branch(project, 'mp')
def create_branch(self, project, branch):
path = os.path.join(self.upstream_root, project)
repo = git.Repo.init(path)
fn = os.path.join(path, 'README')
branch_head = repo.create_head(branch)
repo.head.reference = branch_head
f = open(fn, 'a')
f.write("test %s\n" % branch)
f.close()
repo.index.add([fn])
repo.index.commit('%s commit' % branch)
repo.head.reference = repo.heads['master']
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
def create_commit(self, project):
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
repo.head.reference = repo.heads['master']
file_name = os.path.join(path, 'README')
with open(file_name, 'a') as f:
f.write('creating fake commit\n')
repo.index.add([file_name])
commit = repo.index.commit('Creating a fake commit')
return commit.hexsha
def ref_has_change(self, ref, change):
path = os.path.join(self.git_root, change.project)
repo = git.Repo(path)
try:
for commit in repo.iter_commits(ref):
if commit.message.strip() == ('%s-1' % change.subject):
return True
except GitCommandError:
pass
return False
def job_has_changes(self, *args):
job = args[0]
commits = args[1:]
if isinstance(job, FakeBuild):
parameters = job.parameters
else:
parameters = json.loads(job.arguments)
project = parameters['ZUUL_PROJECT']
path = os.path.join(self.git_root, project)
repo = git.Repo(path)
ref = parameters['ZUUL_REF']
sha = parameters['ZUUL_COMMIT']
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
commit_messages = ['%s-1' % commit.subject for commit in commits]
self.log.debug("Checking if job %s has changes; commit_messages %s;"
" repo_messages %s; sha %s" % (job, commit_messages,
repo_messages, sha))
for msg in commit_messages:
if msg not in repo_messages:
self.log.debug(" messages do not match")
return False
if repo_shas[0] != sha:
self.log.debug(" sha does not match")
return False
self.log.debug(" OK")
return True
def orderedRelease(self):
# Run one build at a time to ensure non-race order:
while len(self.builds):
self.release(self.builds[0])
self.waitUntilSettled()
def release(self, job):
if isinstance(job, FakeBuild):
job.release()
else:
job.waiting = False
self.log.debug("Queued job %s released" % job.unique)
self.gearman_server.wakeConnections()
def getParameter(self, job, name):
if isinstance(job, FakeBuild):
return job.parameters[name]
else:
parameters = json.loads(job.arguments)
return parameters[name]
def resetGearmanServer(self):
self.worker.setFunctions([])
while True:
done = True
for connection in self.gearman_server.active_connections:
if (connection.functions and
connection.client_id not in ['Zuul RPC Listener',
'Zuul Merger']):
done = False
if done:
break
time.sleep(0)
self.gearman_server.functions = set()
self.rpc.register()
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
if self.launcher.meta_jobs:
return False
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
for job in self.ansible_server.job_history:
zbuild = self.launcher.builds.get(job.unique)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
for connection in self.ansible_server.worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
def areAllBuildsWaiting(self):
builds = self.launcher.builds.values()
for build in builds:
client_job = None
for conn in self.launcher.gearman.active_connections:
for j in conn.related_jobs.values():
if j.unique == build.uuid:
client_job = j
break
if not client_job:
self.log.debug("%s is not known to the gearman client" %
build)
return False
if not client_job.handle:
self.log.debug("%s has no handle" % client_job)
return False
server_job = self.gearman_server.jobs.get(client_job.handle)
if not server_job:
self.log.debug("%s is not known to the gearman server" %
client_job)
return False
if not hasattr(server_job, 'waiting'):
self.log.debug("%s is being enqueued" % server_job)
return False
if server_job.waiting:
continue
if build.number is None:
self.log.debug("%s has not reported start" % build)
return False
if False: # worker_job.build.isWaiting():
# TODOv3: when we grow the ability to have fake
# ansible jobs wait, check for that here.
continue
else:
self.log.debug("%s is running" % build)
return False
return True
def eventQueuesEmpty(self):
for queue in self.event_queues:
yield queue.empty()
def eventQueuesJoin(self):
for queue in self.event_queues:
queue.join()
def waitUntilSettled(self):
self.log.debug("Waiting until settled...")
start = time.time()
while True:
if time.time() - start > 10:
self.log.debug("Queue status:")
for queue in self.event_queues:
self.log.debug(" %s: %s" % (queue, queue.empty()))
self.log.debug("All builds waiting: %s" %
(self.areAllBuildsWaiting(),))
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
# have all build states propogated to zuul?
if self.haveAllBuildsReported():
# Join ensures that the queue is empty _and_ events have been
# processed
self.eventQueuesJoin()
self.sched.run_handler_lock.acquire()
if (not self.merge_client.jobs and
all(self.eventQueuesEmpty()) and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
self.log.debug("...settled.")
return
self.sched.run_handler_lock.release()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
jobs = filter(lambda x: x.result == result, jobs)
return len(jobs)
def getJobFromHistory(self, name, project=None):
history = self.ansible_server.job_history
for job in history:
params = json.loads(job.arguments)
if (params['job'] == name and
(project is None or params['ZUUL_PROJECT'] == project)):
result = json.loads(job.data[-1])
ret = BuildHistory(job=job,
name=params['job'],
result=result['result'])
return ret
raise Exception("Unable to find job %s in history" % name)
def assertEmptyQueues(self):
# Make sure there are no orphaned jobs
for tenant in self.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for queue in pipeline.queues:
if len(queue.queue) != 0:
print('pipeline %s queue %s contents %s' % (
pipeline.name, queue.name, queue.queue))
self.assertEqual(len(queue.queue), 0,
"Pipelines queues should be empty")
def assertReportedStat(self, key, value=None, kind=None):
start = time.time()
while time.time() < (start + 5):
for stat in self.statsd.stats:
pprint.pprint(self.statsd.stats)
k, v = stat.split(':')
if key == k:
if value is None and kind is None:
return
elif value:
if value == v:
return
elif kind:
if v.endswith('|' + kind):
return
time.sleep(0.1)
pprint.pprint(self.statsd.stats)
raise Exception("Key %s not found in reported stats" % key)
def getPipeline(self, name):
return self.sched.abide.tenants.values()[0].layout.pipelines.get(name)
def updateConfigLayout(self, path):
root = os.path.join(self.test_root, "config")
os.makedirs(root)
f = tempfile.NamedTemporaryFile(dir=root, delete=False)
f.write("""
tenants:
- name: openstack
include:
- %s
""" % os.path.abspath(path))
f.close()
self.config.set('zuul', 'tenant_config', f.name)
def addCommitToRepo(self, project, message, files, branch='master'):
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
repo.head.reference = branch
zuul.merger.merger.reset_repo_to_head(repo)
for fn, content in files.items():
fn = os.path.join(path, fn)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
commit = repo.index.commit(message)
repo.heads[branch].commit = commit
repo.head.reference = branch
repo.git.clean('-x', '-f', '-d')
repo.heads[branch].checkout()