Merge "Add FakeNodepool test fixture" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index 9845484..b84bcf9 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -41,6 +41,7 @@
 import gear
 import fixtures
 import kazoo.client
+import kazoo.exceptions
 import statsd
 import testtools
 from git.exc import NoSuchPathError
@@ -64,6 +65,7 @@
 import zuul.trigger.gerrit
 import zuul.trigger.timer
 import zuul.trigger.zuultrigger
+import zuul.zk
 
 FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
                            'fixtures')
@@ -871,6 +873,63 @@
         return endpoint, ''
 
 
+class FakeNodepool(object):
+    REQUEST_ROOT = '/nodepool/requests'
+
+    log = logging.getLogger("zuul.test.FakeNodepool")
+
+    def __init__(self, host, port, chroot):
+        self.client = kazoo.client.KazooClient(
+            hosts='%s:%s%s' % (host, port, chroot))
+        self.client.start()
+        self._running = True
+        self.thread = threading.Thread(target=self.run)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
+        self.thread.join()
+        self.client.stop()
+        self.client.close()
+
+    def run(self):
+        while self._running:
+            self._run()
+            time.sleep(0.1)
+
+    def _run(self):
+        for req in self.getNodeRequests():
+            self.fulfillRequest(req)
+
+    def getNodeRequests(self):
+        try:
+            reqids = self.client.get_children(self.REQUEST_ROOT)
+        except kazoo.exceptions.NoNodeError:
+            return []
+        reqs = []
+        for oid in sorted(reqids):
+            path = self.REQUEST_ROOT + '/' + oid
+            data, stat = self.client.get(path)
+            data = json.loads(data)
+            data['_oid'] = oid
+            reqs.append(data)
+        return reqs
+
+    def fulfillRequest(self, request):
+        if request['state'] == 'fulfilled':
+            return
+        request = request.copy()
+        request['state'] = 'fulfilled'
+        request['state_time'] = time.time()
+        oid = request['_oid']
+        del request['_oid']
+        path = self.REQUEST_ROOT + '/' + oid
+        data = json.dumps(request)
+        self.log.debug("Fulfilling node request: %s %s" % (oid, data))
+        self.client.set(path, data)
+
+
 class ChrootedKazooFixture(fixtures.Fixture):
     def __init__(self):
         super(ChrootedKazooFixture, self).__init__()
@@ -962,27 +1021,29 @@
                 format='%(asctime)s %(name)-32s '
                 '%(levelname)-8s %(message)s'))
 
-            # NOTE(notmorgan): Extract logging overrides for specific libraries
-            # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
-            # each. This is used to limit the output during test runs from
-            # libraries that zuul depends on such as gear.
-            log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+        # NOTE(notmorgan): Extract logging overrides for specific libraries
+        # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+        # each. This is used to limit the output during test runs from
+        # libraries that zuul depends on such as gear.
+        log_defaults_from_env = os.environ.get(
+            'OS_LOG_DEFAULTS',
+            'git.cmd=INFO,kazoo.client=INFO')
 
-            if log_defaults_from_env:
-                for default in log_defaults_from_env.split(','):
-                    try:
-                        name, level_str = default.split('=', 1)
-                        level = getattr(logging, level_str, logging.DEBUG)
-                        self.useFixture(fixtures.FakeLogger(
-                            name=name,
-                            level=level,
-                            format='%(asctime)s %(name)-32s '
-                                   '%(levelname)-8s %(message)s'))
-                    except ValueError:
-                        # NOTE(notmorgan): Invalid format of the log default,
-                        # skip and don't try and apply a logger for the
-                        # specified module
-                        pass
+        if log_defaults_from_env:
+            for default in log_defaults_from_env.split(','):
+                try:
+                    name, level_str = default.split('=', 1)
+                    level = getattr(logging, level_str, logging.DEBUG)
+                    self.useFixture(fixtures.FakeLogger(
+                        name=name,
+                        level=level,
+                        format='%(asctime)s %(name)-32s '
+                               '%(levelname)-8s %(message)s'))
+                except ValueError:
+                    # NOTE(notmorgan): Invalid format of the log default,
+                    # skip and don't try and apply a logger for the
+                    # specified module
+                    pass
 
 
 class ZuulTestCase(BaseTestCase):
@@ -1140,10 +1201,17 @@
         self.merge_client = zuul.merger.client.MergeClient(
             self.config, self.sched)
         self.nodepool = zuul.nodepool.Nodepool(self.sched)
