blob: fa6f7c0c9c7964f33cfba026d07394d023b56b36 [file] [log] [blame]
Joshua Hesketh0ddd6382013-07-26 10:33:36 +10001#!/usr/bin/python2
2#
Joshua Hesketh39a0fee2013-07-31 12:00:53 +10003# Copyright 2013 Rackspace Australia
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100016
Joshua Hesketh1377f6f2013-07-26 12:02:15 +100017
18""" worker_server.py is an executable worker server that loads and runs
19task_plugins. """
20
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100021import logging
22import os
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +110023import threading
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100024
25import worker_manager
26
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100027
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +110028class Server(threading.Thread):
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100029
30 """ This is the worker server object to be daemonized """
Joshua Hesketh363d0042013-07-26 11:44:07 +100031 log = logging.getLogger("worker_server.Server")
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100032
33 def __init__(self, config):
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +110034 super(Server, self).__init__()
35 self._stop = threading.Event()
Joshua Hesketh83269072013-11-20 12:22:19 +110036 self.config = config
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100037 # Python logging output file.
38 self.debug_log = self.config['debug_log']
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +110039 self.setup_logging()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100040
Joshua Hesketh4343b952013-11-20 12:11:55 +110041 # Config init
Joshua Hesketh4343b952013-11-20 12:11:55 +110042 self.zuul_manager = None
43 self.zuul_client = None
44 self.plugins = []
Joshua Heskethb39f8842013-11-21 13:12:00 +110045
46 # TODO: Make me unique (random?) and we should be able to run multiple
47 # instances of turbo-hipster on the one host
Joshua Hesketh4343b952013-11-20 12:11:55 +110048 self.worker_name = os.uname()[1]
49
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100050 self.tasks = {}
Joshua Hesketh4343b952013-11-20 12:11:55 +110051 self.load_plugins()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100052
53 def setup_logging(self):
Michael Still89e4c6d2014-01-09 17:38:24 +110054 if not self.debug_log:
55 raise Exception('Debug log not configured')
56
57 # NOTE(mikal): debug logging _must_ be enabled for the log writing
58 # in lib.utils.execute_to_log to work correctly.
59 if not os.path.isdir(os.path.dirname(self.debug_log)):
60 os.makedirs(os.path.dirname(self.debug_log))
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +110061 logging.basicConfig(format='%(asctime)s %(name)-32s '
62 '%(levelname)-8s %(message)s',
Michael Still89e4c6d2014-01-09 17:38:24 +110063 filename=self.debug_log, level=logging.DEBUG)
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100064
65 def load_plugins(self):
66 """ Load the available plugins from task_plugins """
Joshua Hesketh6eb5fdc2013-11-20 12:36:06 +110067 self.log.debug('Loading plugins')
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100068 # Load plugins
Joshua Hesketh6b6700b2013-07-26 16:47:32 +100069 for plugin in self.config['plugins']:
Joshua Hesketha74f49d2013-09-06 15:52:49 +100070 self.plugins.append({
71 'module': __import__('turbo_hipster.task_plugins.' +
Joshua Hesketh3b74fcd2013-09-06 16:19:52 +100072 plugin['name'] + '.task',
73 fromlist='turbo_hipster.task_plugins' +
74 plugin['name']),
Joshua Hesketh4acd7162013-09-06 16:05:37 +100075 'plugin_config': plugin
Joshua Hesketha74f49d2013-09-06 15:52:49 +100076 })
Joshua Hesketh6eb5fdc2013-11-20 12:36:06 +110077 self.log.debug('Plugin %s loaded' % plugin['name'])
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100078
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +110079 def start_zuul_client(self):
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100080 """ Run the tasks """
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +110081 self.log.debug('Starting zuul client')
Joshua Hesketh4343b952013-11-20 12:11:55 +110082 self.zuul_client = worker_manager.ZuulClient(self.config,
83 self.worker_name)
84
85 for task_number, plugin in enumerate(self.plugins):
Joshua Hesketha74f49d2013-09-06 15:52:49 +100086 module = plugin['module']
Joshua Hesketh4343b952013-11-20 12:11:55 +110087 job_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
88 self.worker_name, task_number)
89 self.tasks[job_name] = module.Runner(
Joshua Hesketh12176932013-09-17 13:34:46 +100090 self.config,
Joshua Hesketh527966b2013-11-19 12:01:06 +110091 plugin['plugin_config'],
Joshua Hesketh4343b952013-11-20 12:11:55 +110092 job_name
Joshua Hesketh12176932013-09-17 13:34:46 +100093 )
Joshua Hesketh4343b952013-11-20 12:11:55 +110094 self.zuul_client.add_function(plugin['plugin_config']['function'],
95 self.tasks[job_name])
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100096
Joshua Hesketh4343b952013-11-20 12:11:55 +110097 self.zuul_client.register_functions()
Joshua Hesketh4343b952013-11-20 12:11:55 +110098 self.zuul_client.start()
99
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +1100100 def start_zuul_manager(self):
Joshua Hesketh4343b952013-11-20 12:11:55 +1100101 self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
Joshua Hesketh4343b952013-11-20 12:11:55 +1100102 self.zuul_manager.start()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +1000103
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +1100104 def stop(self):
105 self._stop.set()
Joshua Hesketh6eb5fdc2013-11-20 12:36:06 +1100106 self.log.debug('Exiting...')
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +1100107 self.zuul_client.stop()
108 self.zuul_manager.stop()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +1000109
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +1100110 def stopped(self):
111 return self._stop.isSet()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +1000112
Joshua Hesketh1d8c4e52014-03-05 14:15:31 +1100113 def run(self):
114 self.start_zuul_client()
115 self.start_zuul_manager()
116 while not self.stopped():
117 self._stop.wait()