Merge "Use openstack's pypi mirror"
diff --git a/turbo_hipster/cmd/server.py b/turbo_hipster/cmd/server.py
index 44d2604..d54978d 100644
--- a/turbo_hipster/cmd/server.py
+++ b/turbo_hipster/cmd/server.py
@@ -20,6 +20,7 @@
import extras
import json
import os
+import signal
import sys
from turbo_hipster import worker_server
@@ -29,7 +30,32 @@
PID_FILE_MODULE = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
-def main():
+def main(args):
+
+ with open(args.config, 'r') as config_stream:
+ config = json.load(config_stream)
+
+ server = worker_server.Server(config)
+
+ def term_handler(signum, frame):
+ server.stop()
+ signal.signal(signal.SIGTERM, term_handler)
+
+ if args.background:
+ server.daemon = True
+ server.start()
+
+ while not server.stopped():
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print "Ctrl + C: asking tasks to exit nicely...\n"
+ server.stop()
+
+
+if __name__ == '__main__':
+ sys.path.insert(0, os.path.abspath(
+ os.path.join(os.path.dirname(__file__), '../')))
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config',
default=
@@ -42,21 +68,9 @@
'turbo-hipster-worker-server.pid',
help='PID file to lock during daemonization.')
args = parser.parse_args()
-
- with open(args.config, 'r') as config_stream:
- config = json.load(config_stream)
-
- server = worker_server.Server(config)
-
if args.background:
pidfile = PID_FILE_MODULE.TimeoutPIDLockFile(args.pidfile, 10)
with daemon.DaemonContext(pidfile=pidfile):
- server.main()
+ main(args)
else:
- server.main()
-
-
-if __name__ == '__main__':
- sys.path.insert(0, os.path.abspath(
- os.path.join(os.path.dirname(__file__), '../')))
- main()
+ main(args)
diff --git a/turbo_hipster/lib/models.py b/turbo_hipster/lib/models.py
index d445379..cd73a73 100644
--- a/turbo_hipster/lib/models.py
+++ b/turbo_hipster/lib/models.py
@@ -23,7 +23,7 @@
class Task(object):
-
+ """ A base object for running a job (aka Task) """
log = logging.getLogger("lib.models.Task")
def __init__(self, global_config, plugin_config, job_name):
@@ -68,11 +68,11 @@
if not self.cancelled:
self.job.sendWorkException(str(e).encode('utf-8'))
- def stop_worker(self, number):
- # Check the number is for this job instance
+ def stop_working(self, number=None):
+ # Check the number is for this job instance (None will cancel all)
# (makes it possible to run multiple workers with this task
# on this server)
- if number == self.job.unique:
+ if number is None or number == self.job.unique:
self.log.debug("We've been asked to stop by our gearman manager")
self.cancelled = True
# TODO: Work out how to kill current step
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
index 746e4f7..3923d35 100644
--- a/turbo_hipster/worker_manager.py
+++ b/turbo_hipster/worker_manager.py
@@ -47,7 +47,9 @@
self.config['zuul_server']['gearman_host'],
self.config['zuul_server']['gearman_port']
)
- self.gearman_worker.waitForServer()
+
+ def register_functions(self):
+ hostname = os.uname()[1]
self.gearman_worker.registerFunction(
'stop:turbo-hipster-manager-%s' % hostname)
@@ -55,30 +57,34 @@
self._stop.set()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
- self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
- while True and not self.stopped():
+ while not self.stopped():
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
self.gearman_worker.waitForServer()
- logging.debug("Waiting for job")
- self.current_step = 0
- job = self.gearman_worker.getJob()
- self._handle_job(job)
+ if (not self.stopped() and self.gearman_worker.running and
+ self.gearman_worker.active_connections):
+ self.register_functions()
+ self.gearman_worker.waitForServer()
+ logging.debug("Waiting for job")
+ self.current_step = 0
+ job = self.gearman_worker.getJob()
+ self._handle_job(job)
except:
logging.exception('Exception retrieving log event.')
+ self.log.debug("Finished manager thread")
def _handle_job(self, job):
""" Handle the requested job """
try:
job_arguments = json.loads(job.arguments.decode('utf-8'))
- self.tasks[job_arguments['name']].stop_worker(
+ self.tasks[job_arguments['name']].stop_working(
job_arguments['number'])
job.sendWorkComplete()
except Exception as e:
@@ -104,7 +110,6 @@
self.functions = {}
self.job = None
- self.cancelled = False
self.setup_gearman()
@@ -119,7 +124,6 @@
def register_functions(self):
self.log.debug("Register functions with gearman")
for function_name, plugin in self.functions.items():
- self.gearman_worker.waitForServer()
self.gearman_worker.registerFunction(function_name)
self.log.debug(self.gearman_worker.functions)
@@ -129,28 +133,34 @@
def stop(self):
self._stop.set()
+ for task in self.functions.values():
+ task.stop_working()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
- self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
- while True and not self.stopped():
+ while not self.stopped():
try:
- self.cancelled = False
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
self.gearman_worker.waitForServer()
- self.log.debug("Waiting for job")
- self.job = self.gearman_worker.getJob()
- self._handle_job()
+ if (not self.stopped() and self.gearman_worker.running and
+ self.gearman_worker.active_connections):
+ self.register_functions()
+ self.gearman_worker.waitForServer()
+ self.log.debug("Waiting for job")
+ self.job = self.gearman_worker.getJob()
+ self._handle_job()
except:
self.log.exception('Exception waiting for job.')
+ self.log.debug("Finished client thread")
def _handle_job(self):
""" We have a job, give it to the right plugin """
- self.log.debug("We have a job, we'll launch the task now.")
- self.functions[self.job.name].start_job(self.job)
+ if self.job:
+ self.log.debug("We have a job, we'll launch the task now.")
+ self.functions[self.job.name].start_job(self.job)
diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py
index dd13c08..d2edf18 100755
--- a/turbo_hipster/worker_server.py
+++ b/turbo_hipster/worker_server.py
@@ -20,26 +20,29 @@
import logging
import os
-import signal
-import sys
+import threading
import worker_manager
-class Server(object):
+class Server(threading.Thread):
""" This is the worker server object to be daemonized """
log = logging.getLogger("worker_server.Server")
def __init__(self, config):
+ super(Server, self).__init__()
+ self._stop = threading.Event()
self.config = config
# Python logging output file.
self.debug_log = self.config['debug_log']
+ self.setup_logging()
# Config init
self.zuul_manager = None
self.zuul_client = None
self.plugins = []
+ self.services_started = False
# TODO: Make me unique (random?) and we should be able to run multiple
# instances of turbo-hipster on the one host
@@ -56,7 +59,8 @@
# in lib.utils.execute_to_log to work correctly.
if not os.path.isdir(os.path.dirname(self.debug_log)):
os.makedirs(os.path.dirname(self.debug_log))
- logging.basicConfig(format='%(asctime)s %(name)s %(message)s',
+ logging.basicConfig(format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s',
filename=self.debug_log, level=logging.DEBUG)
def load_plugins(self):
@@ -73,9 +77,9 @@
})
self.log.debug('Plugin %s loaded' % plugin['name'])
- def start_gearman_workers(self):
+ def start_zuul_client(self):
""" Run the tasks """
- self.log.debug('Starting gearman workers')
+ self.log.debug('Starting zuul client')
self.zuul_client = worker_manager.ZuulClient(self.config,
self.worker_name)
@@ -91,29 +95,24 @@
self.zuul_client.add_function(plugin['plugin_config']['function'],
self.tasks[job_name])
- self.zuul_client.register_functions()
- self.zuul_client.daemon = True
self.zuul_client.start()
+ def start_zuul_manager(self):
self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
- self.zuul_manager.daemon = True
self.zuul_manager.start()
- def exit_handler(self, signum):
+ def stop(self):
+ self._stop.set()
self.log.debug('Exiting...')
- signal.signal(signal.SIGUSR1, signal.SIG_IGN)
- for task_name, task in self.tasks.items():
- task.stop()
- self.manager.stop()
- sys.exit(0)
+ self.zuul_client.stop()
+ self.zuul_manager.stop()
- def main(self):
- self.setup_logging()
- self.start_gearman_workers()
+ def stopped(self):
+ return self._stop.isSet()
- while True:
- try:
- signal.pause()
- except KeyboardInterrupt:
- print "Ctrl + C: asking tasks to exit nicely...\n"
- self.exit_handler(signal.SIGINT)
+ def run(self):
+ self.start_zuul_client()
+ self.start_zuul_manager()
+ self.services_started = True
+ while not self.stopped():
+ self._stop.wait()