+        self.zk = zuul.zk.ZooKeeper()
+        self.zk.connect([self.zk_config])
+
+        self.fake_nodepool = FakeNodepool(self.zk_config.host,
+                                          self.zk_config.port,
+                                          self.zk_config.chroot)
 
         self.sched.setLauncher(self.launch_client)
         self.sched.setMerger(self.merge_client)
         self.sched.setNodepool(self.nodepool)
+        self.sched.setZooKeeper(self.zk)
 
         self.webapp = zuul.webapp.WebApp(
             self.sched, port=0, listen_address='127.0.0.1')
@@ -1244,9 +1312,10 @@
 
     def setupZK(self):
         self.zk_chroot_fixture = self.useFixture(ChrootedKazooFixture())
-        self.zookeeper_host = self.zk_chroot_fixture.zookeeper_host
-        self.zookeeper_port = self.zk_chroot_fixture.zookeeper_port
-        self.zookeeper_chroot = self.zk_chroot_fixture.zookeeper_chroot
+        self.zk_config = zuul.zk.ZooKeeperConnectionConfig(
+            self.zk_chroot_fixture.zookeeper_host,
+            self.zk_chroot_fixture.zookeeper_port,
+            self.zk_chroot_fixture.zookeeper_chroot)
 
     def copyDirToRepo(self, project, source_path):
         self.init_repo(project)
@@ -1295,6 +1364,8 @@
         self.rpc.stop()
         self.rpc.join()
         self.gearman_server.shutdown()
+        self.fake_nodepool.stop()
+        self.zk.disconnect()
         threads = threading.enumerate()
         if len(threads) > 1:
             self.log.error("More than one thread is running: %s" % threads)
@@ -1428,6 +1499,11 @@
                 return False
         return True
 
+    def areAllNodeRequestsComplete(self):
+        if self.sched.nodepool.requests:
+            return False
+        return True
+
     def eventQueuesEmpty(self):
         for queue in self.event_queues:
             yield queue.empty()
@@ -1461,7 +1537,8 @@
                 if (not self.merge_client.jobs and
                     all(self.eventQueuesEmpty()) and
                     self.haveAllBuildsReported() and
-                    self.areAllBuildsWaiting()):
+                    self.areAllBuildsWaiting() and
+                    self.areAllNodeRequestsComplete()):
                     self.sched.run_handler_lock.release()
                     self.launch_server.lock.release()
                     self.log.debug("...settled.")
diff --git a/zuul/model.py b/zuul/model.py
index 6e0176f..68ad603 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -395,11 +395,39 @@
         self.build_set = build_set
         self.job = job
         self.nodeset = nodeset
-        self.id = uuid4().hex
+        self._state = 'requested'
+        self.state_time = time.time()
+        self.stat = None
+        self.uid = uuid4().hex
+
+    @property
+    def state(self):
+        return self._state
+
+    @state.setter
+    def state(self, value):
+        # TODOv3(jeblair): reinstate
+        # if value not in STATES:
+        #     raise TypeError("'%s' is not a valid state" % value)
+        self._state = value
+        self.state_time = time.time()
 
     def __repr__(self):
         return '<NodeRequest %s>' % (self.nodeset,)
 
+    def toDict(self):
+        d = {}
+        nodes = [n.image for n in self.nodeset.getNodes()]
+        d['node_types'] = nodes
+        d['requestor'] = 'zuul'  # TODOv3(jeblair): better descriptor
+        d['state'] = self.state
+        d['state_time'] = self.state_time
+        return d
+
+    def updateFromDict(self, data):
+        self._state = data['state']
+        self.state_time = data['state_time']
+
 
 class Job(object):
     """A Job represents the defintion of actions to perform."""
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index addeaf3..2fa74c8 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -10,18 +10,26 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import logging
+
 from zuul.model import NodeRequest
 
 
 class Nodepool(object):
+    log = logging.getLogger('zuul.nodepool')
+
     def __init__(self, scheduler):
         self.requests = {}
         self.sched = scheduler
 
     def requestNodes(self, build_set, job):
         req = NodeRequest(build_set, job, job.nodeset)
-        self.requests[req.id] = req
-        self._requestComplete(req.id)
+        self.requests[req.uid] = req
+        self.log.debug("Submitting node request: %s" % (req,))
+
+        self.sched.zk.submitNodeRequest(req)
+        self._updateNodeRequest(req)
+
         return req
 
     def cancelRequest(self, request):
@@ -31,7 +39,16 @@
     def returnNodes(self, nodes, used=True):
         pass
 
