| #!/usr/bin/python2 |
| # |
| # Copyright 2013 Rackspace Australia |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import gear |
| import json |
| import threading |
| import os |
| import re |
| import time |
| |
| from turbo_hipster.worker_manager import GearmanManager |
| from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\ |
| as RealDbUpgradeRunner |
| |
| |
| class FakeGearmanManager(GearmanManager): |
| def __init__(self, config, tasks, test): |
| self.test = test |
| super(FakeGearmanManager, self).__init__(config, tasks) |
| |
| def setup_gearman(self): |
| hostname = os.uname()[1] |
| self.gearman_worker = FakeWorker('turbo-hipster-manager-%s' |
| % hostname, self.test) |
| self.gearman_worker.addServer( |
| self.config['zuul_server']['gearman_host'], |
| self.config['zuul_server']['gearman_port'] |
| ) |
| self.gearman_worker.registerFunction( |
| 'stop:turbo-hipster-manager-%s' % hostname) |
| |
| |
| class FakeWorker(gear.Worker): |
| def __init__(self, worker_id, test): |
| super(FakeWorker, self).__init__(worker_id) |
| self.gearman_jobs = {} |
| self.build_history = [] |
| self.running_builds = [] |
| self.build_counter = 0 |
| self.fail_tests = {} |
| self.test = test |
| |
| self.hold_jobs_in_build = False |
| self.lock = threading.Lock() |
| self.__work_thread = threading.Thread(target=self.work) |
| self.__work_thread.daemon = True |
| self.__work_thread.start() |
| |
| def handleJob(self, job): |
| parts = job.name.split(":") |
| cmd = parts[0] |
| name = parts[1] |
| if len(parts) > 2: |
| node = parts[2] |
| else: |
| node = None |
| if cmd == 'build': |
| self.handleBuild(job, name, node) |
| elif cmd == 'stop': |
| self.handleStop(job, name) |
| elif cmd == 'set_description': |
| self.handleSetDescription(job, name) |
| |
| def handleBuild(self, job, name, node): |
| build = FakeBuild(self, job, self.build_counter, node) |
| job.build = build |
| self.gearman_jobs[job.unique] = job |
| self.build_counter += 1 |
| |
| self.running_builds.append(build) |
| build.start() |
| |
| def handleStop(self, job, name): |
| self.log.debug("handle stop") |
| parameters = json.loads(job.arguments) |
| name = parameters['name'] |
| number = parameters['number'] |
| for build in self.running_builds: |
| if build.name == name and build.number == number: |
| build.aborted = True |
| build.release() |
| job.sendWorkComplete() |
| return |
| job.sendWorkFail() |
| |
| def handleSetDescription(self, job, name): |
| self.log.debug("handle set description") |
| parameters = json.loads(job.arguments) |
| name = parameters['name'] |
| number = parameters['number'] |
| descr = parameters['html_description'] |
| for build in self.running_builds: |
| if build.name == name and build.number == number: |
| build.description = descr |
| job.sendWorkComplete() |
| return |
| for build in self.build_history: |
| if build.name == name and build.number == number: |
| build.description = descr |
| job.sendWorkComplete() |
| return |
| job.sendWorkFail() |
| |
| def work(self): |
| while self.running: |
| try: |
| job = self.getJob() |
| except gear.InterruptedError: |
| continue |
| try: |
| self.handleJob(job) |
| except: |
| self.log.exception("Worker exception:") |
| |
| def addFailTest(self, name, change): |
| l = self.fail_tests.get(name, []) |
| l.append(change) |
| self.fail_tests[name] = l |
| |
| def shouldFailTest(self, name, ref): |
| l = self.fail_tests.get(name, []) |
| for change in l: |
| if self.test.ref_has_change(ref, change): |
| return True |
| return False |
| |
| def release(self, regex=None): |
| builds = self.running_builds[:] |
| self.log.debug("releasing build %s (%s)" % (regex, |
| len(self.running_builds))) |
| for build in builds: |
| if not regex or re.match(regex, build.name): |
| self.log.debug("releasing build %s" % |
| (build.parameters['ZUUL_UUID'])) |
| build.release() |
| else: |
| self.log.debug("not releasing build %s" % |
| (build.parameters['ZUUL_UUID'])) |
| self.log.debug("done releasing builds %s (%s)" % |
| (regex, len(self.running_builds))) |
| |
| |
| class FakeRealDbUpgradeRunner(RealDbUpgradeRunner): |
| def __init__(self, config, test): |
| self.test = test |
| super(FakeRealDbUpgradeRunner, self).__init__(config) |
| |
| def setup_gearman(self): |
| self.log.debug("Set up real_db gearman worker") |
| self.gearman_worker = FakeWorker('FakeRealDbUpgradeRunner_worker', |
| self.test) |
| self.gearman_worker.addServer( |
| self.config['zuul_server']['gearman_host'], |
| self.config['zuul_server']['gearman_port'] |
| ) |
| self.register_functions() |
| |
| |
| class BuildHistory(object): |
| def __init__(self, **kw): |
| self.__dict__.update(kw) |
| |
| def __repr__(self): |
| return ("<Completed build, result: %s name: %s #%s changes: %s>" % |
| (self.result, self.name, self.number, self.changes)) |
| |
| |
| class FakeBuild(threading.Thread): |
| def __init__(self, worker, job, number, node): |
| threading.Thread.__init__(self) |
| self.daemon = True |
| self.worker = worker |
| self.job = job |
| self.name = job.name.split(':')[1] |
| self.number = number |
| self.node = node |
| self.parameters = json.loads(job.arguments) |
| self.unique = self.parameters['ZUUL_UUID'] |
| self.wait_condition = threading.Condition() |
| self.waiting = False |
| self.aborted = False |
| self.created = time.time() |
| self.description = '' |
| |
| def release(self): |
| self.wait_condition.acquire() |
| self.wait_condition.notify() |
| self.waiting = False |
| self.log.debug("Build %s released" % self.unique) |
| self.wait_condition.release() |
| |
| def isWaiting(self): |
| self.wait_condition.acquire() |
| if self.waiting: |
| ret = True |
| else: |
| ret = False |
| self.wait_condition.release() |
| return ret |
| |
| def _wait(self): |
| self.wait_condition.acquire() |
| self.waiting = True |
| self.log.debug("Build %s waiting" % self.unique) |
| self.wait_condition.wait() |
| self.wait_condition.release() |
| |
| def run(self): |
| data = { |
| 'url': 'https://server/job/%s/%s/' % (self.name, self.number), |
| 'name': self.name, |
| 'number': self.number, |
| 'manager': self.worker.worker_id, |
| } |
| |
| self.job.sendWorkData(json.dumps(data)) |
| self.job.sendWorkStatus(0, 100) |
| |
| if self.worker.hold_jobs_in_build: |
| self._wait() |
| self.log.debug("Build %s continuing" % self.unique) |
| |
| self.worker.lock.acquire() |
| |
| result = 'SUCCESS' |
| if (('ZUUL_REF' in self.parameters) and |
| self.worker.shouldFailTest(self.name, |
| self.parameters['ZUUL_REF'])): |
| result = 'FAILURE' |
| if self.aborted: |
| result = 'ABORTED' |
| |
| data = {'result': result} |
| changes = None |
| if 'ZUUL_CHANGE_IDS' in self.parameters: |
| changes = self.parameters['ZUUL_CHANGE_IDS'] |
| |
| self.worker.build_history.append( |
| BuildHistory(name=self.name, number=self.number, |
| result=result, changes=changes, node=self.node, |
| uuid=self.unique, description=self.description, |
| pipeline=self.parameters['ZUUL_PIPELINE']) |
| ) |
| |
| self.job.sendWorkComplete(json.dumps(data)) |
| del self.worker.gearman_jobs[self.job.unique] |
| self.worker.running_builds.remove(self) |
| self.worker.lock.release() |
| |
| |
| class FakeGearmanServer(gear.Server): |
| def __init__(self, port=4730): |
| self.hold_jobs_in_queue = False |
| super(FakeGearmanServer, self).__init__(port) |
| |
| def getJobForConnection(self, connection, peek=False): |
| for queue in [self.high_queue, self.normal_queue, self.low_queue]: |
| for job in queue: |
| if not hasattr(job, 'waiting'): |
| if job.name.startswith('build:'): |
| job.waiting = self.hold_jobs_in_queue |
| else: |
| job.waiting = False |
| if job.waiting: |
| continue |
| if job.name in connection.functions: |
| if not peek: |
| queue.remove(job) |
| connection.related_jobs[job.handle] = job |
| job.worker_connection = connection |
| job.running = True |
| return job |
| return None |
| |
| def release(self, regex=None): |
| released = False |
| qlen = (len(self.high_queue) + len(self.normal_queue) + |
| len(self.low_queue)) |
| self.log.debug("releasing queued job %s (%s)" % (regex, qlen)) |
| for job in self.getQueue(): |
| cmd, name = job.name.split(':') |
| if cmd != 'build': |
| continue |
| if not regex or re.match(regex, name): |
| self.log.debug("releasing queued job %s" % |
| job.unique) |
| job.waiting = False |
| released = True |
| else: |
| self.log.debug("not releasing queued job %s" % |
| job.unique) |
| if released: |
| self.wakeConnections() |
| qlen = (len(self.high_queue) + len(self.normal_queue) + |
| len(self.low_queue)) |
| self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen)) |