Create AnsibleZuulTestCase
The new AnsibleZuulTestCase base class preserves the current v3 test
case behavior, which is to run the actual Ansible launcher. However,
that is not needed for all of the scheduler tests; some of them want
complete control over jobs in a way that may be difficult with
Ansible. With this new base class, tests that we know should exercise
Ansible remain easy to write, while tests that do not need it (plain
ZuulTestCase) get the Zuul v2 behavior of a fake worker.
Change-Id: I836fec935979d90eb0eb3ca765c87bc8300920aa
diff --git a/tests/base.py b/tests/base.py
index a75d36b..7fc828b 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -537,11 +537,11 @@
self.daemon = True
self.worker = worker
self.job = job
- self.name = job.name.split(':')[1]
self.number = number
self.node = node
self.parameters = json.loads(job.arguments)
self.unique = self.parameters['ZUUL_UUID']
+ self.name = self.parameters['job']
self.wait_condition = threading.Condition()
self.waiting = False
self.aborted = False
@@ -643,6 +643,7 @@
def __init__(self, *args, **kw):
super(RecordingLaunchServer, self).__init__(*args, **kw)
self.job_history = []
+ self.build_history = []
def launch(self, job):
self.job_history.append(job)
@@ -650,6 +651,14 @@
def sendWorkComplete(data=b''):
job.data.append(data)
+ params = json.loads(job.arguments)
+ result = json.loads(job.data[-1])
+ build = BuildHistory(job=job,
+ uuid=job.unique,
+ name=params['job'],
+ parameters=params,
+ result=result['result'])
+ self.build_history.append(build)
gear.WorkerJob.sendWorkComplete(job, data)
job.sendWorkComplete = sendWorkComplete
@@ -666,6 +675,7 @@
self.fail_tests = {}
self.test = test
+ self.registerFunction('launcher:launch')
self.hold_jobs_in_build = False
self.lock = threading.Lock()
self.__work_thread = threading.Thread(target=self.work)
@@ -674,20 +684,17 @@
def handleJob(self, job):
parts = job.name.split(":")
- cmd = parts[0]
- name = parts[1]
- if len(parts) > 2:
- node = parts[2]
- else:
- node = None
- if cmd == 'build':
- self.handleBuild(job, name, node)
+ cmd = parts[1]
+ if cmd == 'launch':
+ self.handleLaunch(job)
elif cmd == 'stop':
- self.handleStop(job, name)
+ self.handleStop(job)
elif cmd == 'set_description':
- self.handleSetDescription(job, name)
+ self.handleSetDescription(job)
- def handleBuild(self, job, name, node):
+ def handleLaunch(self, job):
+ # TODOv3(jeblair): handle nodes
+ node = None
build = FakeBuild(self, job, self.build_counter, node)
job.build = build
self.gearman_jobs[job.unique] = job
@@ -696,7 +703,7 @@
self.running_builds.append(build)
build.start()
- def handleStop(self, job, name):
+ def handleStop(self, job):
self.log.debug("handle stop")
parameters = json.loads(job.arguments)
name = parameters['name']
@@ -709,7 +716,7 @@
return
job.sendWorkFail()
- def handleSetDescription(self, job, name):
+ def handleSetDescription(self, job):
self.log.debug("handle set description")
parameters = json.loads(job.arguments)
name = parameters['name']
@@ -910,6 +917,27 @@
class ZuulTestCase(BaseTestCase):
config_file = 'zuul.conf'
+ def _startWorker(self):
+ self.worker = FakeWorker('fake_worker', self)
+ self.worker.addServer('127.0.0.1', self.gearman_server.port)
+ self.gearman_server.worker = self.worker
+ self.builds = self.worker.running_builds
+ self.history = self.worker.build_history
+
+ def _stopWorker(self):
+ self.worker.shutdown()
+
+ def _lockWorker(self):
+ self.worker.lock.acquire()
+
+ def _unlockWorker(self):
+ self.worker.lock.release()
+
+ def _startMerger(self):
+ self.merge_server = zuul.merger.server.MergeServer(self.config,
+ self.connections)
+ self.merge_server.start()
+
def setUp(self):
super(ZuulTestCase, self).setUp()
if USE_TEMPDIR:
@@ -988,10 +1016,6 @@
self.configure_connections()
self.sched.registerConnections(self.connections)
- self.ansible_server = RecordingLaunchServer(
- self.config, self.connections)
- self.ansible_server.start()
-
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib.request.Request):
return old_urlopen(*args, **kw)
@@ -1000,6 +1024,9 @@
old_urlopen = urllib.request.urlopen
urllib.request.urlopen = URLOpenerFactory
+ self._startMerger()
+ self._startWorker()
+
self.launcher = zuul.launcher.client.LaunchClient(
self.config, self.sched, self.swift)
self.merge_client = zuul.merger.client.MergeClient(
@@ -1149,7 +1176,10 @@
def shutdown(self):
self.log.debug("Shutting down after tests")
self.launcher.stop()
+ self.merge_server.stop()
+ self.merge_server.join()
self.merge_client.stop()
+ self._stopWorker()
self.sched.stop()
self.sched.join()
self.statsd.stop()
@@ -1292,15 +1322,15 @@
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
- for job in self.ansible_server.job_history:
- zbuild = self.launcher.builds.get(job.unique)
+ for build in self.history:
+ zbuild = self.launcher.builds.get(build.uuid)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
- for connection in self.ansible_server.worker.active_connections:
+ for connection in self.worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
@@ -1363,6 +1393,8 @@
(self.areAllBuildsWaiting(),))
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
+
+ self._lockWorker()
# have all build states propogated to zuul?
if self.haveAllBuildsReported():
# Join ensures that the queue is empty _and_ events have been
@@ -1374,9 +1406,11 @@
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
+ self._unlockWorker()
self.log.debug("...settled.")
return
self.sched.run_handler_lock.release()
+ self._unlockWorker()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
@@ -1384,16 +1418,11 @@
return len(jobs)
def getJobFromHistory(self, name, project=None):
- history = self.ansible_server.job_history
- for job in history:
- params = json.loads(job.arguments)
- if (params['job'] == name and
- (project is None or params['ZUUL_PROJECT'] == project)):
- result = json.loads(job.data[-1])
- ret = BuildHistory(job=job,
- name=params['job'],
- result=result['result'])
- return ret
+ for job in self.history:
+ if (job.name == name and
+ (project is None or
+ job.parameters['ZUUL_PROJECT'] == project)):
+ return job
raise Exception("Unable to find job %s in history" % name)
def assertEmptyQueues(self):
@@ -1461,3 +1490,23 @@
repo.heads[branch].checkout()
if tag:
repo.create_tag(tag)
+
+
+class AnsibleZuulTestCase(ZuulTestCase):
+ """ZuulTestCase but with an actual ansible launcher running"""
+
+ def _startWorker(self):
+ self.ansible_server = RecordingLaunchServer(
+ self.config, self.connections)
+ self.ansible_server.start()
+ self.history = self.ansible_server.build_history
+ self.worker = self.ansible_server.worker
+
+ def _stopWorker(self):
+ self.ansible_server.stop()
+
+ def _lockWorker(self):
+ pass
+
+ def _unlockWorker(self):
+ pass