blob: c8df4e0f1107dac40520b3c93de2609cbdfdd1f4 [file] [log] [blame]
Joshua Hesketh0ddd6382013-07-26 10:33:36 +10001# Copyright ...
2
3import gear
4import json
5import logging
6import os
7import threading
8
9
class GearmanManager(threading.Thread):

    """Thread that manages all of the launched gearman workers.

    As required by the zuul protocol it handles stopping builds when they
    are cancelled through stop:rcbau-ci-manager-%hostname.
    To do this it implements its own gearman worker waiting for events on
    that manager.
    """

    log = logging.getLogger("rcbau-ci.worker_manager.Manager")

    def __init__(self, config, tasks):
        """Store the configuration and set up the gearman worker.

        config -- mapping; config['zuul_server'] must provide the
                  'gearman_host' and 'gearman_port' entries.
        tasks -- mapping of task name to an object exposing
                 stop_worker(number).
        """
        super(GearmanManager, self).__init__()
        # NOTE: deliberately not called ``_stop``. threading.Thread defines
        # a private ``_stop()`` method on Python 3 and shadowing it with an
        # Event makes ``join()`` fail with "'Event' object is not callable".
        self._stop_event = threading.Event()
        self.config = config
        self.tasks = tasks

        self.gearman_worker = None
        self.setup_gearman()

    def setup_gearman(self):
        """Create the gear worker and register the stop function on it."""
        hostname = os.uname()[1]
        self.gearman_worker = gear.Worker('rcbau-manager-%s'
                                          % hostname)
        self.gearman_worker.addServer(
            self.config['zuul_server']['gearman_host'],
            self.config['zuul_server']['gearman_port']
        )
        self.gearman_worker.registerFunction(
            'stop:rcbau-ci-manager-%s' % hostname)

    def stop(self):
        """Ask the thread to finish and shut the gearman worker down."""
        # Set the flag first so run() sees it as soon as getJob() returns.
        self._stop_event.set()
        # Unblock gearman
        self.log.debug("Telling gearman to stop waiting for jobs")
        self.gearman_worker.stopWaitingForJobs()
        self.gearman_worker.shutdown()

    def stopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()

    def run(self):
        """Fetch and handle jobs until stop() is called."""
        while not self.stopped():
            try:
                # gearman_worker.getJob() blocks until a job is available
                self.log.debug("Waiting for job")
                # Not read anywhere in this class; kept in case external
                # code inspects it.
                self.current_step = 0
                job = self.gearman_worker.getJob()
                self._handle_job(job)
            except Exception:
                # ``except Exception`` (rather than a bare ``except``) lets
                # SystemExit/KeyboardInterrupt propagate.
                if self.stopped():
                    # getJob() was interrupted by stop(); this is the
                    # expected shutdown path, not an error.
                    break
                self.log.exception('Exception retrieving log event.')

    def _handle_job(self, job):
        """Handle the requested job.

        The job arguments are a UTF-8 encoded JSON object with a 'name'
        key selecting the task and a 'number' key passed to that task's
        stop_worker(). On success the job is reported as complete; any
        exception is logged and reported back through
        job.sendWorkException().
        """
        try:
            job_arguments = json.loads(job.arguments.decode('utf-8'))
            self.tasks[job_arguments['name']].stop_worker(
                job_arguments['number'])
        except Exception as e:
            self.log.exception('Exception handling log event.')
            job.sendWorkException(str(e).encode('utf-8'))
        else:
            # Tell the gearman server the job finished; without this the
            # job is never reported as done.
            job.sendWorkComplete()