blob: dd13c0870be5dbd3a75f11a3bd929567e0bf601e [file] [log] [blame]
#!/usr/bin/python2
#
# Copyright 2013 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100016
Joshua Hesketh1377f6f2013-07-26 12:02:15 +100017
""" worker_server.py is an executable worker server that loads and runs
task_plugins. """
20
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100021import logging
22import os
23import signal
24import sys
25
26import worker_manager
27
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100028
class Server(object):

    """ This is the worker server object to be daemonized.

    On construction it imports every task plugin named in the configuration.
    main() then configures logging, starts the gearman workers and waits for
    signals until interrupted.
    """
    log = logging.getLogger("worker_server.Server")

    def __init__(self, config):
        """ Store the configuration and load the configured plugins.

        :param config: mapping that must provide 'debug_log' (path of the
            Python logging output file) and 'plugins' (an iterable of
            per-plugin mappings, each with at least a 'name' key).
        :raises KeyError: if 'debug_log' or 'plugins' is missing.
        """
        self.config = config
        # Python logging output file.
        self.debug_log = self.config['debug_log']

        # Config init
        self.zuul_manager = None
        self.zuul_client = None
        self.plugins = []

        # TODO: Make me unique (random?) and we should be able to run multiple
        # instances of turbo-hipster on the one host
        self.worker_name = os.uname()[1]

        self.tasks = {}
        self.load_plugins()

    def setup_logging(self):
        """ Configure the root logger to write DEBUG output to debug_log.

        The directory that holds the log file is created when it does not
        exist yet.

        :raises Exception: if no debug log path has been configured.
        """
        if not self.debug_log:
            raise Exception('Debug log not configured')

        # NOTE(mikal): debug logging _must_ be enabled for the log writing
        # in lib.utils.execute_to_log to work correctly.
        log_dir = os.path.dirname(self.debug_log)
        # A bare filename has an empty directory part; os.makedirs('') would
        # raise, so only create a directory when one is actually named.
        if log_dir and not os.path.isdir(log_dir):
            os.makedirs(log_dir)
        logging.basicConfig(format='%(asctime)s %(name)s %(message)s',
                            filename=self.debug_log, level=logging.DEBUG)

    def load_plugins(self):
        """ Load the available plugins from task_plugins.

        For every entry in config['plugins'] the module
        turbo_hipster.task_plugins.<name>.task is imported and recorded in
        self.plugins together with that entry's configuration.
        """
        self.log.debug('Loading plugins')
        for plugin in self.config['plugins']:
            module_name = ('turbo_hipster.task_plugins.' +
                           plugin['name'] + '.task')
            # A non-empty fromlist makes __import__ return the leaf 'task'
            # module rather than the top-level package. 'Runner' is the
            # attribute start_gearman_workers() reads from that module.
            module = __import__(module_name, fromlist=['Runner'])
            self.plugins.append({
                'module': module,
                'plugin_config': plugin
            })
            self.log.debug('Plugin %s loaded', plugin['name'])

    def start_gearman_workers(self):
        """ Run the tasks.

        Creates one Runner per loaded plugin, registers each with the zuul
        client under the plugin's configured 'function', then starts the
        zuul client and the zuul manager with their daemon flag set.
        """
        self.log.debug('Starting gearman workers')
        self.zuul_client = worker_manager.ZuulClient(self.config,
                                                     self.worker_name)

        for task_number, plugin in enumerate(self.plugins):
            module = plugin['module']
            plugin_config = plugin['plugin_config']
            # The job name combines plugin name, host name and the plugin's
            # position in the list, so two plugins never share a key.
            job_name = '%s-%s-%s' % (plugin_config['name'],
                                     self.worker_name, task_number)
            self.tasks[job_name] = module.Runner(
                self.config,
                plugin_config,
                job_name
            )
            self.zuul_client.add_function(plugin_config['function'],
                                          self.tasks[job_name])

        self.zuul_client.register_functions()
        self.zuul_client.daemon = True
        self.zuul_client.start()

        self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
        self.zuul_manager.daemon = True
        self.zuul_manager.start()

    def exit_handler(self, signum, frame=None):
        """ Stop every task and the zuul manager, then exit with status 0.

        :param signum: number of the signal that triggered the shutdown.
        :param frame: unused; accepted so the method can be registered
            directly with signal.signal().
        :raises SystemExit: always, via sys.exit(0).
        """
        self.log.debug('Exiting...')
        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
        for task in self.tasks.values():
            task.stop()
        # The manager is stored as self.zuul_manager (there is no
        # self.manager attribute) and is still None when the workers were
        # never started.
        if self.zuul_manager is not None:
            self.zuul_manager.stop()
        sys.exit(0)

    def main(self):
        """ Set up logging, start the workers and wait for signals.

        Sleeps in signal.pause() until a KeyboardInterrupt arrives, then
        shuts down through exit_handler().
        """
        self.setup_logging()
        self.start_gearman_workers()

        while True:
            try:
                signal.pause()
            except KeyboardInterrupt:
                # Single-argument print() behaves the same on Python 2 and 3.
                print("Ctrl + C: asking tasks to exit nicely...\n")
                self.exit_handler(signal.SIGINT)