Merge "Use openstack's pypi mirror"
diff --git a/turbo_hipster/cmd/server.py b/turbo_hipster/cmd/server.py
index 44d2604..d54978d 100644
--- a/turbo_hipster/cmd/server.py
+++ b/turbo_hipster/cmd/server.py
@@ -20,6 +20,7 @@
 import extras
 import json
 import os
+import signal
 import sys
 
 from turbo_hipster import worker_server
@@ -29,7 +30,32 @@
 PID_FILE_MODULE = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
 
 
-def main():
+def main(args):
+
+    with open(args.config, 'r') as config_stream:
+        config = json.load(config_stream)
+
+    server = worker_server.Server(config)
+
+    def term_handler(signum, frame):
+        server.stop()
+    signal.signal(signal.SIGTERM, term_handler)
+
+    if args.background:
+        server.daemon = True
+    server.start()
+
+    while not server.stopped():
+        try:
+            signal.pause()
+        except KeyboardInterrupt:
+            print "Ctrl + C: asking tasks to exit nicely...\n"
+            server.stop()
+
+
+if __name__ == '__main__':
+    sys.path.insert(0, os.path.abspath(
+                    os.path.join(os.path.dirname(__file__), '../')))
     parser = argparse.ArgumentParser()
     parser.add_argument('-c', '--config',
                         default=
@@ -42,21 +68,9 @@
                                 'turbo-hipster-worker-server.pid',
                         help='PID file to lock during daemonization.')
     args = parser.parse_args()
-
-    with open(args.config, 'r') as config_stream:
-        config = json.load(config_stream)
-
-    server = worker_server.Server(config)
-
     if args.background:
         pidfile = PID_FILE_MODULE.TimeoutPIDLockFile(args.pidfile, 10)
         with daemon.DaemonContext(pidfile=pidfile):
-            server.main()
+            main(args)
     else:
-        server.main()
-
-
-if __name__ == '__main__':
-    sys.path.insert(0, os.path.abspath(
-                    os.path.join(os.path.dirname(__file__), '../')))
-    main()
+        main(args)
diff --git a/turbo_hipster/lib/models.py b/turbo_hipster/lib/models.py
index d445379..cd73a73 100644
--- a/turbo_hipster/lib/models.py
+++ b/turbo_hipster/lib/models.py
@@ -23,7 +23,7 @@
 
 
 class Task(object):
-
+    """ A base object for running a job (aka Task) """
     log = logging.getLogger("lib.models.Task")
 
     def __init__(self, global_config, plugin_config, job_name):
@@ -68,11 +68,11 @@
                 if not self.cancelled:
                     self.job.sendWorkException(str(e).encode('utf-8'))
 
-    def stop_worker(self, number):
-        # Check the number is for this job instance
+    def stop_working(self, number=None):
+        # Check the number is for this job instance (None will cancel all)
         # (makes it possible to run multiple workers with this task
         # on this server)
-        if number == self.job.unique:
+        if number is None or number == self.job.unique:
             self.log.debug("We've been asked to stop by our gearman manager")
             self.cancelled = True
             # TODO: Work out how to kill current step
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
index 746e4f7..3923d35 100644
--- a/turbo_hipster/worker_manager.py
+++ b/turbo_hipster/worker_manager.py
@@ -47,7 +47,9 @@
             self.config['zuul_server']['gearman_host'],
             self.config['zuul_server']['gearman_port']
         )
-        self.gearman_worker.waitForServer()
+
+    def register_functions(self):
+        hostname = os.uname()[1]
         self.gearman_worker.registerFunction(
             'stop:turbo-hipster-manager-%s' % hostname)
 
@@ -55,30 +57,34 @@
         self._stop.set()
         # Unblock gearman
         self.log.debug("Telling gearman to stop waiting for jobs")
-        self.gearman_worker.stopWaitingForJobs()
         self.gearman_worker.shutdown()
 
     def stopped(self):
         return self._stop.isSet()
 
     def run(self):
-        while True and not self.stopped():
+        while not self.stopped():
             try:
                 # gearman_worker.getJob() blocks until a job is available
                 self.log.debug("Waiting for server")
                 self.gearman_worker.waitForServer()
-                logging.debug("Waiting for job")
-                self.current_step = 0
-                job = self.gearman_worker.getJob()
-                self._handle_job(job)
+                if (not self.stopped() and self.gearman_worker.running and
+                    self.gearman_worker.active_connections):
+                    self.register_functions()
+                    self.gearman_worker.waitForServer()
+                    logging.debug("Waiting for job")
+                    self.current_step = 0
+                    job = self.gearman_worker.getJob()
+                    self._handle_job(job)
             except:
                 logging.exception('Exception retrieving log event.')
+        self.log.debug("Finished manager thread")
 
     def _handle_job(self, job):
         """ Handle the requested job """
         try:
             job_arguments = json.loads(job.arguments.decode('utf-8'))
