Create Ansible job launch server

Create a new server that acts as a combined merger and job launcher.
Remove the merge step from the scheduler.
Update the Gearman job launcher to target the new server.

Change-Id: I14e3d96cadec6e4b4cca66137071e8ed67f161a1
diff --git a/tests/base.py b/tests/base.py
index e1d23eb..90d2df5 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -48,6 +48,7 @@
 import zuul.scheduler
 import zuul.webapp
 import zuul.rpclistener
+import zuul.launcher.ansible
 import zuul.launcher.gearman
 import zuul.lib.swift
 import zuul.lib.connections
@@ -635,6 +636,35 @@
         self.worker.lock.release()
 
 
+class RecordingLaunchServer(zuul.launcher.ansible.LaunchServer):
+    """Launch server that records the jobs it launches for the tests.
+
+    Each job passed to launch() is appended to job_history, and every
+    payload that job sends through sendWorkComplete is appended to the
+    job's data list.
+    """
+
+    def __init__(self, *args, **kw):
+        super(RecordingLaunchServer, self).__init__(*args, **kw)
+        # Jobs in the order they were passed to launch().
+        self.job_history = []
+
+    def launch(self, job):
+        """Record job, wrap its sendWorkComplete, then launch it."""
+        self.job_history.append(job)
+        # Payloads sent through sendWorkComplete, oldest first.
+        job.data = []
+
+        def sendWorkComplete(data=b''):
+            # Keep the payload, then send it the normal way.
+            job.data.append(data)
+            gear.WorkerJob.sendWorkComplete(job, data)
+
+        # Shadow the method on this one job object; presumably the
+        # parent launch() reports completion through it (confirm).
+        job.sendWorkComplete = sendWorkComplete
+        super(RecordingLaunchServer, self).launch(job)
+
 class FakeWorker(gear.Worker):
     def __init__(self, worker_id, test):
         super(FakeWorker, self).__init__(worker_id)
@@ -925,10 +943,6 @@
 
         self.config.set('gearman', 'port', str(self.gearman_server.port))
 
-        self.worker = FakeWorker('fake_worker', self)
-        self.worker.addServer('127.0.0.1', self.gearman_server.port)
-        self.gearman_server.worker = self.worker
-
         zuul.source.gerrit.GerritSource.replication_timeout = 1.5
         zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
         zuul.connection.gerrit.GerritEventConnector.delay = 0.0
@@ -947,6 +961,10 @@
         self.configure_connections(self.sched)
         self.sched.registerConnections(self.connections)
 
+        self.ansible_server = RecordingLaunchServer(
+            self.config, self.connections)
+        self.ansible_server.start()
+
         def URLOpenerFactory(*args, **kw):
             if isinstance(args[0], urllib2.Request):
                 return old_urlopen(*args, **kw)
@@ -976,9 +994,6 @@
         self.webapp.start()
         self.rpc.start()
         self.launcher.gearman.waitForServer()
-        self.registerJobs()
-        self.builds = self.worker.running_builds
-        self.history = self.worker.build_history
 
         self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)
@@ -1086,7 +1101,6 @@
         self.merge_server.stop()
         self.merge_server.join()
         self.merge_client.stop()
-        self.worker.shutdown()
         self.sched.stop()
         self.sched.join()
         self.statsd.stop()
@@ -1183,18 +1197,6 @@
         self.log.debug("  OK")
         return True
 
-    def registerJobs(self):
-        count = 0
-        for tenant in self.sched.abide.tenants.values():
-            for job in tenant.layout.jobs.keys():
-                self.worker.registerFunction('build:' + job)
-                count += 1
-        self.worker.registerFunction('stop:' + self.worker.worker_id)
-        count += 1
-
-        while len(self.gearman_server.functions) < count:
-            time.sleep(0)
-
     def orderedRelease(self):
         # Run one build at a time to ensure non-race order:
         while len(self.builds):
@@ -1239,15 +1241,15 @@
         # Find out if every build that the worker has completed has been
         # reported back to Zuul.  If it hasn't then that means a Gearman
         # event is still in transit and the system is not stable.
-        for build in self.worker.build_history:
-            zbuild = self.launcher.builds.get(build.uuid)
+        for job in self.ansible_server.job_history:
+            zbuild = self.launcher.builds.get(job.unique)
             if not zbuild:
                 # It has already been reported
                 continue
             # It hasn't been reported yet.
             return False
         # Make sure that none of the worker connections are in GRAB_WAIT
-        for connection in self.worker.active_connections:
+        for connection in self.ansible_server.worker.active_connections:
             if connection.state == 'GRAB_WAIT':
                 return False
         return True
@@ -1311,7 +1313,6 @@
                 print self.areAllBuildsWaiting()
                 raise Exception("Timeout waiting for Zuul to settle")
             # Make sure no new events show up while we're checking
-            self.worker.lock.acquire()
             # have all build states propogated to zuul?
             if self.haveAllBuildsReported():
                 # Join ensures that the queue is empty _and_ events have been
@@ -1323,11 +1324,9 @@
                     self.haveAllBuildsReported() and
                     self.areAllBuildsWaiting()):
                     self.sched.run_handler_lock.release()
-                    self.worker.lock.release()
                     self.log.debug("...settled.")
                     return
                 self.sched.run_handler_lock.release()
-            self.worker.lock.release()
             self.sched.wake_event.wait(0.1)
 
     def countJobResults(self, jobs, result):
@@ -1335,10 +1334,21 @@
         return len(jobs)
 
     def getJobFromHistory(self, name):
-        history = self.worker.build_history
+        """Return a BuildHistory for the first recorded job called name.
+
+        Raises Exception if no job in the launch server's history has
+        that name.
+        """
+        history = self.ansible_server.job_history
         for job in history:
-            if job.name == name:
-                return job
+            params = json.loads(job.arguments)
+            if params['job'] == name:
+                # The build result is read from the last recorded payload.
+                result = json.loads(job.data[-1])
+                ret = BuildHistory(job=job,
+                                   name=params['job'],
+                                   result=result['result'])
+                return ret
         raise Exception("Unable to find job %s in history" % name)
 
     def assertEmptyQueues(self):