Refactor th to have one gearman worker thread

Change-Id: I78f95a67b80ce0627b4a3bbb20578f3d16028714
diff --git a/doc/source/installation.rst b/doc/source/installation.rst
index 9916eb7..a0a0732 100644
--- a/doc/source/installation.rst
+++ b/doc/source/installation.rst
@@ -67,10 +67,11 @@
         for projects. This is the cache directory used by pip.
     **plugins**
         A list of enabled plugins and their settings in a dictionary.
-        The only required parameter is *name* which should be the
-        same as the folder containing the plugin module. Any other
-        parameters are specified by the plugin themselves as
-        required.
+        The only required parameters are *name*, which should be the
+        same as the folder containing the plugin module, and
+        *function*, which is the function registered with zuul.
+        Any other parameters are specified by the plugin themselves
+        as required.
     **publish_logs**
         Log results from plugins can be published using multiple
         methods. Currently only a local copy is fully implemented.
diff --git a/etc/turbo-hipster/config.json b/etc/turbo-hipster/config.json
index 2afd604..325a373 100644
--- a/etc/turbo-hipster/config.json
+++ b/etc/turbo-hipster/config.json
@@ -11,8 +11,8 @@
     "plugins": [
         {
             "name": "gate_real_db_upgrade",
-            "datasets_dir": "/var/lib/turbo-hipster/datasets",
-            "job": "gate-real-db-upgrade_nova_mysql"
+            "function": "build:gate-real-db-upgrade_nova_mysql",
+            "datasets_dir": "/var/lib/turbo-hipster/datasets"
         }
     ],
     "publish_logs": {
diff --git a/tests/fakes.py b/tests/fakes.py
index 57d60aa..a4b3b60 100644
--- a/tests/fakes.py
+++ b/tests/fakes.py
@@ -21,15 +21,15 @@
 import re
 import time
 
-from turbo_hipster.worker_manager import GearmanManager
+from turbo_hipster.worker_manager import ZuulManager
 from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\
     as RealDbUpgradeRunner
 
 
-class FakeGearmanManager(GearmanManager):
+class FakeZuulManager(ZuulManager):
     def __init__(self, config, tasks, test):
         self.test = test
-        super(FakeGearmanManager, self).__init__(config, tasks)
+        super(FakeZuulManager, self).__init__(config, tasks)
 
     def setup_gearman(self):
         hostname = os.uname()[1]
@@ -160,16 +160,6 @@
                                                       plugin_config,
                                                       worker_name)
 
-    def setup_gearman(self):
-        self.log.debug("Set up real_db gearman worker")
-        self.gearman_worker = FakeWorker('FakeRealDbUpgradeRunner_worker',
-                                         self.test)
-        self.gearman_worker.addServer(
-            self.global_config['zuul_server']['gearman_host'],
-            self.global_config['zuul_server']['gearman_port']
-        )
-        self.register_functions()
-
 
 class BuildHistory(object):
     def __init__(self, **kw):
diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py
index 11fc960..89e02b2 100644
--- a/tests/test_worker_manager.py
+++ b/tests/test_worker_manager.py
@@ -19,7 +19,7 @@
 import os
 import testtools
 import time
-from fakes import FakeGearmanManager, FakeGearmanServer,\
+from fakes import FakeZuulManager, FakeGearmanServer,\
     FakeRealDbUpgradeRunner
 
 CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc')
@@ -40,9 +40,7 @@
                                             'test-worker-1', self)
         self.tasks = dict(FakeRealDbUpgradeRunner_worker=self.task)
 
-        self.gearman_manager = FakeGearmanManager(self.config,
-                                                  self.tasks,
-                                                  self)
+        self.gearman_manager = FakeZuulManager(self.config, self.tasks, self)
 
     def test_manager_function_registered(self):
         """ Check the manager is set up correctly and registered with the
diff --git a/turbo_hipster/lib/utils.py b/turbo_hipster/lib/utils.py
index 5a0237d..11a3ade 100644
--- a/turbo_hipster/lib/utils.py
+++ b/turbo_hipster/lib/utils.py
@@ -224,6 +224,8 @@
 
 
 def determine_job_identifier(zuul_arguments, job, unique):
+    if 'build:' in job:
+        job = job.split('build:')[1]
     return os.path.join(zuul_arguments['ZUUL_CHANGE'][:2],
                         zuul_arguments['ZUUL_CHANGE'],
                         zuul_arguments['ZUUL_PATCHSET'],
diff --git a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py
index 7909fd1..a9f9e4b 100644
--- a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py
+++ b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py
@@ -13,12 +13,10 @@
 # under the License.
 
 
-import gear
 import json
 import logging
 import os
 import re
-import threading
 
 from turbo_hipster.lib import utils
 
@@ -31,7 +29,7 @@
 MIGRATION_END_RE = re.compile('^done$')
 
 
-class Runner(threading.Thread):
+class Runner(object):
 
     """ This thread handles the actual sql-migration tests.
         It pulls in a gearman job from the  build:gate-real-db-upgrade
