blob: 72034fb367367131a3d4c6101a22cb9628ac3f32 [file] [log] [blame]
Joshua Hesketh39a0fee2013-07-31 12:00:53 +10001# Copyright 2013 Rackspace Australia
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100015
16import gear
17import json
18import logging
19import os
20import threading
21
22
23class GearmanManager(threading.Thread):
24
25 """ This thread manages all of the launched gearman workers.
26 As required by the zuul protocol it handles stopping builds when they
Joshua Hesketh363d0042013-07-26 11:44:07 +100027 are cancelled through stop:turbo-hipster-manager-%hostname.
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100028 To do this it implements its own gearman worker waiting for events on
29 that manager. """
30
Joshua Hesketh363d0042013-07-26 11:44:07 +100031 log = logging.getLogger("worker_manager.GearmanManager")
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100032
33 def __init__(self, config, tasks):
34 super(GearmanManager, self).__init__()
35 self._stop = threading.Event()
36 self.config = config
37 self.tasks = tasks
38
39 self.gearman_worker = None
40 self.setup_gearman()
41
42 def setup_gearman(self):
43 hostname = os.uname()[1]
Joshua Hesketh363d0042013-07-26 11:44:07 +100044 self.gearman_worker = gear.Worker('turbo-hipster-manager-%s'
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100045 % hostname)
46 self.gearman_worker.addServer(
47 self.config['zuul_server']['gearman_host'],
48 self.config['zuul_server']['gearman_port']
49 )
50 self.gearman_worker.registerFunction(
Joshua Hesketh363d0042013-07-26 11:44:07 +100051 'stop:turbo-hipster-manager-%s' % hostname)
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100052
53 def stop(self):
54 self._stop.set()
55 # Unblock gearman
56 self.log.debug("Telling gearman to stop waiting for jobs")
57 self.gearman_worker.stopWaitingForJobs()
58 self.gearman_worker.shutdown()
59
60 def stopped(self):
61 return self._stop.isSet()
62
63 def run(self):
64 while True and not self.stopped():
65 try:
66 # gearman_worker.getJob() blocks until a job is available
67 logging.debug("Waiting for job")
68 self.current_step = 0
69 job = self.gearman_worker.getJob()
70 self._handle_job(job)
71 except:
72 logging.exception('Exception retrieving log event.')
73
74 def _handle_job(self, job):
75 """ Handle the requested job """
76 try:
77 job_arguments = json.loads(job.arguments.decode('utf-8'))
78 self.tasks[job_arguments['name']].stop_worker(
79 job_arguments['number'])
Joshua Hesketh4a30d272013-08-12 11:17:25 +100080 job.sendWorkComplete()
Joshua Hesketh0ddd6382013-07-26 10:33:36 +100081 except Exception as e:
82 self.log.exception('Exception handling log event.')
83 job.sendWorkException(str(e).encode('utf-8'))