-            self.tasks[job_arguments['name']].stop_worker(
+            self.tasks[job_arguments['name']].stop_working(
                 job_arguments['number'])
             job.sendWorkComplete()
         except Exception as e:
@@ -104,7 +110,6 @@
         self.functions = {}
 
         self.job = None
-        self.cancelled = False
 
         self.setup_gearman()
 
@@ -119,7 +124,6 @@
     def register_functions(self):
         self.log.debug("Register functions with gearman")
         for function_name, plugin in self.functions.items():
-            self.gearman_worker.waitForServer()
             self.gearman_worker.registerFunction(function_name)
         self.log.debug(self.gearman_worker.functions)
 
@@ -129,28 +133,34 @@
 
     def stop(self):
         self._stop.set()
+        for task in self.functions.values():
+            task.stop_working()
         # Unblock gearman
         self.log.debug("Telling gearman to stop waiting for jobs")
-        self.gearman_worker.stopWaitingForJobs()
         self.gearman_worker.shutdown()
 
     def stopped(self):
         return self._stop.isSet()
 
     def run(self):
-        while True and not self.stopped():
+        while not self.stopped():
             try:
-                self.cancelled = False
                 # gearman_worker.getJob() blocks until a job is available
                 self.log.debug("Waiting for server")
                 self.gearman_worker.waitForServer()
-                self.log.debug("Waiting for job")
-                self.job = self.gearman_worker.getJob()
-                self._handle_job()
+                if (not self.stopped() and self.gearman_worker.running and
+                    self.gearman_worker.active_connections):
+                    self.register_functions()
+                    self.gearman_worker.waitForServer()
+                    self.log.debug("Waiting for job")
+                    self.job = self.gearman_worker.getJob()
+                    self._handle_job()
             except:
                 self.log.exception('Exception waiting for job.')
+        self.log.debug("Finished client thread")
 
     def _handle_job(self):
         """ We have a job, give it to the right plugin """
-        self.log.debug("We have a job, we'll launch the task now.")
-        self.functions[self.job.name].start_job(self.job)
+        if self.job:
+            self.log.debug("We have a job, we'll launch the task now.")
+            self.functions[self.job.name].start_job(self.job)
diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py
index dd13c08..d2edf18 100755
--- a/turbo_hipster/worker_server.py
+++ b/turbo_hipster/worker_server.py
@@ -20,26 +20,29 @@
 
 import logging
 import os
-import signal
-import sys
+import threading
 
 import worker_manager
 
 
-class Server(object):
+class Server(threading.Thread):
 
     """ This is the worker server object to be daemonized """
     log = logging.getLogger("worker_server.Server")
 
     def __init__(self, config):
+        super(Server, self).__init__()
+        self._stop = threading.Event()
         self.config = config
         # Python logging output file.
         self.debug_log = self.config['debug_log']
+        self.setup_logging()
 
         # Config init
         self.zuul_manager = None
         self.zuul_client = None
         self.plugins = []
+        self.services_started = False
 
         # TODO: Make me unique (random?) and we should be able to run multiple
         # instances of turbo-hipster on the one host
@@ -56,7 +59,8 @@
         # in lib.utils.execute_to_log to work correctly.
         if not os.path.isdir(os.path.dirname(self.debug_log)):
             os.makedirs(os.path.dirname(self.debug_log))
-        logging.basicConfig(format='%(asctime)s %(name)s %(message)s',
+        logging.basicConfig(format='%(asctime)s %(name)-32s '
+                            '%(levelname)-8s %(message)s',
                             filename=self.debug_log, level=logging.DEBUG)
 
     def load_plugins(self):
@@ -73,9 +77,9 @@
             })
             self.log.debug('Plugin %s loaded' % plugin['name'])
 
-    def start_gearman_workers(self):
+    def start_zuul_client(self):
         """ Run the tasks """
-        self.log.debug('Starting gearman workers')
+        self.log.debug('Starting zuul client')
         self.zuul_client = worker_manager.ZuulClient(self.config,
                                                      self.worker_name)
 
@@ -91,29 +95,24 @@
             self.zuul_client.add_function(plugin['plugin_config']['function'],
                                           self.tasks[job_name])
 
-        self.zuul_client.register_functions()
-        self.zuul_client.daemon = True
         self.zuul_client.start()
 
+    def start_zuul_manager(self):
         self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
-        self.zuul_manager.daemon = True
         self.zuul_manager.start()
 
-    def exit_handler(self, signum):
+    def stop(self):
+        self._stop.set()
         self.log.debug('Exiting...')
-        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
-        for task_name, task in self.tasks.items():
-            task.stop()
-        self.manager.stop()
-        sys.exit(0)
+        self.zuul_client.stop()
+        self.zuul_manager.stop()
 
-    def main(self):
-        self.setup_logging()
-        self.start_gearman_workers()
+    def stopped(self):
+        return self._stop.isSet()
 
-        while True:
-            try:
-                signal.pause()
-            except KeyboardInterrupt:
-                print "Ctrl + C: asking tasks to exit nicely...\n"
-                self.exit_handler(signal.SIGINT)
+    def run(self):
+        self.start_zuul_client()
+        self.start_zuul_manager()
+        self.services_started = True
+        while not self.stopped():
+            self._stop.wait()