@@ -39,16 +37,12 @@
 
     log = logging.getLogger("task_plugins.gate_real_db_upgrade.task.Runner")
 
-    def __init__(self, global_config, plugin_config, worker_name):
-        super(Runner, self).__init__()
-        self._stop = threading.Event()
+    def __init__(self, global_config, plugin_config, job_name):
         self.global_config = global_config
         self.plugin_config = plugin_config
-
-        self.worker_name = worker_name
+        self.job_name = job_name
 
         # Set up the runner worker
-        self.gearman_worker = None
         self.datasets = []
 
         self.job = None
@@ -61,31 +55,6 @@
         self.current_step = 0
         self.total_steps = 4
 
-        self.setup_gearman()
-
-    def setup_gearman(self):
-        self.log.debug("Set up real_db gearman worker")
-        self.gearman_worker = gear.Worker(self.worker_name)
-        self.gearman_worker.addServer(
-            self.global_config['zuul_server']['gearman_host'],
-            self.global_config['zuul_server']['gearman_port']
-        )
-        self.register_functions()
-
-    def register_functions(self):
-        self.gearman_worker.registerFunction(
-            'build:' + self.plugin_config['job'])
-
-    def stop(self):
-        self._stop.set()
-        # Unblock gearman
-        self.log.debug("Telling gearman to stop waiting for jobs")
-        self.gearman_worker.stopWaitingForJobs()
-        self.gearman_worker.shutdown()
-
-    def stopped(self):
-        return self._stop.isSet()
-
     def stop_worker(self, number):
         # Check the number is for this job instance
         # (makes it possible to run multiple workers with this task
@@ -93,22 +62,10 @@
         if number == self.job.unique:
             self.log.debug("We've been asked to stop by our gearman manager")
             self.cancelled = True
+            # TODO: Work out how to kill current step
 
-    def run(self):
-        while True and not self.stopped():
-            try:
-                # Reset job information:
-                self.current_step = 0
-                self.cancelled = False
-                self.work_data = None
-                # gearman_worker.getJob() blocks until a job is available
-                self.log.debug("Waiting for job")
-                self.job = self.gearman_worker.getJob()
-                self._handle_job()
-            except:
-                self.log.exception('Exception retrieving log event.')
-
-    def _handle_job(self):
+    def start_job(self, job):
+        self.job = job
         if self.job is not None:
             try:
                 self.job_arguments = \
@@ -215,7 +172,7 @@
                     dataset['config']['project'] and
                     self._get_project_command(dataset['config']['type'])):
                 dataset['determined_path'] = utils.determine_job_identifier(
-                    self.job_arguments, self.plugin_config['job'],
+                    self.job_arguments, self.plugin_config['function'],
                     self.job.unique
                 )
                 dataset['job_log_file_path'] = os.path.join(
@@ -315,7 +272,7 @@
             project_name + '/.git',
             os.path.join(
                 self.global_config['git_working_dir'],
-                self.worker_name,
+                self.job_name,
                 project_name
             )
         )