-    def _requestComplete(self, id):
-        req = self.requests[id]
-        del self.requests[id]
-        self.sched.onNodesProvisioned(req)
+    def _updateNodeRequest(self, request):
+        self.log.debug("Updating node request: %s" % (request,))
+
+        def callback(event):
+            self._updateNodeRequest(request)
+        self.sched.zk.getNodeRequest(request, callback)
+
+        if request.uid not in self.requests:
+            return
+
+        if request.state == 'fulfilled':
+            self.sched.onNodesProvisioned(request)
+            del self.requests[request.uid]
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 3f78e8d..4a6cc93 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -285,6 +285,9 @@
     def setNodepool(self, nodepool):
         self.nodepool = nodepool
 
+    def setZooKeeper(self, zk):
+        self.zk = zk
+
     def addEvent(self, event):
         self.log.debug("Adding trigger event: %s" % event)
         try:
diff --git a/zuul/zk.py b/zuul/zk.py
new file mode 100644
index 0000000..2e87205
--- /dev/null
+++ b/zuul/zk.py
@@ -0,0 +1,309 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import six
+import time
+from kazoo.client import KazooClient, KazooState
+from kazoo import exceptions as kze
+
+# States:
+# We are building this node but it is not ready for use.
+BUILDING = 'building'
+# The node is ready for use.
+READY = 'ready'
+# The node should be deleted.
+DELETING = 'deleting'
+
+STATES = set([BUILDING, READY, DELETING])
+
+
+class ZooKeeperConnectionConfig(object):
+    '''
+    Represents the connection parameters for a ZooKeeper server.
+    '''
+
+    def __eq__(self, other):
+        if isinstance(other, ZooKeeperConnectionConfig):
+            if other.__dict__ == self.__dict__:
+                return True
+        return False
+
+    def __init__(self, host, port=2181, chroot=None):
+        '''Initialize the ZooKeeperConnectionConfig object.
+
+        :param str host: The hostname of the ZooKeeper server.
+        :param int port: The port on which ZooKeeper is listening.
+            Optional, default: 2181.
+        :param str chroot: A chroot for this connection.  All
+            ZooKeeper nodes will be underneath this root path.
+            Optional, default: None.
+
+        One instance of this class describes a single server of the
+        ZooKeeper cluster. Only the 'host' attribute is required.
+
+        '''
+        self.host = host
+        self.port = port
+        self.chroot = chroot or ''
+
+
+def buildZooKeeperHosts(host_list):
+    '''
+    Build the ZK cluster host list for client connections.
+
+    :param list host_list: A list of
+        :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects (one
+        per server) defining the ZooKeeper cluster servers.
+    '''
+    if not isinstance(host_list, list):
+        raise Exception("'host_list' must be a list")
+    hosts = []
+    for host_def in host_list:
+        host = '%s:%s%s' % (host_def.host, host_def.port, host_def.chroot)
+        hosts.append(host)
+    return ",".join(hosts)
+
+
+class BaseModel(object):
+    def __init__(self, o_id):
+        if o_id:
+            self.id = o_id
+        self._state = None
+        self.state_time = None
+        self.stat = None
+
+    @property
+    def id(self):
+        return self._id
+
+    @id.setter
+    def id(self, value):
+        if not isinstance(value, six.string_types):
+            raise TypeError("'id' attribute must be a string type")
+        self._id = value
+
+    @property
+    def state(self):
+        return self._state
+
+    @state.setter
+    def state(self, value):
+        if value not in STATES:
+            raise TypeError("'%s' is not a valid state" % value)
+        self._state = value
+        self.state_time = time.time()
+
+    def toDict(self):
+        '''
+        Convert a BaseModel object's attributes to a dictionary.
+        '''
+        d = {}
+        d['state'] = self.state
+        d['state_time'] = self.state_time
+        return d
+
+    def fromDict(self, d):
+        '''
+        Set base attributes based on the given dict.
+
+        Unlike the derived classes, this should NOT return an object as it
+        assumes self has already been instantiated.
+        '''
+        if 'state' in d:
+            self.state = d['state']
+        if 'state_time' in d:
+            self.state_time = d['state_time']
+
+
+class NodeRequest(BaseModel):
+    '''
+    Class representing a node request.
+    '''
+
+    def __init__(self, id=None):
+        super(NodeRequest, self).__init__(id)
+
+    def __repr__(self):
+        d = self.toDict()
+        d['id'] = self.id
+        d['stat'] = self.stat
+        return '<NodeRequest %s>' % d
+
+    def toDict(self):
+        '''
+        Convert a NodeRequest object's attributes to a dictionary.
+        '''
+        d = super(NodeRequest, self).toDict()
+        return d
+
+    @staticmethod
+    def fromDict(d, o_id=None):
+        '''
+        Create a NodeRequest object from a dictionary.
+
+        :param dict d: The dictionary.
+        :param str o_id: The object ID.
+
+        :returns: An initialized NodeRequest object.
+        '''
+        o = NodeRequest(o_id)
+        super(NodeRequest, o).fromDict(d)
+        return o
+
+
+class ZooKeeper(object):
+    '''
+    Class implementing the ZooKeeper interface.
+
+    This class uses the facade design pattern to keep common interaction
+    with the ZooKeeper API simple and consistent for the caller, and
+    limits coupling between objects. It allows for more complex interactions
+    by providing direct access to the client connection when needed (though
+    that is discouraged). It also provides for a convenient entry point for
+    testing only ZooKeeper interactions.
+    '''
+
+    log = logging.getLogger("zuul.zk.ZooKeeper")
+
+    REQUEST_ROOT = '/nodepool/requests'
+
+    def __init__(self):
+        '''
+        Initialize the ZooKeeper object.
+        '''
+        self.client = None
+        self._became_lost = False
+
+    def _dictToStr(self, data):
+        return json.dumps(data)
+
+    def _strToDict(self, data):
+        return json.loads(data)
+
+    def _connection_listener(self, state):
+        '''
+        Listener method for Kazoo connection state changes.
+
+        .. warning:: This method must not block.
+        '''
+        if state == KazooState.LOST:
+            self.log.debug("ZooKeeper connection: LOST")
+            self._became_lost = True
+        elif state == KazooState.SUSPENDED:
+            self.log.debug("ZooKeeper connection: SUSPENDED")
+        else:
+            self.log.debug("ZooKeeper connection: CONNECTED")
+
+    @property
+    def connected(self):
+        return self.client.state == KazooState.CONNECTED
+
+    @property
+    def suspended(self):
+        return self.client.state == KazooState.SUSPENDED
+
+    @property
+    def lost(self):
+        return self.client.state == KazooState.LOST
+
+    @property
+    def didLoseConnection(self):
+        return self._became_lost
+
+    def resetLostFlag(self):
+        self._became_lost = False
+
+    def connect(self, host_list, read_only=False):
+        '''
+        Establish a connection with ZooKeeper cluster.
+
+        Creates and starts a Kazoo client for the given servers.  Does
+        nothing if this object already holds a client connection.
+
+        :param list host_list: A list of
+            :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects
+            (one per server) defining the ZooKeeper cluster servers.
+        :param bool read_only: If True, establishes a read-only connection.
+
+        '''
+        if self.client is None:
+            hosts = buildZooKeeperHosts(host_list)
+            self.client = KazooClient(hosts=hosts, read_only=read_only)
+            self.client.add_listener(self._connection_listener)
+            self.client.start()
+
+    def disconnect(self):
+        '''
+        Close the ZooKeeper cluster connection.
+
+        You should call this method if you used connect() to establish a
+        cluster connection.
+        '''
+        if self.client is not None and self.client.connected:
+            self.client.stop()
+            self.client.close()
+            self.client = None
+
+    def resetHosts(self, host_list):
+        '''
+        Reset the ZooKeeper cluster connection host list.
+
+        :param list host_list: A list of
+            :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects
+            (one per server) defining the ZooKeeper cluster servers.
+        '''
+        if self.client is not None:
+            hosts = buildZooKeeperHosts(host_list)
+            self.client.set_hosts(hosts=hosts)
+
+    def submitNodeRequest(self, node_request):
+        '''
+        Submit a request for nodes to Nodepool.
+
+        :param NodeRequest node_request: A NodeRequest with the
+            contents of the request.
+        '''
+        priority = 100  # TODO(jeblair): integrate into nodereq
+
+        data = node_request.toDict()
+        data['created_time'] = time.time()
+
+        path = '%s/%s-' % (self.REQUEST_ROOT, priority)
+        self.log.debug(data)
+        path = self.client.create(path, self._dictToStr(data),
+                                  makepath=True,
+                                  sequence=True, ephemeral=True)
+        reqid = path.split("/")[-1]
+        node_request.id = reqid
+
+    def getNodeRequest(self, node_request, watcher):
+        '''
+        Read the specified node request and update its values.
+
+        :param NodeRequest node_request: A NodeRequest to be read.  It
+            will be updated with the results of the read.
+        :param callable watcher: A watch function to be called when the
+            node request is updated.
+        '''
+
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        try:
+            data, stat = self.client.get(path, watch=watcher)
+        except kze.NoNodeError:
+            return
+        data = self._strToDict(data)
+        node_request.updateFromDict(data)
+        # TODOv3(jeblair): re-register watches on disconnect