blob: a4b3b603a20dea39a81932ac21edf2d17f88de68 [file] [log] [blame]
#!/usr/bin/python2
#
# Copyright 2013 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import re
import threading
import time

import gear

from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\
    as RealDbUpgradeRunner
from turbo_hipster.worker_manager import ZuulManager
class FakeZuulManager(ZuulManager):
    """ZuulManager variant whose gearman worker is a FakeWorker.

    The owning test object is kept on ``self.test`` and handed on to the
    FakeWorker created in setup_gearman().
    """

    def __init__(self, config, tasks, test):
        # Stored before the parent constructor runs; presumably that
        # constructor calls setup_gearman(), which reads self.test --
        # TODO confirm against ZuulManager.
        self.test = test
        super(FakeZuulManager, self).__init__(config, tasks)

    def setup_gearman(self):
        """Create the FakeWorker, point it at the server, register 'stop'."""
        node_name = os.uname()[1]
        worker_id = 'turbo-hipster-manager-%s' % node_name
        self.gearman_worker = FakeWorker(worker_id, self.test)

        # Looked up only after the worker exists, as in the original order.
        zuul_server = self.config['zuul_server']
        self.gearman_worker.addServer(zuul_server['gearman_host'],
                                      zuul_server['gearman_port'])

        stop_function = 'stop:turbo-hipster-manager-%s' % node_name
        self.gearman_worker.registerFunction(stop_function)