@@ -333,7 +290,7 @@
         if self.work_data is None:
             hostname = os.uname()[1]
             self.work_data = dict(
-                name=self.worker_name,
+                name=self.job_name,
                 number=self.job.unique,
                 manager='turbo-hipster-manager-%s' % hostname,
                 url='http://localhost',
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
index 72034fb..4b2e02f 100644
--- a/turbo_hipster/worker_manager.py
+++ b/turbo_hipster/worker_manager.py
@@ -20,7 +20,7 @@
 import threading
 
 
-class GearmanManager(threading.Thread):
+class ZuulManager(threading.Thread):
 
     """ This thread manages all of the launched gearman workers.
         As required by the zuul protocol it handles stopping builds when they
@@ -31,7 +31,7 @@
     log = logging.getLogger("worker_manager.GearmanManager")
 
     def __init__(self, config, tasks):
-        super(GearmanManager, self).__init__()
+        super(ZuulManager, self).__init__()
         self._stop = threading.Event()
         self.config = config
         self.tasks = tasks
@@ -81,3 +81,67 @@
         except Exception as e:
             self.log.exception('Exception handling log event.')
             job.sendWorkException(str(e).encode('utf-8'))
+
+
+class ZuulClient(threading.Thread):
+
+    """ ..."""
+
+    log = logging.getLogger("worker_manager.ZuulClient")
+
+    def __init__(self, global_config, worker_name):
+        super(ZuulClient, self).__init__()
+        self._stop = threading.Event()
+        self.global_config = global_config
+
+        self.worker_name = worker_name
+
+        # Set up the runner worker
+        self.gearman_worker = None
+        self.functions = {}
+
+        self.job = None
+        self.cancelled = False
+
+        self.setup_gearman()
+
+    def setup_gearman(self):
+        self.log.debug("Set up gearman worker")
+        self.gearman_worker = gear.Worker(self.worker_name)
+        self.gearman_worker.addServer(
+            self.global_config['zuul_server']['gearman_host'],
+            self.global_config['zuul_server']['gearman_port']
+        )
+        self.register_functions()
+
+    def register_functions(self):
+        for function_name, plugin in self.functions.items():
+            self.gearman_worker.registerFunction(function_name)
+
+    def add_function(self, function_name, plugin):
+        self.functions[function_name] = plugin
+
+    def stop(self):
+        self._stop.set()
+        # Unblock gearman
+        self.log.debug("Telling gearman to stop waiting for jobs")
+        self.gearman_worker.stopWaitingForJobs()
+        self.gearman_worker.shutdown()
+
+    def stopped(self):
+        return self._stop.isSet()
+
+    def run(self):
+        while True and not self.stopped():
+            try:
+                self.cancelled = False
+                # gearman_worker.getJob() blocks until a job is available
+                self.log.debug("Waiting for job")
+                self.job = self.gearman_worker.getJob()
+                self._handle_job()
+            except:
+                self.log.exception('Exception retrieving log event.')
+
+    def _handle_job(self):
+        """ We have a job, give it to the right plugin """
+        self.functions[self.job.name].start_job(self.job)
diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py
index 8bbf42d..556dcf9 100755
--- a/turbo_hipster/worker_server.py
+++ b/turbo_hipster/worker_server.py
@@ -40,16 +40,18 @@
     log = logging.getLogger("worker_server.Server")
 
     def __init__(self, config):
-        # Config init
-        self.config = config
-        self.manager = None
-        self.plugins = []
-        self.load_plugins()
-
         # Python logging output file.
         self.debug_log = self.config['debug_log']
 
+        # Config init
+        self.config = config
+        self.zuul_manager = None
+        self.zuul_client = None
+        self.plugins = []
+        self.worker_name = os.uname()[1]
+
         self.tasks = {}
+        self.load_plugins()
 
     def setup_logging(self):
         if self.debug_log:
@@ -74,23 +76,30 @@
                 'plugin_config': plugin
             })
 
-    def run_tasks(self):
+    def start_gearman_workers(self):
         """ Run the tasks """
-        for thread_number, plugin in enumerate(self.plugins):
+        self.zuul_client = worker_manager.ZuulClient(self.config,
+                                                     self.worker_name)
+
+        for task_number, plugin in enumerate(self.plugins):
             module = plugin['module']
-            worker_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
-                                        os.uname()[1], thread_number)
-            self.tasks[worker_name] = module.Runner(
+            job_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
+                                     self.worker_name, task_number)
+            self.tasks[job_name] = module.Runner(
                 self.config,
                 plugin['plugin_config'],
-                worker_name
+                job_name
             )
-            self.tasks[worker_name].daemon = True
-            self.tasks[worker_name].start()
+            self.zuul_client.add_function(plugin['plugin_config']['function'],
+                                          self.tasks[job_name])
 
-        self.manager = worker_manager.GearmanManager(self.config, self.tasks)
-        self.manager.daemon = True
-        self.manager.start()
+        self.zuul_client.register_functions()
+        self.zuul_client.daemon = True
+        self.zuul_client.start()
+
+        self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
+        self.zuul_manager.daemon = True
+        self.zuul_manager.start()
 
     def exit_handler(self, signum):
         signal.signal(signal.SIGUSR1, signal.SIG_IGN)
@@ -101,7 +110,7 @@
 
     def main(self):
         self.setup_logging()
-        self.run_tasks()
+        self.start_gearman_workers()
 
         while True:
             try: