Re-order executor/job classes

There should be no code changes in this commit, only a re-ordering
of the classes involved to reduce the diff in a subsequent change.

Change-Id: I0fc4287b6fadfaea02250bc5c1f57eba7e65f450
diff --git a/tests/base.py b/tests/base.py
index b029f0b..028a194 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1361,6 +1361,63 @@
         return repos
 
 
+class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
+    def doMergeChanges(self, merger, items, repo_state):
+        # Get a merger in order to update the repos involved in this job.
+        commit = super(RecordingAnsibleJob, self).doMergeChanges(
+            merger, items, repo_state)
+        if not commit:  # merge conflict
+            self.recordResult('MERGER_FAILURE')
+        return commit
+
+    def recordResult(self, result):
+        build = self.executor_server.job_builds[self.job.unique]
+        self.executor_server.lock.acquire()
+        self.executor_server.build_history.append(
+            BuildHistory(name=build.name, result=result, changes=build.changes,
+                         node=build.node, uuid=build.unique,
+                         ref=build.parameters['zuul']['ref'],
+                         parameters=build.parameters, jobdir=build.jobdir,
+                         pipeline=build.parameters['zuul']['pipeline'])
+        )
+        self.executor_server.running_builds.remove(build)
+        del self.executor_server.job_builds[self.job.unique]
+        self.executor_server.lock.release()
+
+    def runPlaybooks(self, args):
+        build = self.executor_server.job_builds[self.job.unique]
+        build.jobdir = self.jobdir
+
+        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
+        self.recordResult(result)
+        return result
+
+    def runAnsible(self, cmd, timeout, playbook):
+        build = self.executor_server.job_builds[self.job.unique]
+
+        if self.executor_server._run_ansible:
+            result = super(RecordingAnsibleJob, self).runAnsible(
+                cmd, timeout, playbook)
+        else:
+            if playbook.path:
+                result = build.run()
+            else:
+                result = (self.RESULT_NORMAL, 0)
+        return result
+
+    def getHostList(self, args):
+        self.log.debug("hostlist")
+        hosts = super(RecordingAnsibleJob, self).getHostList(args)
+        for host in hosts:
+            host['host_vars']['ansible_connection'] = 'local'
+
+        hosts.append(dict(
+            name='localhost',
+            host_vars=dict(ansible_connection='local'),
+            host_keys=[]))
+        return hosts
+
+
 class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
     """An Ansible executor to be used in tests.
 
@@ -1445,63 +1502,6 @@
         super(RecordingExecutorServer, self).stop()
 
 
-class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
-    def doMergeChanges(self, merger, items, repo_state):
-        # Get a merger in order to update the repos involved in this job.
-        commit = super(RecordingAnsibleJob, self).doMergeChanges(
-            merger, items, repo_state)
-        if not commit:  # merge conflict
-            self.recordResult('MERGER_FAILURE')
-        return commit
-
-    def recordResult(self, result):
-        build = self.executor_server.job_builds[self.job.unique]
-        self.executor_server.lock.acquire()
-        self.executor_server.build_history.append(
-            BuildHistory(name=build.name, result=result, changes=build.changes,
-                         node=build.node, uuid=build.unique,
-                         ref=build.parameters['zuul']['ref'],
-                         parameters=build.parameters, jobdir=build.jobdir,
-                         pipeline=build.parameters['zuul']['pipeline'])
-        )
-        self.executor_server.running_builds.remove(build)
-        del self.executor_server.job_builds[self.job.unique]
-        self.executor_server.lock.release()
-
-    def runPlaybooks(self, args):
-        build = self.executor_server.job_builds[self.job.unique]
-        build.jobdir = self.jobdir
-
-        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
-        self.recordResult(result)
-        return result
-
-    def runAnsible(self, cmd, timeout, playbook):
-        build = self.executor_server.job_builds[self.job.unique]
-
-        if self.executor_server._run_ansible:
-            result = super(RecordingAnsibleJob, self).runAnsible(
-                cmd, timeout, playbook)
-        else:
-            if playbook.path:
-                result = build.run()
-            else:
-                result = (self.RESULT_NORMAL, 0)
-        return result
-
-    def getHostList(self, args):
-        self.log.debug("hostlist")
-        hosts = super(RecordingAnsibleJob, self).getHostList(args)
-        for host in hosts:
-            host['host_vars']['ansible_connection'] = 'local'
-
-        hosts.append(dict(
-            name='localhost',
-            host_vars=dict(ansible_connection='local'),
-            host_keys=[]))
-        return hosts
-
-
 class FakeGearmanServer(gear.Server):
     """A Gearman server for use in tests.
 
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index d739c18..670a420 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -500,412 +500,6 @@
     return inventory
 
 
