Create ansible job launch server
Create a new server that acts as a combined merger and job launcher.
Remove the merge step from the scheduler.
Update the gearman job launcher to target the new server.
Change-Id: I14e3d96cadec6e4b4cca66137071e8ed67f161a1
diff --git a/tests/base.py b/tests/base.py
index e1d23eb..90d2df5 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -48,6 +48,7 @@
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
+import zuul.launcher.ansible
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.lib.connections
@@ -635,6 +636,23 @@
self.worker.lock.release()
+class RecordingLaunchServer(zuul.launcher.ansible.LaunchServer):
+ def __init__(self, *args, **kw):
+ super(RecordingLaunchServer, self).__init__(*args, **kw)
+ self.job_history = []
+
+ def launch(self, job):
+ self.job_history.append(job)
+ job.data = []
+
+ def sendWorkComplete(data=b''):
+ job.data.append(data)
+ gear.WorkerJob.sendWorkComplete(job, data)
+
+ job.sendWorkComplete = sendWorkComplete
+ super(RecordingLaunchServer, self).launch(job)
+
+
class FakeWorker(gear.Worker):
def __init__(self, worker_id, test):
super(FakeWorker, self).__init__(worker_id)
@@ -925,10 +943,6 @@
self.config.set('gearman', 'port', str(self.gearman_server.port))
- self.worker = FakeWorker('fake_worker', self)
- self.worker.addServer('127.0.0.1', self.gearman_server.port)
- self.gearman_server.worker = self.worker
-
zuul.source.gerrit.GerritSource.replication_timeout = 1.5
zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
zuul.connection.gerrit.GerritEventConnector.delay = 0.0
@@ -947,6 +961,10 @@
self.configure_connections(self.sched)
self.sched.registerConnections(self.connections)
+ self.ansible_server = RecordingLaunchServer(
+ self.config, self.connections)
+ self.ansible_server.start()
+
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib2.Request):
return old_urlopen(*args, **kw)
@@ -976,9 +994,6 @@
self.webapp.start()
self.rpc.start()
self.launcher.gearman.waitForServer()
- self.registerJobs()
- self.builds = self.worker.running_builds
- self.history = self.worker.build_history
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
@@ -1086,7 +1101,6 @@
self.merge_server.stop()
self.merge_server.join()
self.merge_client.stop()
- self.worker.shutdown()
self.sched.stop()
self.sched.join()
self.statsd.stop()
@@ -1183,18 +1197,6 @@
self.log.debug(" OK")
return True
- def registerJobs(self):
- count = 0
- for tenant in self.sched.abide.tenants.values():
- for job in tenant.layout.jobs.keys():
- self.worker.registerFunction('build:' + job)
- count += 1
- self.worker.registerFunction('stop:' + self.worker.worker_id)
- count += 1
-
- while len(self.gearman_server.functions) < count:
- time.sleep(0)
-
def orderedRelease(self):
# Run one build at a time to ensure non-race order:
while len(self.builds):
@@ -1239,15 +1241,15 @@
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
- for build in self.worker.build_history:
- zbuild = self.launcher.builds.get(build.uuid)
+ for job in self.ansible_server.job_history:
+ zbuild = self.launcher.builds.get(job.unique)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
- for connection in self.worker.active_connections:
+ for connection in self.ansible_server.worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
@@ -1311,7 +1313,5 @@
print self.areAllBuildsWaiting()
raise Exception("Timeout waiting for Zuul to settle")
- # Make sure no new events show up while we're checking
- self.worker.lock.acquire()
# have all build states propogated to zuul?
if self.haveAllBuildsReported():
# Join ensures that the queue is empty _and_ events have been
@@ -1323,11 +1324,9 @@
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
- self.worker.lock.release()
self.log.debug("...settled.")
return
self.sched.run_handler_lock.release()
- self.worker.lock.release()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
@@ -1335,10 +1334,15 @@
return len(jobs)
def getJobFromHistory(self, name):
- history = self.worker.build_history
+ history = self.ansible_server.job_history
for job in history:
- if job.name == name:
- return job
+ params = json.loads(job.arguments)
+ if params['job'] == name:
+ result = json.loads(job.data[-1])
+ ret = BuildHistory(job=job,
+ name=params['job'],
+ result=result['result'])
+ return ret
raise Exception("Unable to find job %s in history" % name)
def assertEmptyQueues(self):