Joshua Hesketh | 0ddd638 | 2013-07-26 10:33:36 +1000 | [diff] [blame^] | 1 | # Copyright ... |
| 2 | |
| 3 | import gear |
| 4 | import json |
| 5 | import logging |
| 6 | import os |
| 7 | import threading |
| 8 | |
| 9 | |
| 10 | class GearmanManager(threading.Thread): |
| 11 | |
| 12 | """ This thread manages all of the launched gearman workers. |
| 13 | As required by the zuul protocol it handles stopping builds when they |
| 14 | are cancelled through stop:rcbau-ci-manager-%hostname. |
| 15 | To do this it implements its own gearman worker waiting for events on |
| 16 | that manager. """ |
| 17 | |
| 18 | log = logging.getLogger("rcbau-ci.worker_manager.Manager") |
| 19 | |
| 20 | def __init__(self, config, tasks): |
| 21 | super(GearmanManager, self).__init__() |
| 22 | self._stop = threading.Event() |
| 23 | self.config = config |
| 24 | self.tasks = tasks |
| 25 | |
| 26 | self.gearman_worker = None |
| 27 | self.setup_gearman() |
| 28 | |
| 29 | def setup_gearman(self): |
| 30 | hostname = os.uname()[1] |
| 31 | self.gearman_worker = gear.Worker('rcbau-manager-%s' |
| 32 | % hostname) |
| 33 | self.gearman_worker.addServer( |
| 34 | self.config['zuul_server']['gearman_host'], |
| 35 | self.config['zuul_server']['gearman_port'] |
| 36 | ) |
| 37 | self.gearman_worker.registerFunction( |
| 38 | 'stop:rcbau-ci-manager-%s' % hostname) |
| 39 | |
| 40 | def stop(self): |
| 41 | self._stop.set() |
| 42 | # Unblock gearman |
| 43 | self.log.debug("Telling gearman to stop waiting for jobs") |
| 44 | self.gearman_worker.stopWaitingForJobs() |
| 45 | self.gearman_worker.shutdown() |
| 46 | |
| 47 | def stopped(self): |
| 48 | return self._stop.isSet() |
| 49 | |
| 50 | def run(self): |
| 51 | while True and not self.stopped(): |
| 52 | try: |
| 53 | # gearman_worker.getJob() blocks until a job is available |
| 54 | logging.debug("Waiting for job") |
| 55 | self.current_step = 0 |
| 56 | job = self.gearman_worker.getJob() |
| 57 | self._handle_job(job) |
| 58 | except: |
| 59 | logging.exception('Exception retrieving log event.') |
| 60 | |
| 61 | def _handle_job(self, job): |
| 62 | """ Handle the requested job """ |
| 63 | try: |
| 64 | job_arguments = json.loads(job.arguments.decode('utf-8')) |
| 65 | self.tasks[job_arguments['name']].stop_worker( |
| 66 | job_arguments['number']) |
| 67 | except Exception as e: |
| 68 | self.log.exception('Exception handling log event.') |
| 69 | job.sendWorkException(str(e).encode('utf-8')) |