class FakeWorker(gear.Worker):
    """Gearman worker test double that runs jobs as in-process fake builds.

    A daemon thread started by the constructor pulls jobs with getJob()
    and dispatches them by the command prefix of the job name
    (``build``, ``stop`` or ``set_description``).
    """

    def __init__(self, worker_id, test):
        """Initialise bookkeeping and start the background work loop.

        :param worker_id: identifier passed through to gear.Worker.
        :param test: test object; its ref_has_change() is consulted by
            shouldFailTest().
        """
        super(FakeWorker, self).__init__(worker_id)
        # Gearman jobs of builds still running, keyed by job.unique.
        self.gearman_jobs = {}
        # BuildHistory records appended by FakeBuild.run() on completion.
        self.build_history = []
        # FakeBuild threads that have been started and not yet finished.
        self.running_builds = []
        # Number handed to the next FakeBuild.
        self.build_counter = 0
        # Job name -> list of changes that should make that job fail.
        self.fail_tests = {}
        self.test = test
        # When true, FakeBuild.run() blocks until the build is released.
        self.hold_jobs_in_build = False
        # Taken by FakeBuild.run() while it records its result.
        self.lock = threading.Lock()
        self.__work_thread = threading.Thread(target=self.work)
        self.__work_thread.daemon = True
        self.__work_thread.start()

    def handleJob(self, job):
        """Dispatch *job* according to its ``cmd:name[:node]`` job name.

        Commands other than build/stop/set_description are ignored.
        """
        parts = job.name.split(":")
        cmd = parts[0]
        name = parts[1]
        node = parts[2] if len(parts) > 2 else None
        if cmd == 'build':
            self.handleBuild(job, name, node)
        elif cmd == 'stop':
            self.handleStop(job, name)
        elif cmd == 'set_description':
            self.handleSetDescription(job, name)

    def handleBuild(self, job, name, node):
        """Start a FakeBuild thread for *job* and track it as running."""
        build = FakeBuild(self, job, self.build_counter, node)
        job.build = build
        self.gearman_jobs[job.unique] = job
        self.build_counter += 1
        self.running_builds.append(build)
        build.start()

    def handleStop(self, job, name):
        """Abort the running build named in the job arguments.

        The build to stop is identified by the ``name`` and ``number``
        fields of the JSON arguments (the *name* parameter is not used).
        Sends work-complete if a matching running build was found and
        work-fail otherwise.
        """
        self.log.debug("handle stop")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.aborted = True
                build.release()
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def handleSetDescription(self, job, name):
        """Set the description of a running or already finished build.

        The build is identified by ``name`` and ``number`` from the JSON
        arguments; running builds are searched before the history.
        Sends work-fail when no build matches.
        """
        self.log.debug("handle set description")
        parameters = json.loads(job.arguments)
        name = parameters['name']
        number = parameters['number']
        descr = parameters['html_description']
        for build in self.running_builds:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        for build in self.build_history:
            if build.name == name and build.number == number:
                build.description = descr
                job.sendWorkComplete()
                return
        job.sendWorkFail()

    def work(self):
        """Fetch and handle jobs for as long as ``self.running`` is true."""
        while self.running:
            try:
                job = self.getJob()
            except gear.InterruptedError:
                continue
            try:
                self.handleJob(job)
            except Exception:
                # Keep the loop alive on handler errors, but do not
                # swallow SystemExit / KeyboardInterrupt the way a bare
                # ``except:`` would.
                self.log.exception("Worker exception:")

    def addFailTest(self, name, change):
        """Register *change* as one that makes job *name* fail."""
        self.fail_tests.setdefault(name, []).append(change)

    def shouldFailTest(self, name, ref):
        """Return True if *ref* contains a change registered for *name*."""
        for change in self.fail_tests.get(name, []):
            if self.test.ref_has_change(ref, change):
                return True
        return False

    def release(self, regex=None):
        """Release running builds whose name matches *regex*.

        With no (or an empty) *regex* every running build is released.
        """
        # Iterate over a snapshot of the list of running builds.
        builds = self.running_builds[:]
        self.log.debug("releasing build %s (%s)" % (regex,
                       len(self.running_builds)))
        for build in builds:
            if not regex or re.match(regex, build.name):
                self.log.debug("releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
                build.release()
            else:
                self.log.debug("not releasing build %s" %
                               (build.parameters['ZUUL_UUID']))
        self.log.debug("done releasing builds %s (%s)" %
                       (regex, len(self.running_builds)))
class FakeRealDbUpgradeRunner(RealDbUpgradeRunner):
    """RealDbUpgradeRunner that additionally remembers the owning test."""

    def __init__(self, global_config, plugin_config, worker_name, test):
        # Stored before delegating to the parent constructor.
        self.test = test
        parent = super(FakeRealDbUpgradeRunner, self)
        parent.__init__(global_config, plugin_config, worker_name)
class BuildHistory(object):
    """Plain record of a finished build; every keyword becomes an attribute."""

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def __repr__(self):
        # Requires result, name, number and changes to have been supplied.
        fields = (self.result, self.name, self.number, self.changes)
        template = "<Completed build, result: %s name: %s #%s changes: %s>"
        return template % fields
class FakeBuild(threading.Thread):
    """Thread that plays the part of a single build for a gearman job.

    run() reports start data to the job, optionally blocks until
    release() is called, then records a BuildHistory entry on the worker
    and completes the job.
    """

    # threading.Thread provides no logger, so define the one that
    # release(), _wait() and run() use.
    log = logging.getLogger("turbo_hipster.tests.FakeBuild")

    def __init__(self, worker, job, number, node):
        """Capture the job details needed to run the fake build.

        :param worker: owning worker; run() uses its lock, build_history,
            gearman_jobs, running_builds, hold_jobs_in_build, worker_id
            and shouldFailTest().
        :param job: gearman job named ``cmd:name[:node]`` whose arguments
            are a JSON object containing at least ``ZUUL_UUID``.
        :param number: build number reported for this build.
        :param node: node name, or None.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.worker = worker
        self.job = job
        self.name = job.name.split(':')[1]
        self.number = number
        self.node = node
        self.parameters = json.loads(job.arguments)
        self.unique = self.parameters['ZUUL_UUID']
        self.wait_condition = threading.Condition()
        self.waiting = False
        self.aborted = False
        self.created = time.time()
        self.description = ''

    def release(self):
        """Wake the build if it is blocked in _wait()."""
        with self.wait_condition:
            self.waiting = False
            self.wait_condition.notify()
            self.log.debug("Build %s released" % self.unique)

    def isWaiting(self):
        """Return True while the build is blocked waiting for release()."""
        with self.wait_condition:
            return bool(self.waiting)

    def _wait(self):
        """Block until release() clears the waiting flag."""
        with self.wait_condition:
            self.waiting = True
            self.log.debug("Build %s waiting" % self.unique)
            # Re-check the flag so a spurious wake-up does not let the
            # build continue before it has been released.
            while self.waiting:
                self.wait_condition.wait()

    def run(self):
        """Report the build to the job, compute its result and finish it.

        The result is 'ABORTED' if the build was aborted, otherwise
        'FAILURE' if the worker says the ZUUL_REF should fail, otherwise
        'SUCCESS'.

        :raises KeyError: if the job arguments lack ``ZUUL_PIPELINE``.
        """
        data = {
            'url': 'https://server/job/%s/%s/' % (self.name, self.number),
            'name': self.name,
            'number': self.number,
            'manager': self.worker.worker_id,
        }
        self.job.sendWorkData(json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        if self.worker.hold_jobs_in_build:
            self._wait()
        self.log.debug("Build %s continuing" % self.unique)

        # The context manager guarantees the worker lock is released even
        # if recording the result raises.
        with self.worker.lock:
            result = 'SUCCESS'
            if (('ZUUL_REF' in self.parameters) and
                self.worker.shouldFailTest(self.name,
                                           self.parameters['ZUUL_REF'])):
                result = 'FAILURE'
            if self.aborted:
                result = 'ABORTED'

            data = {'result': result}
            changes = None
            if 'ZUUL_CHANGE_IDS' in self.parameters:
                changes = self.parameters['ZUUL_CHANGE_IDS']

            self.worker.build_history.append(
                BuildHistory(name=self.name, number=self.number,
                             result=result, changes=changes, node=self.node,
                             uuid=self.unique, description=self.description,
                             pipeline=self.parameters['ZUUL_PIPELINE'])
            )

            self.job.sendWorkComplete(json.dumps(data))
            del self.worker.gearman_jobs[self.job.unique]
            self.worker.running_builds.remove(self)
class FakeGearmanServer(gear.Server):
    """Gearman server test double that can hold 'build:' jobs in its queue.

    While ``hold_jobs_in_queue`` is true, build jobs seen for the first
    time are marked as waiting and are not handed to workers until
    release() clears the mark.
    """

    def __init__(self, port=4730):
        # Set before the parent constructor runs.
        self.hold_jobs_in_queue = False
        super(FakeGearmanServer, self).__init__(port)

    def getJobForConnection(self, connection, peek=False):
        """Return the first non-held queued job *connection* can run.

        Queues are scanned in high, normal, low order.

        :param connection: worker connection; only jobs whose name is in
            ``connection.functions`` are considered.
        :param peek: when true the job is left in its queue and not
            attached to the connection.
        :returns: the selected job, or None if nothing is available.
        """
        for queue in [self.high_queue, self.normal_queue, self.low_queue]:
            for job in queue:
                # Decide once per job whether it starts out held; only
                # build jobs are ever held.
                if not hasattr(job, 'waiting'):
                    if job.name.startswith('build:'):
                        job.waiting = self.hold_jobs_in_queue
                    else:
                        job.waiting = False
                if job.waiting:
                    continue
                if job.name in connection.functions:
                    if not peek:
                        queue.remove(job)
                        connection.related_jobs[job.handle] = job
                        job.worker_connection = connection
                    job.running = True
                    return job
        return None

    def release(self, regex=None):
        """Release held build jobs whose name matches *regex*.

        With no (or an empty) *regex* every queued build job is released.
        Connections are woken only if at least one job was released.
        """
        released = False
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
        for job in self.getQueue():
            # Job names may be 'cmd', 'cmd:name' or 'cmd:name:node';
            # unpacking into exactly two values would raise on the
            # first and last forms.
            parts = job.name.split(':')
            if parts[0] != 'build' or len(parts) < 2:
                continue
            name = parts[1]
            if not regex or re.match(regex, name):
                self.log.debug("releasing queued job %s" %
                               job.unique)
                job.waiting = False
                released = True
            else:
                self.log.debug("not releasing queued job %s" %
                               job.unique)
        if released:
            self.wakeConnections()
        qlen = (len(self.high_queue) + len(self.normal_queue) +
                len(self.low_queue))
        self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))