Add in start of worker_manager tests and fakes
diff --git a/tests/etc/config.json b/tests/etc/config.json
new file mode 100644
index 0000000..2b6451c
--- /dev/null
+++ b/tests/etc/config.json
@@ -0,0 +1,18 @@
+{
+ "zuul_server": {
+ "git_url": "/home/josh/var/lib/zuul/git/",
+ "gearman_host": "localhost",
+ "gearman_port": 14730
+ },
+ "debug_log": "/home/josh/var/log/turbo-hipster/debug.log",
+ "jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs",
+ "git_working_dir": "/home/josh/var/lib/turbo-hipster/git",
+ "pip_download_cache": "/home/josh/var/cache/pip",
+ "plugins": ["gate_real_db_upgrade"],
+ "publish_logs":
+ {
+ "type": "local",
+ "path": "/home/josh/var/www/results/",
+ "prepend_url": "http://localhost/results/"
+ }
+}
\ No newline at end of file
diff --git a/tests/fakes.py b/tests/fakes.py
new file mode 100644
index 0000000..5a2f972
--- /dev/null
+++ b/tests/fakes.py
@@ -0,0 +1,280 @@
+#!/usr/bin/python2
+#
+# Copyright 2013 Rackspace Australia
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+import threading
+import os
+
+from turbo_hipster.worker_manager import GearmanManager
+
+
+class FakeGearmanManager(GearmanManager):
+ def __init__(self, config, tasks, test):
+ self.test = test
+ super(FakeGearmanManager, self).__init__(config, tasks)
+
+ def setup_gearman(self):
+ hostname = os.uname()[1]
+ self.gearman_worker = FakeWorker('turbo-hipster-manager-%s'
+ % hostname, self.test)
+ self.gearman_worker.addServer(
+ self.config['zuul_server']['gearman_host'],
+ self.config['zuul_server']['gearman_port']
+ )
+ self.gearman_worker.registerFunction(
+ 'stop:turbo-hipster-manager-%s' % hostname)
+
+
+class FakeWorker(gear.Worker):
+ def __init__(self, worker_id, test):
+ super(FakeWorker, self).__init__(worker_id)
+ self.gearman_jobs = {}
+ self.build_history = []
+ self.running_builds = []
+ self.build_counter = 0
+ self.fail_tests = {}
+ self.test = test
+
+ self.hold_jobs_in_build = False
+ self.lock = threading.Lock()
+ self.__work_thread = threading.Thread(target=self.work)
+ self.__work_thread.daemon = True
+ self.__work_thread.start()
+
+ def handleJob(self, job):
+ parts = job.name.split(":")
+ cmd = parts[0]
+ name = parts[1]
+ if len(parts) > 2:
+ node = parts[2]
+ else:
+ node = None
+ if cmd == 'build':
+ self.handleBuild(job, name, node)
+ elif cmd == 'stop':
+ self.handleStop(job, name)
+ elif cmd == 'set_description':
+ self.handleSetDescription(job, name)
+
+ def handleBuild(self, job, name, node):
+ build = FakeBuild(self, job, self.build_counter, node)
+ job.build = build
+ self.gearman_jobs[job.unique] = job
+ self.build_counter += 1
+
+ self.running_builds.append(build)
+ build.start()
+
+ def handleStop(self, job, name):
+ self.log.debug("handle stop")
+ parameters = json.loads(job.arguments)
+ name = parameters['name']
+ number = parameters['number']
+ for build in self.running_builds:
+ if build.name == name and build.number == number:
+ build.aborted = True
+ build.release()
+ job.sendWorkComplete()
+ return
+ job.sendWorkFail()
+
+ def handleSetDescription(self, job, name):
+ self.log.debug("handle set description")
+ parameters = json.loads(job.arguments)
+ name = parameters['name']
+ number = parameters['number']
+ descr = parameters['html_description']
+ for build in self.running_builds:
+ if build.name == name and build.number == number:
+ build.description = descr
+ job.sendWorkComplete()
+ return
+ for build in self.build_history:
+ if build.name == name and build.number == number:
+ build.description = descr
+ job.sendWorkComplete()
+ return
+ job.sendWorkFail()
+
+ def work(self):
+ while self.running:
+ try:
+ job = self.getJob()
+ except gear.InterruptedError:
+ continue
+ try:
+ self.handleJob(job)
+ except:
+ self.log.exception("Worker exception:")
+
+ def addFailTest(self, name, change):
+ l = self.fail_tests.get(name, [])
+ l.append(change)
+ self.fail_tests[name] = l
+
+ def shouldFailTest(self, name, ref):
+ l = self.fail_tests.get(name, [])
+ for change in l:
+ if self.test.ref_has_change(ref, change):
+ return True
+ return False
+
+ def release(self, regex=None):
+ builds = self.running_builds[:]
+ self.log.debug("releasing build %s (%s)" % (regex,
+ len(self.running_builds)))
+ for build in builds:
+ if not regex or re.match(regex, build.name):
+ self.log.debug("releasing build %s" %
+ (build.parameters['ZUUL_UUID']))
+ build.release()
+ else:
+ self.log.debug("not releasing build %s" %
+ (build.parameters['ZUUL_UUID']))
+ self.log.debug("done releasing builds %s (%s)" %
+ (regex, len(self.running_builds)))
+
+
+class FakeBuild(threading.Thread):
+ def __init__(self, worker, job, number, node):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.worker = worker
+ self.job = job
+ self.name = job.name.split(':')[1]
+ self.number = number
+ self.node = node
+ self.parameters = json.loads(job.arguments)
+ self.unique = self.parameters['ZUUL_UUID']
+ self.wait_condition = threading.Condition()
+ self.waiting = False
+ self.aborted = False
+ self.created = time.time()
+ self.description = ''
+
+ def release(self):
+ self.wait_condition.acquire()
+ self.wait_condition.notify()
+ self.waiting = False
+ self.log.debug("Build %s released" % self.unique)
+ self.wait_condition.release()
+
+ def isWaiting(self):
+ self.wait_condition.acquire()
+ if self.waiting:
+ ret = True
+ else:
+ ret = False
+ self.wait_condition.release()
+ return ret
+
+ def _wait(self):
+ self.wait_condition.acquire()
+ self.waiting = True
+ self.log.debug("Build %s waiting" % self.unique)
+ self.wait_condition.wait()
+ self.wait_condition.release()
+
+ def run(self):
+ data = {
+ 'url': 'https://server/job/%s/%s/' % (self.name, self.number),
+ 'name': self.name,
+ 'number': self.number,
+ 'manager': self.worker.worker_id,
+ }
+
+ self.job.sendWorkData(json.dumps(data))
+ self.job.sendWorkStatus(0, 100)
+
+ if self.worker.hold_jobs_in_build:
+ self._wait()
+ self.log.debug("Build %s continuing" % self.unique)
+
+ self.worker.lock.acquire()
+
+ result = 'SUCCESS'
+ if (('ZUUL_REF' in self.parameters) and
+ self.worker.shouldFailTest(self.name,
+ self.parameters['ZUUL_REF'])):
+ result = 'FAILURE'
+ if self.aborted:
+ result = 'ABORTED'
+
+ data = {'result': result}
+ changes = None
+ if 'ZUUL_CHANGE_IDS' in self.parameters:
+ changes = self.parameters['ZUUL_CHANGE_IDS']
+
+ self.worker.build_history.append(
+ BuildHistory(name=self.name, number=self.number,
+ result=result, changes=changes, node=self.node,
+ uuid=self.unique, description=self.description,
+ pipeline=self.parameters['ZUUL_PIPELINE'])
+ )
+
+ self.job.sendWorkComplete(json.dumps(data))
+ del self.worker.gearman_jobs[self.job.unique]
+ self.worker.running_builds.remove(self)
+ self.worker.lock.release()
+
+
+class FakeGearmanServer(gear.Server):
+ def __init__(self, port=4730):
+ self.hold_jobs_in_queue = False
+ super(FakeGearmanServer, self).__init__(port)
+
+ def getJobForConnection(self, connection, peek=False):
+ for queue in [self.high_queue, self.normal_queue, self.low_queue]:
+ for job in queue:
+ if not hasattr(job, 'waiting'):
+ if job.name.startswith('build:'):
+ job.waiting = self.hold_jobs_in_queue
+ else:
+ job.waiting = False
+ if job.waiting:
+ continue
+ if job.name in connection.functions:
+ if not peek:
+ queue.remove(job)
+ connection.related_jobs[job.handle] = job
+ job.worker_connection = connection
+ job.running = True
+ return job
+ return None
+
+ def release(self, regex=None):
+ released = False
+ qlen = (len(self.high_queue) + len(self.normal_queue) +
+ len(self.low_queue))
+ self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
+ for job in self.getQueue():
+ cmd, name = job.name.split(':')
+ if cmd != 'build':
+ continue
+ if not regex or re.match(regex, name):
+ self.log.debug("releasing queued job %s" %
+ job.unique)
+ job.waiting = False
+ released = True
+ else:
+ self.log.debug("not releasing queued job %s" %
+ job.unique)
+ if released:
+ self.wakeConnections()
+ qlen = (len(self.high_queue) + len(self.normal_queue) +
+ len(self.low_queue))
+ self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
+
diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py
index ac2b2a9..11b1a22 100644
--- a/tests/test_worker_manager.py
+++ b/tests/test_worker_manager.py
@@ -1,29 +1,46 @@
-import unittest
+#!/usr/bin/python2
+#
+# Copyright 2013 Rackspace Australia
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
-class TestGearmanManager(unittest.TestCase):
- def test___init__(self):
- # gearman_manager = GearmanManager(config, tasks)
- assert False # TODO: implement your test here
- def test_run(self):
- # gearman_manager = GearmanManager(config, tasks)
- # self.assertEqual(expected, gearman_manager.run())
- assert False # TODO: implement your test here
+import json
+import os
+import testtools
+from fakes import FakeGearmanManager, FakeGearmanServer
- def test_setup_gearman(self):
- # gearman_manager = GearmanManager(config, tasks)
- # self.assertEqual(expected, gearman_manager.setup_gearman())
- assert False # TODO: implement your test here
+CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc')
+with open(os.path.join(CONFIG_DIR, 'config.json'), 'r') as config_stream:
+ CONFIG = json.load(config_stream)
- def test_stop(self):
- # gearman_manager = GearmanManager(config, tasks)
- # self.assertEqual(expected, gearman_manager.stop())
- assert False # TODO: implement your test here
- def test_stopped(self):
- # gearman_manager = GearmanManager(config, tasks)
- # self.assertEqual(expected, gearman_manager.stopped())
- assert False # TODO: implement your test here
+class TestGearmanManager(testtools.TestCase):
+ def setUp(self):
+ super(TestGearmanManager, self).setUp()
+ self.config = CONFIG
+ self.tasks = []
+ self.gearman_server = FakeGearmanServer(
+ self.config['zuul_server']['gearman_port'])
+
+ self.gearman_manager = FakeGearmanManager(self.config,
+ self.tasks,
+ self)
+
+ def test_manager_function_registered(self):
+ """ Check the manager is set up correctly and registered with the
+ gearman server with an appropriate function """
+ pass
if __name__ == '__main__':
unittest.main()