blob: a1cf47b01e8a41e3990a3fffebc2f388e67decc4 [file] [log] [blame]
# Copyright 2013 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gear
import json
import logging
import os
import threading
import time
class ZuulManager(threading.Thread):
""" This thread manages all of the launched gearman workers.
As required by the zuul protocol it handles stopping builds when they
are cancelled through stop:turbo-hipster-manager-%hostname.
To do this it implements its own gearman worker waiting for events on
that manager. """
log = logging.getLogger("worker_manager.ZuulManager")
def __init__(self, worker_server, tasks):
super(ZuulManager, self).__init__()
self._stop = threading.Event()
self.stopping = False
self.running = False
self.worker_server = worker_server
self.tasks = tasks
self.gearman_worker = None
self.setup_gearman()
def setup_gearman(self):
hostname = os.uname()[1]
self.gearman_worker = gear.Worker('turbo-hipster-manager-%s'
% hostname)
self.gearman_worker.addServer(
self.worker_server.config['zuul_server']['gearman_host'],
self.worker_server.config['zuul_server']['gearman_port']
)
def register_functions(self):
hostname = os.uname()[1]
self.gearman_worker.registerFunction(
'stop:turbo-hipster-manager-%s' % hostname)
def stop_gracefully(self):
self.stopping = True
self.gearman_worker.stopWaitingForJobs()
while self.running:
self.log.debug('waiting to finish')
time.sleep(0.1)
self._stop.set()
self.gearman_worker.shutdown()
def stop(self):
self._stop.set()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
while not self.stopped() and not self.stopping:
self.running = True
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
self.gearman_worker.waitForServer()
if (not self.stopped() and self.gearman_worker.running and
self.gearman_worker.active_connections):
self.register_functions()
self.gearman_worker.waitForServer()
logging.debug("Waiting for job")
self.current_step = 0
job = self.gearman_worker.getJob()
self._handle_job(job)
except gear.InterruptedError:
self.log.debug('We were asked to stop waiting for jobs')
except:
self.log.exception('Unknown exception waiting for job.')
self.running = False
self.log.debug("Finished manager thread")
def _handle_job(self, job):
""" Handle the requested job """
try:
job_arguments = json.loads(job.arguments.decode('utf-8'))
self.tasks[job_arguments['name']].stop_working(
job_arguments['number'])
job.sendWorkComplete()
except Exception as e:
self.log.exception('Exception waiting for management job.')
job.sendWorkException(str(e).encode('utf-8'))
class ZuulClient(threading.Thread):
""" ..."""
log = logging.getLogger("worker_manager.ZuulClient")
def __init__(self, worker_server):
super(ZuulClient, self).__init__()
self._stop = threading.Event()
self.stopping = False
self.running = False
self.worker_server = worker_server
# Set up the runner worker
self.gearman_worker = None
self.functions = {}
self.job = None
self.setup_gearman()
def setup_gearman(self):
self.log.debug("Set up gearman worker")
self.gearman_worker = gear.Worker(self.worker_server.worker_name)
self.gearman_worker.addServer(
self.worker_server.config['zuul_server']['gearman_host'],
self.worker_server.config['zuul_server']['gearman_port']
)
def register_functions(self):
self.log.debug("Register functions with gearman")
for function_name, plugin in self.functions.items():
self.gearman_worker.registerFunction(function_name)
self.log.debug(self.gearman_worker.functions)
def add_function(self, function_name, plugin):
self.log.debug("Add function, %s, to list" % function_name)
self.functions[function_name] = plugin
def stop(self):
self._stop.set()
for task in self.functions.values():
task.stop_working()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stop_gracefully(self):
self.stopping = True
self.gearman_worker.stopWaitingForJobs()
while self.running:
time.sleep(0.1)
self._stop.set()
self.gearman_worker.shutdown()
def stopped(self):
return self._stop.isSet()
def run(self):
while not self.stopped() and not self.stopping:
self.running = True
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
self.gearman_worker.waitForServer()
if (not self.stopped() and self.gearman_worker.running and
self.gearman_worker.active_connections):
self.register_functions()
self.gearman_worker.waitForServer()
self.log.debug("Waiting for job")
self.job = self.gearman_worker.getJob()
self._handle_job()
except gear.InterruptedError:
self.log.debug('We were asked to stop waiting for jobs')
except:
self.log.exception('Unknown exception waiting for job.')
self.running = False
self.log.debug("Finished client thread")
def _handle_job(self):
""" We have a job, give it to the right plugin """
if self.job:
self.log.debug("We have a job, we'll launch the task now.")
self.functions[self.job.name].start_job(self.job)