Merge "Add FakeNodepool test fixture" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index 9845484..b84bcf9 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -41,6 +41,7 @@
import gear
import fixtures
import kazoo.client
+import kazoo.exceptions
import statsd
import testtools
from git.exc import NoSuchPathError
@@ -64,6 +65,7 @@
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
+import zuul.zk
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@@ -871,6 +873,63 @@
return endpoint, ''
+class FakeNodepool(object):
+ """Test stand-in for Nodepool that fulfills the node requests it sees.
+
+ A daemon thread polls the children of REQUEST_ROOT through its own
+ Kazoo client and rewrites every request that is not yet 'fulfilled'
+ with state 'fulfilled'.
+ """
+ REQUEST_ROOT = '/nodepool/requests'
+
+ log = logging.getLogger("zuul.test.FakeNodepool")
+
+ def __init__(self, host, port, chroot):
+ """Start a Kazoo client for host:port + chroot and begin polling."""
+ self.client = kazoo.client.KazooClient(
+ hosts='%s:%s%s' % (host, port, chroot))
+ self.client.start()
+ self._running = True
+ self.thread = threading.Thread(target=self.run)
+ self.thread.daemon = True
+ self.thread.start()
+
+ def stop(self):
+ """Stop polling, wait for the thread, then stop and close the client."""
+ self._running = False
+ self.thread.join()
+ self.client.stop()
+ self.client.close()
+
+ def run(self):
+ """Thread body: call _run() with a 0.1 second pause until stopped."""
+ while self._running:
+ self._run()
+ time.sleep(0.1)
+
+ def _run(self):
+ """Make one pass over the current requests, fulfilling each."""
+ for req in self.getNodeRequests():
+ self.fulfillRequest(req)
+
+ def getNodeRequests(self):
+ """Return the decoded requests found under REQUEST_ROOT.
+
+ Requests are visited in sorted order of their node names. Each
+ returned dict is the JSON content of the node plus an '_oid' key
+ holding the node name. Returns [] if REQUEST_ROOT does not exist.
+ """
+ try:
+ reqids = self.client.get_children(self.REQUEST_ROOT)
+ except kazoo.exceptions.NoNodeError:
+ return []
+ reqs = []
+ for oid in sorted(reqids):
+ path = self.REQUEST_ROOT + '/' + oid
+ data, stat = self.client.get(path)
+ data = json.loads(data)
+ data['_oid'] = oid
+ reqs.append(data)
+ return reqs
+
+ def fulfillRequest(self, request):
+ """Write the request back with state 'fulfilled'.
+
+ Does nothing if the request is already 'fulfilled'. Works on a
+ copy of the dict; the '_oid' bookkeeping key is removed from the
+ copy before it is serialized, and state_time is set to now.
+ """
+ if request['state'] == 'fulfilled':
+ return
+ request = request.copy()
+ request['state'] = 'fulfilled'
+ request['state_time'] = time.time()
+ oid = request['_oid']
+ del request['_oid']
+ path = self.REQUEST_ROOT + '/' + oid
+ data = json.dumps(request)
+ self.log.debug("Fulfilling node request: %s %s" % (oid, data))
+ self.client.set(path, data)
+
+
class ChrootedKazooFixture(fixtures.Fixture):
def __init__(self):
super(ChrootedKazooFixture, self).__init__()
@@ -962,27 +1021,29 @@
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
- # NOTE(notmorgan): Extract logging overrides for specific libraries
- # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
- # each. This is used to limit the output during test runs from
- # libraries that zuul depends on such as gear.
- log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+ # NOTE(notmorgan): Extract logging overrides for specific libraries
+ # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+ # each. This is used to limit the output during test runs from
+ # libraries that zuul depends on such as gear.
+ log_defaults_from_env = os.environ.get(
+ 'OS_LOG_DEFAULTS',
+ 'git.cmd=INFO,kazoo.client=INFO')
- if log_defaults_from_env:
- for default in log_defaults_from_env.split(','):
- try:
- name, level_str = default.split('=', 1)
- level = getattr(logging, level_str, logging.DEBUG)
- self.useFixture(fixtures.FakeLogger(
- name=name,
- level=level,
- format='%(asctime)s %(name)-32s '
- '%(levelname)-8s %(message)s'))
- except ValueError:
- # NOTE(notmorgan): Invalid format of the log default,
- # skip and don't try and apply a logger for the
- # specified module
- pass
+ if log_defaults_from_env:
+ for default in log_defaults_from_env.split(','):
+ try:
+ name, level_str = default.split('=', 1)
+ level = getattr(logging, level_str, logging.DEBUG)
+ self.useFixture(fixtures.FakeLogger(
+ name=name,
+ level=level,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s'))
+ except ValueError:
+ # NOTE(notmorgan): Invalid format of the log default,
+ # skip and don't try and apply a logger for the
+ # specified module
+ pass
class ZuulTestCase(BaseTestCase):
@@ -1140,10 +1201,17 @@
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.nodepool = zuul.nodepool.Nodepool(self.sched)
+ self.zk = zuul.zk.ZooKeeper()
+ self.zk.connect([self.zk_config])
+
+ self.fake_nodepool = FakeNodepool(self.zk_config.host,
+ self.zk_config.port,
+ self.zk_config.chroot)
self.sched.setLauncher(self.launch_client)
self.sched.setMerger(self.merge_client)
self.sched.setNodepool(self.nodepool)
+ self.sched.setZooKeeper(self.zk)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')
@@ -1244,9 +1312,10 @@
def setupZK(self):
self.zk_chroot_fixture = self.useFixture(ChrootedKazooFixture())
- self.zookeeper_host = self.zk_chroot_fixture.zookeeper_host
- self.zookeeper_port = self.zk_chroot_fixture.zookeeper_port
- self.zookeeper_chroot = self.zk_chroot_fixture.zookeeper_chroot
+ self.zk_config = zuul.zk.ZooKeeperConnectionConfig(
+ self.zk_chroot_fixture.zookeeper_host,
+ self.zk_chroot_fixture.zookeeper_port,
+ self.zk_chroot_fixture.zookeeper_chroot)
def copyDirToRepo(self, project, source_path):
self.init_repo(project)
@@ -1295,6 +1364,8 @@
self.rpc.stop()
self.rpc.join()
self.gearman_server.shutdown()
+ self.fake_nodepool.stop()
+ self.zk.disconnect()
threads = threading.enumerate()
if len(threads) > 1:
self.log.error("More than one thread is running: %s" % threads)
@@ -1428,6 +1499,11 @@
return False
return True
+ def areAllNodeRequestsComplete(self):
+ """Return True when the scheduler's nodepool holds no requests."""
+ if self.sched.nodepool.requests:
+ return False
+ return True
+
def eventQueuesEmpty(self):
for queue in self.event_queues:
yield queue.empty()
@@ -1461,7 +1537,8 @@
if (not self.merge_client.jobs and
all(self.eventQueuesEmpty()) and
self.haveAllBuildsReported() and
- self.areAllBuildsWaiting()):
+ self.areAllBuildsWaiting() and
+ self.areAllNodeRequestsComplete()):
self.sched.run_handler_lock.release()
self.launch_server.lock.release()
self.log.debug("...settled.")
diff --git a/zuul/model.py b/zuul/model.py
index 6e0176f..68ad603 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -395,11 +395,39 @@
self.build_set = build_set
self.job = job
self.nodeset = nodeset
- self.id = uuid4().hex
+ self._state = 'requested'
+ self.state_time = time.time()
+ self.stat = None
+ self.uid = uuid4().hex
+
+ @property
+ def state(self):
+ """The current state of this request (initially 'requested')."""
+ return self._state
+
+ @state.setter
+ def state(self, value):
+ """Set the state and stamp state_time with the current time."""
+ # The value is stored without validation for now; the check below
+ # is disabled pending the TODO.
+ # TODOv3(jeblair): reinstate
+ # if value not in STATES:
+ # raise TypeError("'%s' is not a valid state" % value)
+ self._state = value
+ self.state_time = time.time()
def __repr__(self):
return '<NodeRequest %s>' % (self.nodeset,)
+ def toDict(self):
+ """Return a dict describing this request.
+
+ Keys: 'node_types' (the image of each node in the nodeset),
+ 'requestor' (always 'zuul'), 'state' and 'state_time'.
+ """
+ d = {}
+ nodes = [n.image for n in self.nodeset.getNodes()]
+ d['node_types'] = nodes
+ d['requestor'] = 'zuul' # TODOv3(jeblair): better descriptor
+ d['state'] = self.state
+ d['state_time'] = self.state_time
+ return d
+
+ def updateFromDict(self, data):
+ """Copy 'state' and 'state_time' from data onto this request.
+
+ Both keys must be present. _state is assigned directly rather
+ than through the state setter, so the state_time taken from data
+ is kept instead of being replaced with the current time.
+ """
+ self._state = data['state']
+ self.state_time = data['state_time']
+
class Job(object):
"""A Job represents the defintion of actions to perform."""
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index addeaf3..2fa74c8 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -10,18 +10,26 @@
# License for the specific language governing permissions and limitations
# under the License.
+import logging
+
from zuul.model import NodeRequest
class Nodepool(object):
+ log = logging.getLogger('zuul.nodepool')
+
def __init__(self, scheduler):
self.requests = {}
self.sched = scheduler
def requestNodes(self, build_set, job):
req = NodeRequest(build_set, job, job.nodeset)
- self.requests[req.id] = req
- self._requestComplete(req.id)
+ self.requests[req.uid] = req
+ self.log.debug("Submitting node request: %s" % (req,))
+
+ self.sched.zk.submitNodeRequest(req)
+ self._updateNodeRequest(req)
+
return req
def cancelRequest(self, request):
@@ -31,7 +39,16 @@
def returnNodes(self, nodes, used=True):
pass
- def _requestComplete(self, id):
- req = self.requests[id]
- del self.requests[id]
- self.sched.onNodesProvisioned(req)
+ def _updateNodeRequest(self, request):
+ """Refresh a request from ZooKeeper and report it once fulfilled.
+
+ Reads the request through self.sched.zk.getNodeRequest, passing a
+ watcher that calls this method again. When the request is still
+ tracked in self.requests and its state is 'fulfilled', the
+ scheduler's onNodesProvisioned is called and the request is
+ dropped from self.requests.
+ """
+ self.log.debug("Updating node request: %s" % (request,))
+
+ def callback(event):
+ self._updateNodeRequest(request)
+ # NOTE(review): the read (and its watcher) happens before the check
+ # below, so it is repeated even for a request that is no longer
+ # tracked -- confirm this is intended.
+ self.sched.zk.getNodeRequest(request, callback)
+
+ if request.uid not in self.requests:
+ return
+
+ if request.state == 'fulfilled':
+ self.sched.onNodesProvisioned(request)
+ del self.requests[request.uid]
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 3f78e8d..4a6cc93 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -285,6 +285,9 @@
def setNodepool(self, nodepool):
self.nodepool = nodepool
+ def setZooKeeper(self, zk):
+ """Store the given ZooKeeper interface object as self.zk."""
+ self.zk = zk
+
def addEvent(self, event):
self.log.debug("Adding trigger event: %s" % event)
try:
diff --git a/zuul/zk.py b/zuul/zk.py
new file mode 100644
index 0000000..2e87205
--- /dev/null
+++ b/zuul/zk.py
@@ -0,0 +1,309 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import six
+import time
+from kazoo.client import KazooClient, KazooState
+from kazoo import exceptions as kze
+
+# States:
+# We are building this node but it is not ready for use.
+BUILDING = 'building'
+# The node is ready for use.
+READY = 'ready'
+# The node should be deleted.
+DELETING = 'deleting'
+
+STATES = set([BUILDING, READY, DELETING])
+
+
+class ZooKeeperConnectionConfig(object):
+ '''
+ Represents the connection parameters for a ZooKeeper server.
+ '''
+
+ def __eq__(self, other):
+ '''
+ Equal only to another ZooKeeperConnectionConfig whose attributes
+ all match.
+ '''
+ if isinstance(other, ZooKeeperConnectionConfig):
+ if other.__dict__ == self.__dict__:
+ return True
+ return False
+
+ def __init__(self, host, port=2181, chroot=None):
+ '''Initialize the ZooKeeperConnectionConfig object.
+
+ :param str host: The hostname of the ZooKeeper server.
+ :param int port: The port on which ZooKeeper is listening.
+ Optional, default: 2181.
+ :param str chroot: A chroot for this connection. All
+ ZooKeeper nodes will be underneath this root path.
+ Optional, default: None.
+
+ Only the 'host' argument is required. A chroot of None (or any
+ other false value) is stored as the empty string.
+
+ '''
+ self.host = host
+ self.port = port
+ self.chroot = chroot or ''
+
+
+def buildZooKeeperHosts(host_list):
+    '''
+    Build the ZK cluster host list for client connections.
+
+    The result is a comma-separated list of ``host:port`` entries
+    followed by at most one chroot suffix, e.g.
+    ``zk1:2181,zk2:2181/chroot``.
+
+    :param list host_list: A list of
+        :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects (one
+        per server) defining the ZooKeeper cluster servers.
+
+    :returns: The hosts string to hand to the Kazoo client.
+    :raises Exception: If host_list is not a list.
+    '''
+    if not isinstance(host_list, list):
+        raise Exception("'host_list' must be a list")
+    hosts = ['%s:%s' % (host_def.host, host_def.port)
+             for host_def in host_list]
+    # The hosts string is split at the first '/', so the chroot may only
+    # appear once, after the last host.  Appending it to every entry
+    # (as was done before) breaks any cluster of more than one server.
+    # The first non-empty chroot in the list is used.
+    chroot = next(
+        (host_def.chroot for host_def in host_list if host_def.chroot), '')
+    return ",".join(hosts) + chroot
+
+
+class BaseModel(object):
+    '''
+    Base class for the model objects in this module.
+
+    Holds an optional string ID, a state restricted to STATES, the time
+    the state was last set, and a ``stat`` attribute that starts as None.
+    '''
+
+    def __init__(self, o_id):
+        '''
+        :param str o_id: The object ID, or a false value (e.g. None) if
+            the object has no ID yet.
+        '''
+        # Always define _id, so that reading ``id`` on an object created
+        # without one yields None instead of raising AttributeError.
+        self._id = None
+        if o_id:
+            self.id = o_id
+        self._state = None
+        self.state_time = None
+        self.stat = None
+
+    @property
+    def id(self):
+        '''The object ID, or None if none has been assigned.'''
+        return self._id
+
+    @id.setter
+    def id(self, value):
+        if not isinstance(value, six.string_types):
+            raise TypeError("'id' attribute must be a string type")
+        self._id = value
+
+    @property
+    def state(self):
+        '''The current state, or None if none has been set.'''
+        return self._state
+
+    @state.setter
+    def state(self, value):
+        # Setting a state also stamps state_time with the current time.
+        if value not in STATES:
+            raise TypeError("'%s' is not a valid state" % value)
+        self._state = value
+        self.state_time = time.time()
+
+    def toDict(self):
+        '''
+        Convert a BaseModel object's attributes to a dictionary.
+
+        :returns: A dict with the keys 'state' and 'state_time'.
+        '''
+        d = {}
+        d['state'] = self.state
+        d['state_time'] = self.state_time
+        return d
+
+    def fromDict(self, d):
+        '''
+        Set base attributes based on the given dict.
+
+        Unlike the derived classes, this should NOT return an object as it
+        assumes self has already been instantiated.
+
+        :param dict d: May contain 'state' and/or 'state_time'; missing
+            keys leave the corresponding attribute alone.
+        :raises TypeError: If d['state'] is not one of STATES.
+        '''
+        # 'state' goes through the setter (which stamps the current
+        # time); an explicit 'state_time' is applied afterwards so that
+        # it takes precedence.
+        if 'state' in d:
+            self.state = d['state']
+        if 'state_time' in d:
+            self.state_time = d['state_time']
+
+
+class NodeRequest(BaseModel):
+ '''
+ Class representing a node request.
+ '''
+
+ def __init__(self, id=None):
+ '''
+ :param str id: The request ID. Optional, default: None.
+ '''
+ super(NodeRequest, self).__init__(id)
+
+ def __repr__(self):
+ # NOTE(review): this reads self.id, which BaseModel.__init__ only
+ # assigns when an ID was supplied -- confirm repr() is safe for a
+ # request created without one.
+ d = self.toDict()
+ d['id'] = self.id
+ d['stat'] = self.stat
+ return '<NodeRequest %s>' % d
+
+ def toDict(self):
+ '''
+ Convert a NodeRequest object's attributes to a dictionary.
+ '''
+ d = super(NodeRequest, self).toDict()
+ return d
+
+ @staticmethod
+ def fromDict(d, o_id=None):
+ '''
+ Create a NodeRequest object from a dictionary.
+
+ :param dict d: The dictionary.
+ :param str o_id: The object ID.
+
+ :returns: An initialized NodeRequest object.
+ '''
+ o = NodeRequest(o_id)
+ super(NodeRequest, o).fromDict(d)
+ return o
+
+
+class ZooKeeper(object):
+ '''
+ Class implementing the ZooKeeper interface.
+
+ This class uses the facade design pattern to keep common interaction
+ with the ZooKeeper API simple and consistent for the caller, and
+ limits coupling between objects. It allows for more complex interactions
+ by providing direct access to the client connection when needed (though
+ that is discouraged). It also provides for a convenient entry point for
+ testing only ZooKeeper interactions.
+ '''
+
+ log = logging.getLogger("zuul.zk.ZooKeeper")
+
+ REQUEST_ROOT = '/nodepool/requests'
+
+ def __init__(self):
+ '''
+ Initialize the ZooKeeper object.
+
+ No connection is made here; call connect() to create the client.
+ '''
+ self.client = None
+ self._became_lost = False
+
+ def _dictToStr(self, data):
+ '''Serialize a dict to a JSON string.'''
+ return json.dumps(data)
+
+ def _strToDict(self, data):
+ '''Parse a JSON string into a dict.'''
+ return json.loads(data)
+
+ def _connection_listener(self, state):
+ '''
+ Listener method for Kazoo connection state changes.
+
+ Logs the new state and, on LOST, sets the flag reported by
+ didLoseConnection.
+
+ .. warning:: This method must not block.
+ '''
+ if state == KazooState.LOST:
+ self.log.debug("ZooKeeper connection: LOST")
+ self._became_lost = True
+ elif state == KazooState.SUSPENDED:
+ self.log.debug("ZooKeeper connection: SUSPENDED")
+ else:
+ self.log.debug("ZooKeeper connection: CONNECTED")
+
+ # The three properties below read self.client.state, so they require
+ # that a client has been created.
+ @property
+ def connected(self):
+ return self.client.state == KazooState.CONNECTED
+
+ @property
+ def suspended(self):
+ return self.client.state == KazooState.SUSPENDED
+
+ @property
+ def lost(self):
+ return self.client.state == KazooState.LOST
+
+ @property
+ def didLoseConnection(self):
+ '''True once a LOST state has been seen, until resetLostFlag().'''
+ return self._became_lost
+
+ def resetLostFlag(self):
+ '''Clear the flag reported by didLoseConnection.'''
+ self._became_lost = False
+
+ def connect(self, host_list, read_only=False):
+ '''
+ Establish a connection with ZooKeeper cluster.
+
+ Creates a Kazoo client for the given hosts, registers the
+ connection state listener and starts the client. Does nothing
+ if a client already exists.
+
+ :param list host_list: A list of
+ :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects
+ (one per server) defining the ZooKeeper cluster servers.
+ :param bool read_only: If True, establishes a read-only connection.
+
+ '''
+ if self.client is None:
+ hosts = buildZooKeeperHosts(host_list)
+ self.client = KazooClient(hosts=hosts, read_only=read_only)
+ self.client.add_listener(self._connection_listener)
+ self.client.start()
+
+ def disconnect(self):
+ '''
+ Close the ZooKeeper cluster connection.
+
+ You should call this method if you used connect() to establish a
+ cluster connection.
+ '''
+ # NOTE(review): a client that exists but is not currently connected
+ # is left as-is (not stopped, closed or cleared) -- confirm this is
+ # intended.
+ if self.client is not None and self.client.connected:
+ self.client.stop()
+ self.client.close()
+ self.client = None
+
+ def resetHosts(self, host_list):
+ '''
+ Reset the ZooKeeper cluster connection host list.
+
+ Does nothing if no client has been created.
+
+ :param list host_list: A list of
+ :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects
+ (one per server) defining the ZooKeeper cluster servers.
+ '''
+ if self.client is not None:
+ hosts = buildZooKeeperHosts(host_list)
+ self.client.set_hosts(hosts=hosts)
+
+ def submitNodeRequest(self, node_request):
+ '''
+ Submit a request for nodes to Nodepool.
+
+ The request's toDict() output, plus a 'created_time' key, is
+ written to a new ephemeral, sequenced node under REQUEST_ROOT
+ whose name starts with the priority. The last component of the
+ created path is stored as node_request.id.
+
+ :param NodeRequest node_request: A NodeRequest with the
+ contents of the request.
+ '''
+ priority = 100 # TODO(jeblair): integrate into nodereq
+
+ data = node_request.toDict()
+ data['created_time'] = time.time()
+
+ path = '%s/%s-' % (self.REQUEST_ROOT, priority)
+ self.log.debug(data)
+ path = self.client.create(path, self._dictToStr(data),
+ makepath=True,
+ sequence=True, ephemeral=True)
+ reqid = path.split("/")[-1]
+ node_request.id = reqid
+
+ def getNodeRequest(self, node_request, watcher):
+ '''
+ Read the specified node request and update its values.
+
+ If the request's node does not exist, this returns without
+ changing node_request.
+
+ :param NodeRequest node_request: A NodeRequest to be read. It
+ will be updated with the results of the read.
+ :param callable watcher: A watch function to be called when the
+ node request is updated.
+ '''
+
+ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+ try:
+ data, stat = self.client.get(path, watch=watcher)
+ except kze.NoNodeError:
+ return
+ data = self._strToDict(data)
+ node_request.updateFromDict(data)
+ # TODOv3(jeblair): re-register watches on disconnect