blob: 3923d357eff4cfbd106407f8335bd112955e8088 [file] [log] [blame]
Joshua Hesketh39a0fee2013-07-31 12:00:53 +10001# Copyright 2013 Rackspace Australia
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100015
16import gear
17import json
18import logging
19import os
20import threading
21
22
Joshua Hesketh4343b952013-11-20 12:11:55 +110023class ZuulManager(threading.Thread):
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100024
25 """ This thread manages all of the launched gearman workers.
26 As required by the zuul protocol it handles stopping builds when they
Joshua Hesketh363d0042013-07-26 11:44:07 +100027 are cancelled through stop:turbo-hipster-manager-%hostname.
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100028 To do this it implements its own gearman worker waiting for events on
29 that manager. """
30
Joshua Hesketh6eb5fdc2013-11-20 12:36:06 +110031 log = logging.getLogger("worker_manager.ZuulManager")
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100032
33 def __init__(self, config, tasks):
Joshua Hesketh4343b952013-11-20 12:11:55 +110034 super(ZuulManager, self).__init__()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100035 self._stop = threading.Event()
36 self.config = config
37 self.tasks = tasks
38
39 self.gearman_worker = None
40 self.setup_gearman()
41
42 def setup_gearman(self):
43 hostname = os.uname()[1]
Joshua Hesketh363d0042013-07-26 11:44:07 +100044 self.gearman_worker = gear.Worker('turbo-hipster-manager-%s'
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100045 % hostname)
46 self.gearman_worker.addServer(
47 self.config['zuul_server']['gearman_host'],
48 self.config['zuul_server']['gearman_port']
49 )
Joshua Hesketh9cd2f932014-03-05 16:49:22 +110050
51 def register_functions(self):
52 hostname = os.uname()[1]
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100053 self.gearman_worker.registerFunction(
Joshua Hesketh363d0042013-07-26 11:44:07 +100054 'stop:turbo-hipster-manager-%s' % hostname)
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100055
56 def stop(self):
57 self._stop.set()
58 # Unblock gearman
59 self.log.debug("Telling gearman to stop waiting for jobs")
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100060 self.gearman_worker.shutdown()
61
62 def stopped(self):
63 return self._stop.isSet()
64
65 def run(self):
Joshua Hesketh81b5fb82014-03-05 13:06:08 +110066 while not self.stopped():
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100067 try:
68 # gearman_worker.getJob() blocks until a job is available
Joshua Heskethdd50d272014-01-05 15:35:52 +110069 self.log.debug("Waiting for server")
70 self.gearman_worker.waitForServer()
Joshua Hesketh81b5fb82014-03-05 13:06:08 +110071 if (not self.stopped() and self.gearman_worker.running and
72 self.gearman_worker.active_connections):
Joshua Hesketh9cd2f932014-03-05 16:49:22 +110073 self.register_functions()
74 self.gearman_worker.waitForServer()
Joshua Hesketh81b5fb82014-03-05 13:06:08 +110075 logging.debug("Waiting for job")
76 self.current_step = 0
77 job = self.gearman_worker.getJob()
78 self._handle_job(job)
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100079 except:
80 logging.exception('Exception retrieving log event.')
Joshua Hesketh81b5fb82014-03-05 13:06:08 +110081 self.log.debug("Finished manager thread")
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100082
83 def _handle_job(self, job):
84 """ Handle the requested job """
85 try:
86 job_arguments = json.loads(job.arguments.decode('utf-8'))
Joshua Hesketh38a17182014-03-05 14:19:38 +110087 self.tasks[job_arguments['name']].stop_working(
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100088 job_arguments['number'])
Joshua Hesketh4a30d272013-08-12 11:17:25 +100089 job.sendWorkComplete()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100090 except Exception as e:
Joshua Heskethdd50d272014-01-05 15:35:52 +110091 self.log.exception('Exception waiting for management job.')
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100092 job.sendWorkException(str(e).encode('utf-8'))
Joshua Hesketh4343b952013-11-20 12:11:55 +110093
94
95class ZuulClient(threading.Thread):
96
97 """ ..."""
98
99 log = logging.getLogger("worker_manager.ZuulClient")
100
101 def __init__(self, global_config, worker_name):
102 super(ZuulClient, self).__init__()
103 self._stop = threading.Event()
104 self.global_config = global_config
105
106 self.worker_name = worker_name
107
108 # Set up the runner worker
109 self.gearman_worker = None
110 self.functions = {}
111
112 self.job = None
Joshua Hesketh4343b952013-11-20 12:11:55 +1100113
114 self.setup_gearman()
115
116 def setup_gearman(self):
117 self.log.debug("Set up gearman worker")
118 self.gearman_worker = gear.Worker(self.worker_name)
119 self.gearman_worker.addServer(
120 self.global_config['zuul_server']['gearman_host'],
121 self.global_config['zuul_server']['gearman_port']
122 )
Joshua Hesketh4343b952013-11-20 12:11:55 +1100123
124 def register_functions(self):
Joshua Heskethfc449062013-11-20 16:06:06 +1100125 self.log.debug("Register functions with gearman")
Joshua Hesketh4343b952013-11-20 12:11:55 +1100126 for function_name, plugin in self.functions.items():
127 self.gearman_worker.registerFunction(function_name)
Joshua Heskethb39f8842013-11-21 13:12:00 +1100128 self.log.debug(self.gearman_worker.functions)
Joshua Hesketh4343b952013-11-20 12:11:55 +1100129
130 def add_function(self, function_name, plugin):
Joshua Heskethfc449062013-11-20 16:06:06 +1100131 self.log.debug("Add function, %s, to list" % function_name)
Joshua Hesketh4343b952013-11-20 12:11:55 +1100132 self.functions[function_name] = plugin
133
134 def stop(self):
135 self._stop.set()
Joshua Hesketh38a17182014-03-05 14:19:38 +1100136 for task in self.functions.values():
137 task.stop_working()
Joshua Hesketh4343b952013-11-20 12:11:55 +1100138 # Unblock gearman
139 self.log.debug("Telling gearman to stop waiting for jobs")
Joshua Hesketh4343b952013-11-20 12:11:55 +1100140 self.gearman_worker.shutdown()
141
142 def stopped(self):
143 return self._stop.isSet()
144
145 def run(self):
Joshua Hesketh81b5fb82014-03-05 13:06:08 +1100146 while not self.stopped():
Joshua Hesketh4343b952013-11-20 12:11:55 +1100147 try:
Joshua Hesketh4343b952013-11-20 12:11:55 +1100148 # gearman_worker.getJob() blocks until a job is available
Joshua Heskethdd50d272014-01-05 15:35:52 +1100149 self.log.debug("Waiting for server")
150 self.gearman_worker.waitForServer()
Joshua Hesketh81b5fb82014-03-05 13:06:08 +1100151 if (not self.stopped() and self.gearman_worker.running and
152 self.gearman_worker.active_connections):
Joshua Hesketh9cd2f932014-03-05 16:49:22 +1100153 self.register_functions()
154 self.gearman_worker.waitForServer()
Joshua Hesketh81b5fb82014-03-05 13:06:08 +1100155 self.log.debug("Waiting for job")
156 self.job = self.gearman_worker.getJob()
157 self._handle_job()
Joshua Hesketh4343b952013-11-20 12:11:55 +1100158 except:
Joshua Heskethdd50d272014-01-05 15:35:52 +1100159 self.log.exception('Exception waiting for job.')
Joshua Hesketh81b5fb82014-03-05 13:06:08 +1100160 self.log.debug("Finished client thread")
Joshua Hesketh4343b952013-11-20 12:11:55 +1100161
162 def _handle_job(self):
163 """ We have a job, give it to the right plugin """
Joshua Hesketh81b5fb82014-03-05 13:06:08 +1100164 if self.job:
165 self.log.debug("We have a job, we'll launch the task now.")
166 self.functions[self.job.name].start_job(self.job)