Add FakeNodepool test fixture
Add a fake nodepool that immediately successfully fulfills all
requests, but actually uses the Nodepool ZooKeeper API.
Update the Zuul Nodepool facade to use the Nodepool ZooKeeper API.
Change-Id: If7859f0c6531439c3be38cc6ca6b699b3b5eade2
diff --git a/tests/base.py b/tests/base.py
index 9845484..b84bcf9 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -41,6 +41,7 @@
import gear
import fixtures
import kazoo.client
+import kazoo.exceptions
import statsd
import testtools
from git.exc import NoSuchPathError
@@ -64,6 +65,7 @@
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
+import zuul.zk
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@@ -871,6 +873,63 @@
return endpoint, ''
+class FakeNodepool(object):
+ REQUEST_ROOT = '/nodepool/requests'
+
+ log = logging.getLogger("zuul.test.FakeNodepool")
+
+ def __init__(self, host, port, chroot):
+ self.client = kazoo.client.KazooClient(
+ hosts='%s:%s%s' % (host, port, chroot))
+ self.client.start()
+ self._running = True
+ self.thread = threading.Thread(target=self.run)
+ self.thread.daemon = True
+ self.thread.start()
+
+ def stop(self):
+ self._running = False
+ self.thread.join()
+ self.client.stop()
+ self.client.close()
+
+ def run(self):
+ while self._running:
+ self._run()
+ time.sleep(0.1)
+
+ def _run(self):
+ for req in self.getNodeRequests():
+ self.fulfillRequest(req)
+
+ def getNodeRequests(self):
+ try:
+ reqids = self.client.get_children(self.REQUEST_ROOT)
+ except kazoo.exceptions.NoNodeError:
+ return []
+ reqs = []
+ for oid in sorted(reqids):
+ path = self.REQUEST_ROOT + '/' + oid
+ data, stat = self.client.get(path)
+ data = json.loads(data)
+ data['_oid'] = oid
+ reqs.append(data)
+ return reqs
+
+ def fulfillRequest(self, request):
+ if request['state'] == 'fulfilled':
+ return
+ request = request.copy()
+ request['state'] = 'fulfilled'
+ request['state_time'] = time.time()
+ oid = request['_oid']
+ del request['_oid']
+ path = self.REQUEST_ROOT + '/' + oid
+ data = json.dumps(request)
+ self.log.debug("Fulfilling node request: %s %s" % (oid, data))
+ self.client.set(path, data)
+
+
class ChrootedKazooFixture(fixtures.Fixture):
def __init__(self):
super(ChrootedKazooFixture, self).__init__()
@@ -962,27 +1021,29 @@
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
- # NOTE(notmorgan): Extract logging overrides for specific libraries
- # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
- # each. This is used to limit the output during test runs from
- # libraries that zuul depends on such as gear.
- log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+ # NOTE(notmorgan): Extract logging overrides for specific libraries
+ # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+ # each. This is used to limit the output during test runs from
+ # libraries that zuul depends on such as gear.
+ log_defaults_from_env = os.environ.get(
+ 'OS_LOG_DEFAULTS',
+ 'git.cmd=INFO,kazoo.client=INFO')
- if log_defaults_from_env:
- for default in log_defaults_from_env.split(','):
- try:
- name, level_str = default.split('=', 1)
- level = getattr(logging, level_str, logging.DEBUG)
- self.useFixture(fixtures.FakeLogger(
- name=name,
- level=level,
- format='%(asctime)s %(name)-32s '
- '%(levelname)-8s %(message)s'))
- except ValueError:
- # NOTE(notmorgan): Invalid format of the log default,
- # skip and don't try and apply a logger for the
- # specified module
- pass
+ if log_defaults_from_env:
+ for default in log_defaults_from_env.split(','):
+ try:
+ name, level_str = default.split('=', 1)
+ level = getattr(logging, level_str, logging.DEBUG)
+ self.useFixture(fixtures.FakeLogger(
+ name=name,
+ level=level,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s'))
+ except ValueError:
+ # NOTE(notmorgan): Invalid format of the log default,
+ # skip and don't try and apply a logger for the
+ # specified module
+ pass
class ZuulTestCase(BaseTestCase):
@@ -1140,10 +1201,17 @@
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.nodepool = zuul.nodepool.Nodepool(self.sched)
+ self.zk = zuul.zk.ZooKeeper()
+ self.zk.connect([self.zk_config])
+
+ self.fake_nodepool = FakeNodepool(self.zk_config.host,
+ self.zk_config.port,
+ self.zk_config.chroot)
self.sched.setLauncher(self.launch_client)
self.sched.setMerger(self.merge_client)
self.sched.setNodepool(self.nodepool)
+ self.sched.setZooKeeper(self.zk)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')
@@ -1244,9 +1312,10 @@
def setupZK(self):
self.zk_chroot_fixture = self.useFixture(ChrootedKazooFixture())
- self.zookeeper_host = self.zk_chroot_fixture.zookeeper_host
- self.zookeeper_port = self.zk_chroot_fixture.zookeeper_port
- self.zookeeper_chroot = self.zk_chroot_fixture.zookeeper_chroot
+ self.zk_config = zuul.zk.ZooKeeperConnectionConfig(
+ self.zk_chroot_fixture.zookeeper_host,
+ self.zk_chroot_fixture.zookeeper_port,
+ self.zk_chroot_fixture.zookeeper_chroot)
def copyDirToRepo(self, project, source_path):
self.init_repo(project)
@@ -1295,6 +1364,8 @@
self.rpc.stop()
self.rpc.join()
self.gearman_server.shutdown()
+ self.fake_nodepool.stop()
+ self.zk.disconnect()
threads = threading.enumerate()
if len(threads) > 1:
self.log.error("More than one thread is running: %s" % threads)
@@ -1428,6 +1499,11 @@
return False
return True
+ def areAllNodeRequestsComplete(self):
+ if self.sched.nodepool.requests:
+ return False
+ return True
+
def eventQueuesEmpty(self):
for queue in self.event_queues:
yield queue.empty()
@@ -1461,7 +1537,8 @@
if (not self.merge_client.jobs and
all(self.eventQueuesEmpty()) and
self.haveAllBuildsReported() and
- self.areAllBuildsWaiting()):
+ self.areAllBuildsWaiting() and
+ self.areAllNodeRequestsComplete()):
self.sched.run_handler_lock.release()
self.launch_server.lock.release()
self.log.debug("...settled.")