-class ExecutorMergeWorker(gear.TextWorker):
-    def __init__(self, executor_server, *args, **kw):
-        self.zuul_executor_server = executor_server
-        super(ExecutorMergeWorker, self).__init__(*args, **kw)
-
-    def handleNoop(self, packet):
-        # Wait until the update queue is empty before responding
-        while self.zuul_executor_server.update_queue.qsize():
-            time.sleep(1)
-
-        with self.zuul_executor_server.merger_lock:
-            super(ExecutorMergeWorker, self).handleNoop(packet)
-
-
-class ExecutorExecuteWorker(gear.TextWorker):
-    def __init__(self, executor_server, *args, **kw):
-        self.zuul_executor_server = executor_server
-        super(ExecutorExecuteWorker, self).__init__(*args, **kw)
-
-    def handleNoop(self, packet):
-        # Delay our response to running a new job based on the number
-        # of jobs we're currently running, in an attempt to spread
-        # load evenly among executors.
-        workers = len(self.zuul_executor_server.job_workers)
-        delay = (workers ** 2) / 1000.0
-        time.sleep(delay)
-        return super(ExecutorExecuteWorker, self).handleNoop(packet)
-
-
-class ExecutorServer(object):
-    log = logging.getLogger("zuul.ExecutorServer")
-
-    def __init__(self, config, connections={}, jobdir_root=None,
-                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
-        self.config = config
-        self.keep_jobdir = keep_jobdir
-        self.jobdir_root = jobdir_root
-        # TODOv3(mordred): make the executor name more unique --
-        # perhaps hostname+pid.
-        self.hostname = socket.gethostname()
-        self.log_streaming_port = log_streaming_port
-        self.merger_lock = threading.Lock()
-        self.verbose = False
-        self.command_map = dict(
-            stop=self.stop,
-            pause=self.pause,
-            unpause=self.unpause,
-            graceful=self.graceful,
-            verbose=self.verboseOn,
-            unverbose=self.verboseOff,
-            keep=self.keep,
-            nokeep=self.nokeep,
-        )
-
-        self.merge_root = get_default(self.config, 'executor', 'git_dir',
-                                      '/var/lib/zuul/executor-git')
-        self.default_username = get_default(self.config, 'executor',
-                                            'default_username', 'zuul')
-        self.disk_limit_per_job = int(get_default(self.config, 'executor',
-                                                  'disk_limit_per_job', 250))
-        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
-        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
-        self.merge_speed_limit = get_default(
-            config, 'merger', 'git_http_low_speed_limit', '1000')
-        self.merge_speed_time = get_default(
-            config, 'merger', 'git_http_low_speed_time', '30')
-        execution_wrapper_name = get_default(self.config, 'executor',
-                                             'execution_wrapper', 'bubblewrap')
-        load_multiplier = float(get_default(self.config, 'executor',
-                                            'load_multiplier', '2.5'))
-        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
-        self.accepting_work = False
-        self.execution_wrapper = connections.drivers[execution_wrapper_name]
-
-        self.connections = connections
-        # This merger and its git repos are used to maintain
-        # up-to-date copies of all the repos that are used by jobs, as
-        # well as to support the merger:cat functon to supply
-        # configuration information to Zuul when it starts.
-        self.merger = self._getMerger(self.merge_root)
-        self.update_queue = DeduplicateQueue()
-
-        state_dir = get_default(self.config, 'executor', 'state_dir',
-                                '/var/lib/zuul', expand_user=True)
-        path = os.path.join(state_dir, 'executor.socket')
-        self.command_socket = commandsocket.CommandSocket(path)
-        ansible_dir = os.path.join(state_dir, 'ansible')
-        self.ansible_dir = ansible_dir
-        if os.path.exists(ansible_dir):
-            shutil.rmtree(ansible_dir)
-
-        zuul_dir = os.path.join(ansible_dir, 'zuul')
-        plugin_dir = os.path.join(zuul_dir, 'ansible')
-
-        os.makedirs(plugin_dir, mode=0o0755)
-
-        self.library_dir = os.path.join(plugin_dir, 'library')
-        self.action_dir = os.path.join(plugin_dir, 'action')
-        self.callback_dir = os.path.join(plugin_dir, 'callback')
-        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
-        self.filter_dir = os.path.join(plugin_dir, 'filter')
-
-        _copy_ansible_files(zuul.ansible, plugin_dir)
-
-        # We're copying zuul.ansible.* into a directory we are going
-        # to add to pythonpath, so our plugins can "import
-        # zuul.ansible".  But we're not installing all of zuul, so
-        # create a __init__.py file for the stub "zuul" module.
-        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
-            pass
-
-        self.job_workers = {}
-        self.disk_accountant = DiskAccountant(self.jobdir_root,
-                                              self.disk_limit_per_job,
-                                              self.stopJobDiskFull,
-                                              self.merge_root)
-
-    def _getMerger(self, root, logger=None):
-        if root != self.merge_root:
-            cache_root = self.merge_root
-        else:
-            cache_root = None
-        return zuul.merger.merger.Merger(
-            root, self.connections, self.merge_email, self.merge_name,
-            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
-
-    def start(self):
-        self._running = True
-        self._command_running = True
-        server = self.config.get('gearman', 'server')
-        port = get_default(self.config, 'gearman', 'port', 4730)
-        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
-        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
-        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
-        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
-        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
-        self.executor_worker = ExecutorExecuteWorker(
-            self, 'Zuul Executor Server')
-        self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
-        self.log.debug("Waiting for server")
-        self.merger_worker.waitForServer()
-        self.executor_worker.waitForServer()
-        self.log.debug("Registering")
-        self.register()
-
-        self.log.debug("Starting command processor")
-        self.command_socket.start()
-        self.command_thread = threading.Thread(target=self.runCommand)
-        self.command_thread.daemon = True
-        self.command_thread.start()
-
-        self.log.debug("Starting worker")
-        self.update_thread = threading.Thread(target=self._updateLoop)
-        self.update_thread.daemon = True
-        self.update_thread.start()
-        self.merger_thread = threading.Thread(target=self.run_merger)
-        self.merger_thread.daemon = True
-        self.merger_thread.start()
-        self.executor_thread = threading.Thread(target=self.run_executor)
-        self.executor_thread.daemon = True
-        self.executor_thread.start()
-        self.governor_stop_event = threading.Event()
-        self.governor_thread = threading.Thread(target=self.run_governor)
-        self.governor_thread.daemon = True
-        self.governor_thread.start()
-        self.disk_accountant.start()
-
-    def register(self):
-        self.register_work()
-        self.executor_worker.registerFunction("executor:stop:%s" %
-                                              self.hostname)
-        self.merger_worker.registerFunction("merger:merge")
-        self.merger_worker.registerFunction("merger:cat")
-        self.merger_worker.registerFunction("merger:refstate")
-
-    def register_work(self):
-        self.accepting_work = True
-        self.executor_worker.registerFunction("executor:execute")
-
-    def unregister_work(self):
-        self.accepting_work = False
-        self.executor_worker.unregisterFunction("executor:execute")
-
-    def stop(self):
-        self.log.debug("Stopping")
-        self.disk_accountant.stop()
-        self.governor_stop_event.set()
-        self._running = False
-        self._command_running = False
-        self.command_socket.stop()
-        self.update_queue.put(None)
-
-        for job_worker in list(self.job_workers.values()):
-            try:
-                job_worker.stop()
-            except Exception:
-                self.log.exception("Exception sending stop command "
-                                   "to worker:")
-        self.merger_worker.shutdown()
-        self.executor_worker.shutdown()
-        self.log.debug("Stopped")
-
-    def pause(self):
-        # TODOv3: implement
-        pass
-
-    def unpause(self):
-        # TODOv3: implement
-        pass
-
-    def graceful(self):
-        # TODOv3: implement
-        pass
-
-    def verboseOn(self):
-        self.verbose = True
-
-    def verboseOff(self):
-        self.verbose = False
-
-    def keep(self):
-        self.keep_jobdir = True
-
-    def nokeep(self):
-        self.keep_jobdir = False
-
-    def join(self):
-        self.update_thread.join()
-        self.merger_thread.join()
-        self.executor_thread.join()
-        self.governor_thread.join()
-
-    def runCommand(self):
-        while self._command_running:
-            try:
-                command = self.command_socket.get().decode('utf8')
-                if command != '_stop':
-                    self.command_map[command]()
-            except Exception:
-                self.log.exception("Exception while processing command")
-
-    def _updateLoop(self):
-        while self._running:
-            try:
-                self._innerUpdateLoop()
-            except:
-                self.log.exception("Exception in update thread:")
-
-    def _innerUpdateLoop(self):
-        # Inside of a loop that keeps the main repositories up to date
-        task = self.update_queue.get()
-        if task is None:
-            # We are asked to stop
-            return
-        with self.merger_lock:
-            self.log.info("Updating repo %s/%s" % (
-                task.connection_name, task.project_name))
-            self.merger.updateRepo(task.connection_name, task.project_name)
-            self.log.debug("Finished updating repo %s/%s" %
-                           (task.connection_name, task.project_name))
-        task.setComplete()
-
-    def update(self, connection_name, project_name):
-        # Update a repository in the main merger
-        task = UpdateTask(connection_name, project_name)
-        task = self.update_queue.put(task)
-        return task
-
-    def run_merger(self):
-        self.log.debug("Starting merger listener")
-        while self._running:
-            try:
-                job = self.merger_worker.getJob()
-                try:
-                    if job.name == 'merger:cat':
-                        self.log.debug("Got cat job: %s" % job.unique)
-                        self.cat(job)
-                    elif job.name == 'merger:merge':
-                        self.log.debug("Got merge job: %s" % job.unique)
-                        self.merge(job)
-                    elif job.name == 'merger:refstate':
-                        self.log.debug("Got refstate job: %s" % job.unique)
-                        self.refstate(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
-                except Exception:
-                    self.log.exception("Exception while running job")
-                    job.sendWorkException(
-                        traceback.format_exc().encode('utf8'))
-            except gear.InterruptedError:
-                pass
-            except Exception:
-                self.log.exception("Exception while getting job")
-
-    def run_executor(self):
-        self.log.debug("Starting executor listener")
-        while self._running:
-            try:
-                job = self.executor_worker.getJob()
-                try:
-                    if job.name == 'executor:execute':
-                        self.log.debug("Got execute job: %s" % job.unique)
-                        self.executeJob(job)
-                    elif job.name.startswith('executor:stop'):
-                        self.log.debug("Got stop job: %s" % job.unique)
-                        self.stopJob(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
-                except Exception:
-                    self.log.exception("Exception while running job")
-                    job.sendWorkException(
-                        traceback.format_exc().encode('utf8'))
-            except gear.InterruptedError:
-                pass
-            except Exception:
-                self.log.exception("Exception while getting job")
-
-    def run_governor(self):
-        while not self.governor_stop_event.wait(30):
-            self.manageLoad()
-
-    def executeJob(self, job):
-        self.job_workers[job.unique] = AnsibleJob(self, job)
-        self.job_workers[job.unique].run()
-
-    def manageLoad(self):
-        ''' Apply some heuristics to decide whether or not we should
-            be askign for more jobs '''
-        load_avg = os.getloadavg()[0]
-        if self.accepting_work:
-            # Don't unregister if we don't have any active jobs.
-            if load_avg > self.max_load_avg and self.job_workers:
-                self.log.info(
-                    "Unregistering due to high system load {} > {}".format(
-                        load_avg, self.max_load_avg))
-                self.unregister_work()
-        elif load_avg <= self.max_load_avg:
-            self.log.info(
-                "Re-registering as load is within limits {} <= {}".format(
-                    load_avg, self.max_load_avg))
-            self.register_work()
-
-    def finishJob(self, unique):
-        del(self.job_workers[unique])
-
-    def stopJobDiskFull(self, jobdir):
-        unique = os.path.basename(jobdir)
-        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
-
-    def stopJob(self, job):
-        try:
-            args = json.loads(job.arguments)
-            self.log.debug("Stop job with arguments: %s" % (args,))
-            unique = args['uuid']
-            self.stopJobByUnique(unique)
-        finally:
-            job.sendWorkComplete()
-
-    def stopJobByUnique(self, unique, reason=None):
-        job_worker = self.job_workers.get(unique)
-        if not job_worker:
-            self.log.debug("Unable to find worker for job %s" % (unique,))
-            return
-        try:
-            job_worker.stop(reason)
-        except Exception:
-            self.log.exception("Exception sending stop command "
-                               "to worker:")
-
-    def cat(self, job):
-        args = json.loads(job.arguments)
-        task = self.update(args['connection'], args['project'])
-        task.wait()
-        with self.merger_lock:
-            files = self.merger.getFiles(args['connection'], args['project'],
-                                         args['branch'], args['files'],
-                                         args.get('dirs', []))
-        result = dict(updated=True,
-                      files=files)
-        job.sendWorkComplete(json.dumps(result))
-
-    def refstate(self, job):
-        args = json.loads(job.arguments)
-        with self.merger_lock:
-            success, repo_state = self.merger.getRepoState(args['items'])
-        result = dict(updated=success,
-                      repo_state=repo_state)
-        job.sendWorkComplete(json.dumps(result))
-
-    def merge(self, job):
-        args = json.loads(job.arguments)
-        with self.merger_lock:
-            ret = self.merger.mergeChanges(args['items'], args.get('files'),
-                                           args.get('dirs', []),
-                                           args.get('repo_state'))
-        result = dict(merged=(ret is not None))
-        if ret is None:
-            result['commit'] = result['files'] = result['repo_state'] = None
-        else:
-            (result['commit'], result['files'], result['repo_state'],
-             recent) = ret
-        job.sendWorkComplete(json.dumps(result))
-
-
 class AnsibleJobLogAdapter(logging.LoggerAdapter):
     def process(self, msg, kwargs):
         msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
@@ -1856,3 +1450,409 @@
 
         self.emitPlaybookBanner(playbook, 'END', phase, result=result)
         return result, code
+
+
+class ExecutorMergeWorker(gear.TextWorker):
+    def __init__(self, executor_server, *args, **kw):
+        self.zuul_executor_server = executor_server
+        super(ExecutorMergeWorker, self).__init__(*args, **kw)
+
+    def handleNoop(self, packet):
+        # Wait until the update queue is empty before responding
+        while self.zuul_executor_server.update_queue.qsize():
+            time.sleep(1)
+
+        with self.zuul_executor_server.merger_lock:
+            super(ExecutorMergeWorker, self).handleNoop(packet)
+
+
+class ExecutorExecuteWorker(gear.TextWorker):
+    def __init__(self, executor_server, *args, **kw):
+        self.zuul_executor_server = executor_server
+        super(ExecutorExecuteWorker, self).__init__(*args, **kw)
+
+    def handleNoop(self, packet):
+        # Delay our response to running a new job based on the number
+        # of jobs we're currently running, in an attempt to spread
+        # load evenly among executors.
+        workers = len(self.zuul_executor_server.job_workers)
+        delay = (workers ** 2) / 1000.0
+        time.sleep(delay)
+        return super(ExecutorExecuteWorker, self).handleNoop(packet)
+
+
+class ExecutorServer(object):
+    log = logging.getLogger("zuul.ExecutorServer")
+
+    def __init__(self, config, connections={}, jobdir_root=None,
+                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
+        self.config = config
+        self.keep_jobdir = keep_jobdir
+        self.jobdir_root = jobdir_root
+        # TODOv3(mordred): make the executor name more unique --
+        # perhaps hostname+pid.
+        self.hostname = socket.gethostname()
+        self.log_streaming_port = log_streaming_port
+        self.merger_lock = threading.Lock()
+        self.verbose = False
+        self.command_map = dict(
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            graceful=self.graceful,
+            verbose=self.verboseOn,
+            unverbose=self.verboseOff,
+            keep=self.keep,
+            nokeep=self.nokeep,
+        )
+
+        self.merge_root = get_default(self.config, 'executor', 'git_dir',
+                                      '/var/lib/zuul/executor-git')
+        self.default_username = get_default(self.config, 'executor',
+                                            'default_username', 'zuul')
+        self.disk_limit_per_job = int(get_default(self.config, 'executor',
+                                                  'disk_limit_per_job', 250))
+        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
+        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+        self.merge_speed_limit = get_default(
+            config, 'merger', 'git_http_low_speed_limit', '1000')
+        self.merge_speed_time = get_default(
+            config, 'merger', 'git_http_low_speed_time', '30')
+        execution_wrapper_name = get_default(self.config, 'executor',
+                                             'execution_wrapper', 'bubblewrap')
+        load_multiplier = float(get_default(self.config, 'executor',
+                                            'load_multiplier', '2.5'))
+        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.accepting_work = False
+        self.execution_wrapper = connections.drivers[execution_wrapper_name]
+
+        self.connections = connections
+        # This merger and its git repos are used to maintain
+        # up-to-date copies of all the repos that are used by jobs, as
+        # well as to support the merger:cat functon to supply
+        # configuration information to Zuul when it starts.
+        self.merger = self._getMerger(self.merge_root)
+        self.update_queue = DeduplicateQueue()
+
+        state_dir = get_default(self.config, 'executor', 'state_dir',
+                                '/var/lib/zuul', expand_user=True)
+        path = os.path.join(state_dir, 'executor.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
+        ansible_dir = os.path.join(state_dir, 'ansible')
+        self.ansible_dir = ansible_dir
+        if os.path.exists(ansible_dir):
+            shutil.rmtree(ansible_dir)
+
+        zuul_dir = os.path.join(ansible_dir, 'zuul')
+        plugin_dir = os.path.join(zuul_dir, 'ansible')
+
+        os.makedirs(plugin_dir, mode=0o0755)
+
+        self.library_dir = os.path.join(plugin_dir, 'library')
+        self.action_dir = os.path.join(plugin_dir, 'action')
+        self.callback_dir = os.path.join(plugin_dir, 'callback')
+        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
+        self.filter_dir = os.path.join(plugin_dir, 'filter')
+
+        _copy_ansible_files(zuul.ansible, plugin_dir)
+
+        # We're copying zuul.ansible.* into a directory we are going
+        # to add to pythonpath, so our plugins can "import
+        # zuul.ansible".  But we're not installing all of zuul, so
+        # create a __init__.py file for the stub "zuul" module.
+        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
+            pass
+
+        self.job_workers = {}
+        self.disk_accountant = DiskAccountant(self.jobdir_root,
+                                              self.disk_limit_per_job,
+                                              self.stopJobDiskFull,
+                                              self.merge_root)
+
+    def _getMerger(self, root, logger=None):
+        if root != self.merge_root:
+            cache_root = self.merge_root
+        else:
+            cache_root = None
+        return zuul.merger.merger.Merger(
+            root, self.connections, self.merge_email, self.merge_name,
+            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
+
+    def start(self):
+        self._running = True
+        self._command_running = True
+        server = self.config.get('gearman', 'server')
+        port = get_default(self.config, 'gearman', 'port', 4730)
+        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
+        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+        self.executor_worker = ExecutorExecuteWorker(
+            self, 'Zuul Executor Server')
+        self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+        self.log.debug("Waiting for server")
+        self.merger_worker.waitForServer()
+        self.executor_worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand)
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
+        self.log.debug("Starting worker")
+        self.update_thread = threading.Thread(target=self._updateLoop)
+        self.update_thread.daemon = True
+        self.update_thread.start()
+        self.merger_thread = threading.Thread(target=self.run_merger)
+        self.merger_thread.daemon = True
+        self.merger_thread.start()
+        self.executor_thread = threading.Thread(target=self.run_executor)
+        self.executor_thread.daemon = True
+        self.executor_thread.start()
+        self.governor_stop_event = threading.Event()
+        self.governor_thread = threading.Thread(target=self.run_governor)
+        self.governor_thread.daemon = True
+        self.governor_thread.start()
+        self.disk_accountant.start()
+
+    def register(self):
+        self.register_work()
+        self.executor_worker.registerFunction("executor:stop:%s" %
+                                              self.hostname)
+        self.merger_worker.registerFunction("merger:merge")
+        self.merger_worker.registerFunction("merger:cat")
+        self.merger_worker.registerFunction("merger:refstate")
+
+    def register_work(self):
+        self.accepting_work = True
+        self.executor_worker.registerFunction("executor:execute")
+
+    def unregister_work(self):
+        self.accepting_work = False
+        self.executor_worker.unregisterFunction("executor:execute")
+
+    def stop(self):
+        self.log.debug("Stopping")
+        self.disk_accountant.stop()
+        self.governor_stop_event.set()
+        self._running = False
+        self._command_running = False
+        self.command_socket.stop()
+        self.update_queue.put(None)
+
+        for job_worker in list(self.job_workers.values()):
+            try:
+                job_worker.stop()
+            except Exception:
+                self.log.exception("Exception sending stop command "
+                                   "to worker:")
+        self.merger_worker.shutdown()
+        self.executor_worker.shutdown()
+        self.log.debug("Stopped")
+
+    def pause(self):
+        # TODOv3: implement
+        pass
+
+    def unpause(self):
+        # TODOv3: implement
+        pass
+
+    def graceful(self):
+        # TODOv3: implement
+        pass
+
+    def verboseOn(self):
+        self.verbose = True
+
+    def verboseOff(self):
+        self.verbose = False
+
+    def keep(self):
+        self.keep_jobdir = True
+
+    def nokeep(self):
+        self.keep_jobdir = False
+
+    def join(self):
+        self.update_thread.join()
+        self.merger_thread.join()
+        self.executor_thread.join()
+        self.governor_thread.join()
+
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':
+                    self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
+
+    def _updateLoop(self):
+        while self._running:
+            try:
+                self._innerUpdateLoop()
+            except:
+                self.log.exception("Exception in update thread:")
+
+    def _innerUpdateLoop(self):
+        # Inside of a loop that keeps the main repositories up to date
+        task = self.update_queue.get()
+        if task is None:
+            # We are asked to stop
+            return
+        with self.merger_lock:
+            self.log.info("Updating repo %s/%s" % (
+                task.connection_name, task.project_name))
+            self.merger.updateRepo(task.connection_name, task.project_name)
+            self.log.debug("Finished updating repo %s/%s" %
+                           (task.connection_name, task.project_name))
+        task.setComplete()
+
+    def update(self, connection_name, project_name):
+        # Update a repository in the main merger
+        task = UpdateTask(connection_name, project_name)
+        task = self.update_queue.put(task)
+        return task
+
+    def run_merger(self):
+        self.log.debug("Starting merger listener")
+        while self._running:
+            try:
+                job = self.merger_worker.getJob()
+                try:
+                    if job.name == 'merger:cat':
+                        self.log.debug("Got cat job: %s" % job.unique)
+                        self.cat(job)
+                    elif job.name == 'merger:merge':
+                        self.log.debug("Got merge job: %s" % job.unique)
+                        self.merge(job)
+                    elif job.name == 'merger:refstate':
+                        self.log.debug("Got refstate job: %s" % job.unique)
+                        self.refstate(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(
+                        traceback.format_exc().encode('utf8'))
+            except gear.InterruptedError:
+                pass
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def run_executor(self):
+        self.log.debug("Starting executor listener")
+        while self._running:
+            try:
+                job = self.executor_worker.getJob()
+                try:
+                    if job.name == 'executor:execute':
+                        self.log.debug("Got execute job: %s" % job.unique)
+                        self.executeJob(job)
+                    elif job.name.startswith('executor:stop'):
+                        self.log.debug("Got stop job: %s" % job.unique)
+                        self.stopJob(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(
+                        traceback.format_exc().encode('utf8'))
+            except gear.InterruptedError:
+                pass
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def run_governor(self):
+        while not self.governor_stop_event.wait(30):
+            self.manageLoad()
+
+    def executeJob(self, job):
+        self.job_workers[job.unique] = AnsibleJob(self, job)
+        self.job_workers[job.unique].run()
+
+    def manageLoad(self):
+        ''' Apply some heuristics to decide whether or not we should
+            be askign for more jobs '''
+        load_avg = os.getloadavg()[0]
+        if self.accepting_work:
+            # Don't unregister if we don't have any active jobs.
+            if load_avg > self.max_load_avg and self.job_workers:
+                self.log.info(
+                    "Unregistering due to high system load {} > {}".format(
+                        load_avg, self.max_load_avg))
+                self.unregister_work()
+        elif load_avg <= self.max_load_avg:
+            self.log.info(
+                "Re-registering as load is within limits {} <= {}".format(
+                    load_avg, self.max_load_avg))
+            self.register_work()
+
+    def finishJob(self, unique):
+        del(self.job_workers[unique])
+
+    def stopJobDiskFull(self, jobdir):
+        unique = os.path.basename(jobdir)
+        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
+
+    def stopJob(self, job):
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Stop job with arguments: %s" % (args,))
+            unique = args['uuid']
+            self.stopJobByUnique(unique)
+        finally:
+            job.sendWorkComplete()
+
+    def stopJobByUnique(self, unique, reason=None):
+        job_worker = self.job_workers.get(unique)
+        if not job_worker:
+            self.log.debug("Unable to find worker for job %s" % (unique,))
+            return
+        try:
+            job_worker.stop(reason)
+        except Exception:
+            self.log.exception("Exception sending stop command "
+                               "to worker:")
+
+    def cat(self, job):
+        args = json.loads(job.arguments)
+        task = self.update(args['connection'], args['project'])
+        task.wait()
+        with self.merger_lock:
+            files = self.merger.getFiles(args['connection'], args['project'],
+                                         args['branch'], args['files'],
+                                         args.get('dirs', []))
+        result = dict(updated=True,
+                      files=files)
+        job.sendWorkComplete(json.dumps(result))
+
+    def refstate(self, job):
+        args = json.loads(job.arguments)
+        with self.merger_lock:
+            success, repo_state = self.merger.getRepoState(args['items'])
+        result = dict(updated=success,
+                      repo_state=repo_state)
+        job.sendWorkComplete(json.dumps(result))
+
+    def merge(self, job):
+        args = json.loads(job.arguments)
+        with self.merger_lock:
+            ret = self.merger.mergeChanges(args['items'], args.get('files'),
+                                           args.get('dirs', []),
+                                           args.get('repo_state'))
+        result = dict(merged=(ret is not None))
+        if ret is None:
+            result['commit'] = result['files'] = result['repo_state'] = None
+        else:
+            (result['commit'], result['files'], result['repo_state'],
+             recent) = ret
+        job.sendWorkComplete(json.dumps(result))