| #!/usr/bin/python2 |
| # |
| # Copyright 2013 Rackspace Australia |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| |
| import logging |
| import os |
| import threading |
| import yaml |
| |
| import worker_manager |
| from os.path import join, isdir, isfile |
| |
| |
| class Server(threading.Thread): |
| |
| """ This is the worker server object to be daemonized """ |
| log = logging.getLogger("worker_server.Server") |
| |
| def __init__(self, config): |
| super(Server, self).__init__() |
| self._stop = threading.Event() |
| self.config = config |
| |
| # Load extra configuration first |
| # NOTE(Mattoliverau): debug_log might be specified in |
| # a conf.d snippet. |
| if 'conf_d' in self.config: |
| self.load_extra_configuration() |
| |
| # Python logging output file. |
| self.debug_log = self.config['debug_log'] |
| self.setup_logging(self.debug_log) |
| |
| # Config init |
| self.zuul_manager = None |
| self.zuul_client = None |
| self.services_started = False |
| |
| # TODO: Make me unique (random?) and we should be able to run multiple |
| # instances of turbo-hipster on the one host |
| self.worker_name = os.uname()[1] |
| |
| self.jobs = {} |
| self.load_jobs() |
| |
| def load_extra_configuration(self): |
| if isdir(self.config["conf_d"]): |
| extra_configs = (join(self.config["conf_d"], item) |
| for item in os.listdir(self.config["conf_d"]) |
| if isfile(join(self.config["conf_d"], item))) |
| for conf in extra_configs: |
| try: |
| with open(conf, 'r') as config_stream: |
| extra_config = yaml.safe_load(config_stream) |
| self.config.update(extra_config) |
| except: |
| self.log.warn("Failed to load extra configuration: '%s'" % |
| (conf)) |
| continue |
| else: |
| self.log.warn("conf_d parameter '%s' isn't a directory" % |
| (self.config["conf_d"])) |
| |
| def setup_logging(self, log_file=None): |
| if log_file: |
| if not os.path.isdir(os.path.dirname(log_file)): |
| os.makedirs(os.path.dirname(log_file)) |
| logging.basicConfig(format='%(asctime)s %(name)-32s ' |
| '%(levelname)-8s %(message)s', |
| filename=log_file, |
| level=logging.DEBUG) |
| |
| def load_jobs(self): |
| # Legacy, load the plugins first |
| self.load_plugins() |
| |
| self.log.debug("Loading jobs") |
| if 'jobs' in self.config: |
| for job in self.config['jobs']: |
| try: |
| plugin = 'shell_script' |
| if 'plugin' in job: |
| plugin = job['plugin'] |
| |
| module = __import__('turbo_hipster.task_plugins.' + |
| plugin + '.task', |
| fromlist='turbo_hipster.task_plugins' + |
| plugin) |
| |
| self.jobs[job['name']] = { |
| 'name': job['name'], |
| 'plugin': plugin, |
| 'job_config': job, |
| 'runner': module.Runner(self, job['name'], job), |
| } |
| self.log.debug('Job %s loaded' % job['name']) |
| except Exception as e: |
| self.log.exception("Failure loading job") |
| self.log.exception(e) |
| |
| def load_plugins(self): |
| """ Load the available plugins from task_plugins """ |
| self.log.debug('Loading plugins') |
| # Load plugins |
| if 'plugins' in self.config: |
| for plugin in self.config['plugins']: |
| try: |
| module = __import__('turbo_hipster.task_plugins.' + |
| plugin['name'] + '.task', |
| fromlist='turbo_hipster.task_plugins' + |
| plugin['name']) |
| |
| self.jobs[plugin['function']] = { |
| 'name': plugin['function'], |
| 'plugin': plugin['name'], |
| 'plugin_config': plugin, |
| 'runner': module.Runner( |
| self, plugin['function'], plugin |
| ), |
| } |
| self.log.debug('Job %s loaded' % plugin['function']) |
| except Exception as e: |
| self.log.exception("Failure loading plugin") |
| self.log.exception(e) |
| |
| def start_zuul_client(self): |
| """ Run the tasks """ |
| self.log.debug('Starting zuul client') |
| self.zuul_client = worker_manager.ZuulClient(self) |
| |
| for job in self.jobs.values(): |
| self.zuul_client.add_function(job['name'], job['runner']) |
| |
| self.zuul_client.start() |
| |
| def start_zuul_manager(self): |
| self.zuul_manager = worker_manager.ZuulManager(self, self.jobs) |
| self.zuul_manager.start() |
| |
| def shutdown_gracefully(self): |
| """ Shutdown while no work is currently happening """ |
| self.log.debug('Graceful shutdown once jobs are complete...') |
| thread = threading.Thread(target=self._shutdown_gracefully) |
| thread.start() |
| |
| def _shutdown_gracefully(self): |
| self.zuul_client.stop_gracefully() |
| self.zuul_manager.stop_gracefully() |
| self._stop.set() |
| |
| def shutdown(self): |
| self.log.debug('Shutting down now!...') |
| self.zuul_client.stop() |
| self.zuul_manager.stop() |
| self._stop.set() |
| |
| def stopped(self): |
| return self._stop.isSet() |
| |
| def run(self): |
| self.start_zuul_client() |
| self.start_zuul_manager() |
| self.services_started = True |
| while not self.stopped(): |
| self._stop.wait() |