Use RecordingLaunchServer to run all tests

Instead of having an entirely fake launch server and an entirely
real one, use the real launch server for everything, but add an
option to not actually execute ansible.  This exercises most of
the launcher's code, removes unnecessary fakes, and still retains
the speed benefit of not running ansible for every test of
scheduler behavior.
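
Internally this is a single override point: the recording server's
runAnsible() registers a FakeBuild and then either defers to the
real implementation or short-circuits it.  Roughly (a sketch; the
full version, with locking and history recording, is in the diff
below):

    def runAnsible(self, jobdir, job):
        build = FakeBuild(self, job, self.build_counter, node=None)
        if self._run_ansible:
            # Actually shell out to ansible-playbook.
            return super(RecordingLaunchServer, self).runAnsible(
                jobdir, job)
        # Synthesize a result, honoring hold_jobs_in_build and
        # fail_tests, without running ansible.
        return build.run()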

Some tests are still run with the launcher actually running ansible,
and that facility will continue to be available as we create tests
that validate actual ansible behavior.
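
For example, scheduler tests keep subclassing ZuulTestCase (where
run_ansible defaults to False), while a suite that validates real
playbook runs opts in with a single class attribute, as in the new
AnsibleZuulTestCase below:

    class AnsibleZuulTestCase(ZuulTestCase):
        """ZuulTestCase but with an actual ansible launcher running"""
        run_ansible = True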

Change-Id: Ie0fbba2b786a5aeb1c603597af30fcd728a8cec8
diff --git a/tests/base.py b/tests/base.py
index 8017c51..e1e568f 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -529,13 +529,12 @@
         os.write(self.wake_write, '1\n')
 
 
-class FakeBuild(threading.Thread):
+class FakeBuild(object):
     log = logging.getLogger("zuul.test")
 
-    def __init__(self, worker, job, number, node):
-        threading.Thread.__init__(self)
+    def __init__(self, launch_server, job, number, node):
         self.daemon = True
-        self.worker = worker
+        self.launch_server = launch_server
         self.job = job
         self.number = number
         self.node = node
@@ -547,6 +546,9 @@
         self.aborted = False
         self.created = time.time()
         self.run_error = False
+        self.changes = None
+        if 'ZUUL_CHANGE_IDS' in self.parameters:
+            self.changes = self.parameters['ZUUL_CHANGE_IDS']
 
     def release(self):
         self.wait_condition.acquire()
@@ -576,7 +578,7 @@
             'url': 'https://server/job/%s/%s/' % (self.name, self.number),
             'name': self.name,
             'number': self.number,
-            'manager': self.worker.worker_id,
+            'manager': self.launch_server.worker.worker_id,
             'worker_name': 'My Worker',
             'worker_hostname': 'localhost',
             'worker_ips': ['127.0.0.1', '192.168.1.1'],
@@ -592,81 +594,80 @@
         self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
         self.job.sendWorkStatus(0, 100)
 
-        if self.worker.hold_jobs_in_build:
+        if self.launch_server.hold_jobs_in_build:
             self.log.debug('Holding build %s' % self.unique)
             self._wait()
         self.log.debug("Build %s continuing" % self.unique)
 
-        self.worker.lock.acquire()
-
         result = 'SUCCESS'
         if (('ZUUL_REF' in self.parameters) and
-            self.worker.shouldFailTest(self.name,
-                                       self.parameters['ZUUL_REF'])):
+            self.launch_server.shouldFailTest(self.name,
+                                              self.parameters['ZUUL_REF'])):
             result = 'FAILURE'
         if self.aborted:
             result = 'ABORTED'
 
         if self.run_error:
-            work_fail = True
             result = 'RUN_ERROR'
-        else:
-            data['result'] = result
-            data['node_labels'] = ['bare-necessities']
-            data['node_name'] = 'foo'
-            work_fail = False
 
-        changes = None
-        if 'ZUUL_CHANGE_IDS' in self.parameters:
-            changes = self.parameters['ZUUL_CHANGE_IDS']
-
-        self.worker.build_history.append(
-            BuildHistory(name=self.name, number=self.number,
-                         result=result, changes=changes, node=self.node,
-                         uuid=self.unique, parameters=self.parameters,
-                         pipeline=self.parameters['ZUUL_PIPELINE'])
-        )
-
-        self.job.sendWorkData(json.dumps(data))
-        if work_fail:
-            self.job.sendWorkFail()
-        else:
-            self.job.sendWorkComplete(json.dumps(data))
-        del self.worker.gearman_jobs[self.job.unique]
-        self.worker.running_builds.remove(self)
-        self.worker.lock.release()
+        return result
 
 
 class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
     def __init__(self, *args, **kw):
+        self._run_ansible = kw.pop('_run_ansible', False)
+        self.test = kw.pop('_test', None)  # used by shouldFailTest
         super(RecordingLaunchServer, self).__init__(*args, **kw)
-        self.job_history = []
+        self.hold_jobs_in_build = False
+        self.lock = threading.Lock()
+        self.running_builds = []
         self.build_history = []
+        self._build_counter_lock = threading.Lock()
+        self.build_counter = 0
+        self.fail_tests = {}
 
-    def launch(self, job):
-        self.job_history.append(job)
-        job.data = []
+    def addFailTest(self, name, change):
+        l = self.fail_tests.get(name, [])
+        l.append(change)
+        self.fail_tests[name] = l
 
-        def sendWorkComplete(data=b''):
-            job.data.append(data)
-            params = json.loads(job.arguments)
-            result = json.loads(job.data[-1])
-            build = BuildHistory(job=job,
-                                 uuid=job.unique,
-                                 name=params['job'],
-                                 parameters=params,
-                                 result=result['result'])
-            self.build_history.append(build)
-            gear.WorkerJob.sendWorkComplete(job, data)
+    def shouldFailTest(self, name, ref):
+        l = self.fail_tests.get(name, [])
+        for change in l:
+            if self.test.ref_has_change(ref, change):
+                return True
+        return False
 
-        job.sendWorkComplete = sendWorkComplete
-        super(RecordingLaunchServer, self).launch(job)
+    def runAnsible(self, jobdir, job):
+        with self._build_counter_lock:
+            self.build_counter += 1
+            build_counter = self.build_counter
+        node = None
+        build = FakeBuild(self, job, build_counter, node)
+        job.build = build
+
+        self.running_builds.append(build)
+
+        if self._run_ansible:
+            result = super(RecordingLaunchServer, self).runAnsible(jobdir, job)
+        else:
+            result = build.run()
+
+        self.lock.acquire()
+        self.build_history.append(
+            BuildHistory(name=build.name, number=build.number,
+                         result=result, changes=build.changes, node=build.node,
+                         uuid=build.unique, parameters=build.parameters,
+                         pipeline=build.parameters['ZUUL_PIPELINE'])
+        )
+        self.running_builds.remove(build)
+        self.lock.release()
+        return result
 
 
 class FakeWorker(gear.Worker):
     def __init__(self, worker_id, test):
         super(FakeWorker, self).__init__(worker_id)
-        self.gearman_jobs = {}
         self.build_history = []
         self.running_builds = []
         self.build_counter = 0
@@ -693,7 +694,6 @@
         node = None
         build = FakeBuild(self, job, self.build_counter, node)
         job.build = build
-        self.gearman_jobs[job.unique] = job
         self.build_counter += 1
 
         self.running_builds.append(build)
@@ -894,22 +894,7 @@
 
 class ZuulTestCase(BaseTestCase):
     config_file = 'zuul.conf'
-
-    def _startWorker(self):
-        self.worker = FakeWorker('fake_worker', self)
-        self.worker.addServer('127.0.0.1', self.gearman_server.port)
-        self.gearman_server.worker = self.worker
-        self.builds = self.worker.running_builds
-        self.history = self.worker.build_history
-
-    def _stopWorker(self):
-        self.worker.shutdown()
-
-    def _lockWorker(self):
-        self.worker.lock.acquire()
-
-    def _unlockWorker(self):
-        self.worker.lock.release()
+    run_ansible = False
 
     def _startMerger(self):
         self.merge_server = zuul.merger.server.MergeServer(self.config,
@@ -1003,15 +988,21 @@
         urllib.request.urlopen = URLOpenerFactory
 
         self._startMerger()
-        self._startWorker()
 
-        self.launcher = zuul.launcher.client.LaunchClient(
+        self.launch_server = RecordingLaunchServer(
+            self.config, self.connections, _test=self,
+            _run_ansible=self.run_ansible)
+        self.launch_server.start()
+        self.history = self.launch_server.build_history
+        self.builds = self.launch_server.running_builds
+
+        self.launch_client = zuul.launcher.client.LaunchClient(
             self.config, self.sched, self.swift)
         self.merge_client = zuul.merger.client.MergeClient(
             self.config, self.sched)
         self.nodepool = zuul.nodepool.Nodepool(self.sched)
 
-        self.sched.setLauncher(self.launcher)
+        self.sched.setLauncher(self.launch_client)
         self.sched.setMerger(self.merge_client)
         self.sched.setNodepool(self.nodepool)
 
@@ -1024,7 +1014,7 @@
         self.sched.resume()
         self.webapp.start()
         self.rpc.start()
-        self.launcher.gearman.waitForServer()
+        self.launch_client.gearman.waitForServer()
 
         self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)
@@ -1153,11 +1143,11 @@
 
     def shutdown(self):
         self.log.debug("Shutting down after tests")
-        self.launcher.stop()
+        self.launch_client.stop()
         self.merge_server.stop()
         self.merge_server.join()
         self.merge_client.stop()
-        self._stopWorker()
+        self.launch_server.stop()
         self.sched.stop()
         self.sched.join()
         self.statsd.stop()
@@ -1279,7 +1269,7 @@
             return parameters[name]
 
     def resetGearmanServer(self):
-        self.worker.setFunctions([])
+        self.launch_server.worker.setFunctions([])
         while True:
             done = True
             for connection in self.gearman_server.active_connections:
@@ -1295,29 +1285,29 @@
 
     def haveAllBuildsReported(self):
         # See if Zuul is waiting on a meta job to complete
-        if self.launcher.meta_jobs:
+        if self.launch_client.meta_jobs:
             return False
         # Find out if every build that the worker has completed has been
         # reported back to Zuul.  If it hasn't then that means a Gearman
         # event is still in transit and the system is not stable.
         for build in self.history:
-            zbuild = self.launcher.builds.get(build.uuid)
+            zbuild = self.launch_client.builds.get(build.uuid)
             if not zbuild:
                 # It has already been reported
                 continue
             # It hasn't been reported yet.
             return False
         # Make sure that none of the worker connections are in GRAB_WAIT
-        for connection in self.worker.active_connections:
+        for connection in self.launch_server.worker.active_connections:
             if connection.state == 'GRAB_WAIT':
                 return False
         return True
 
     def areAllBuildsWaiting(self):
-        builds = self.launcher.builds.values()
+        builds = self.launch_client.builds.values()
         for build in builds:
             client_job = None
-            for conn in self.launcher.gearman.active_connections:
+            for conn in self.launch_client.gearman.active_connections:
                 for j in conn.related_jobs.values():
                     if j.unique == build.uuid:
                         client_job = j
@@ -1372,7 +1362,7 @@
                 raise Exception("Timeout waiting for Zuul to settle")
             # Make sure no new events show up while we're checking
 
-            self._lockWorker()
+            self.launch_server.lock.acquire()
             # have all build states propogated to zuul?
             if self.haveAllBuildsReported():
                 # Join ensures that the queue is empty _and_ events have been
@@ -1384,11 +1374,11 @@
                     self.haveAllBuildsReported() and
                     self.areAllBuildsWaiting()):
                     self.sched.run_handler_lock.release()
-                    self._unlockWorker()
+                    self.launch_server.lock.release()
                     self.log.debug("...settled.")
                     return
                 self.sched.run_handler_lock.release()
-            self._unlockWorker()
+            self.launch_server.lock.release()
             self.sched.wake_event.wait(0.1)
 
     def countJobResults(self, jobs, result):
@@ -1472,19 +1462,4 @@
 
 class AnsibleZuulTestCase(ZuulTestCase):
     """ZuulTestCase but with an actual ansible launcher running"""
-
-    def _startWorker(self):
-        self.ansible_server = RecordingLaunchServer(
-            self.config, self.connections)
-        self.ansible_server.start()
-        self.history = self.ansible_server.build_history
-        self.worker = self.ansible_server.worker
-
-    def _stopWorker(self):
-        self.ansible_server.stop()
-
-    def _lockWorker(self):
-        pass
-
-    def _unlockWorker(self):
-        pass
+    run_ansible = True
diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py
index 7cac5f7..45c42dd 100644
--- a/zuul/launcher/server.py
+++ b/zuul/launcher/server.py
@@ -243,7 +243,7 @@
 
             # TODOv3: Ansible the ansible thing here.
             self.prepareAnsibleFiles(jobdir, args)
-            result = self.runAnsible(jobdir)
+            result = self.runAnsible(jobdir, job)
 
             data = {
                 'url': 'https://server/job',
@@ -277,7 +277,8 @@
             config.write('[defaults]\n')
             config.write('hostfile = %s\n' % jobdir.inventory)
 
-    def runAnsible(self, jobdir):
+    def runAnsible(self, jobdir, job):
+        # The job argument is unused here; tests override this method.
         proc = subprocess.Popen(
             ['ansible-playbook', jobdir.playbook],
             cwd=jobdir.ansible_root,
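
A typical scheduler test can still pause and release builds through
the recording server; a sketch, assuming the usual
waitUntilSettled() helper on ZuulTestCase:

    def test_example(self):
        self.launch_server.hold_jobs_in_build = True
        # ... enqueue a change and assert on self.builds ...
        self.launch_server.hold_jobs_in_build = False
        for build in self.builds[:]:
            build.release()
        self.waitUntilSettled()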