Merge "Lock nodes when nodepool request is fulfilled" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index 9ee5838..9ec8e54 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -919,14 +919,45 @@
             reqs.append(data)
         return reqs
 
+    def makeNode(self, request_id, node_type):
+        now = time.time()
+        path = '/nodepool/nodes/'
+        data = dict(type=node_type,
+                    provider='test-provider',
+                    region='test-region',
+                    az=None,
+                    public_ipv4='127.0.0.1',
+                    private_ipv4=None,
+                    public_ipv6=None,
+                    allocated_to=request_id,
+                    state='ready',
+                    state_time=now,
+                    created_time=now,
+                    updated_time=now,
+                    image_id=None,
+                    launcher='fake-nodepool')
+        data = json.dumps(data)
+        path = self.client.create(path, data,
+                                  makepath=True,
+                                  sequence=True)
+        nodeid = path.split("/")[-1]
+        return nodeid
+
     def fulfillRequest(self, request):
         if request['state'] == 'fulfilled':
             return
         request = request.copy()
-        request['state'] = 'fulfilled'
-        request['state_time'] = time.time()
         oid = request['_oid']
         del request['_oid']
+
+        nodes = []
+        for node in request['node_types']:
+            nodeid = self.makeNode(oid, node)
+            nodes.append(nodeid)
+
+        request['state'] = 'fulfilled'
+        request['state_time'] = time.time()
+        request['nodes'] = nodes
         path = self.REQUEST_ROOT + '/' + oid
         data = json.dumps(request)
         self.log.debug("Fulfilling node request: %s %s" % (oid, data))
diff --git a/tests/test_nodepool.py b/tests/test_nodepool.py
index 3fb0335..b5b9b17 100644
--- a/tests/test_nodepool.py
+++ b/tests/test_nodepool.py
@@ -70,6 +70,14 @@
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
 
+        # Accept the nodes
+        self.nodepool.acceptNodes(request)
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNotNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
     def test_node_request_disconnect(self):
         # Test that node requests are re-submitted after disconnect
 
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 7a4c7cc..213cfc4 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -615,6 +615,7 @@
             item.setUnableToMerge()
 
     def onNodesProvisioned(self, event):
+        # TODOv3(jeblair): handle provisioning failure here
         request = event.request
         build_set = request.build_set
         build_set.jobNodeRequestComplete(request.job.name, request,
diff --git a/zuul/model.py b/zuul/model.py
index 745deff..48ff982 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -354,10 +354,36 @@
         self.name = name
         self.image = image
         self.id = None
+        self.lock = None
+        # Attributes from Nodepool
+        self._state = 'unknown'
+        self.state_time = time.time()
+        self.public_ipv4 = None
+        self.private_ipv4 = None
+        self.public_ipv6 = None
+
+    @property
+    def state(self):
+        return self._state
+
+    @state.setter
+    def state(self, value):
+        # TODOv3(jeblair): reinstate
+        # if value not in STATES:
+        #     raise TypeError("'%s' is not a valid state" % value)
+        self._state = value
+        self.state_time = time.time()
 
     def __repr__(self):
         return '<Node %s %s:%s>' % (self.id, self.name, self.image)
 
+    def updateFromDict(self, data):
+        self._state = data['state']
+        self.state_time = data['state_time']
+        self.public_ipv4 = data.get('public_ipv4')
+        self.private_ipv4 = data.get('private_ipv4')
+        self.public_ipv6 = data.get('public_ipv6')
+
 
 class NodeSet(object):
     """A set of nodes.
@@ -407,6 +433,9 @@
         self.stat = None
         self.uid = uuid4().hex
         self.id = None
+        # Zuul internal failure flag (not stored in ZK, so it is not
+        # overwritten when the request is updated from ZK).
+        self.failed = False
 
     @property
     def state(self):
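
A note on the Node additions above: the state property exists so that every assignment to state also refreshes state_time. A minimal standalone illustration of that pairing, not the real Node class:

    import time


    class TrackedState(object):
        """Minimal illustration of the state/state_time pairing used above."""

        def __init__(self):
            self._state = 'unknown'
            self.state_time = time.time()

        @property
        def state(self):
            return self._state

        @state.setter
        def state(self, value):
            # Every assignment to .state refreshes the timestamp.
            self._state = value
            self.state_time = time.time()


    obj = TrackedState()
    obj.state = 'ready'
    assert obj.state == 'ready'
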
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9d0d803..903d90c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -41,6 +41,34 @@
     def returnNodes(self, nodes, used=True):
         pass
 
+    def unlockNodeset(self, nodeset):
+        self._unlockNodes(nodeset.getNodes())
+
+    def _unlockNodes(self, nodes):
+        for node in nodes:
+            try:
+                self.sched.zk.unlockNode(node)
+            except Exception:
+                self.log.exception("Error unlocking node:")
+
+    def lockNodeset(self, nodeset):
+        self._lockNodes(nodeset.getNodes())
+
+    def _lockNodes(self, nodes):
+        # Try to lock all of the supplied nodes.  If any lock fails,
+        # try to unlock any which have already been locked before
+        # re-raising the error.
+        locked_nodes = []
+        try:
+            for node in nodes:
+                self.log.debug("Locking node: %s" % (node,))
+                self.sched.zk.lockNode(node)
+                locked_nodes.append(node)
+        except Exception:
+            self.log.exception("Error locking nodes:")
+            self._unlockNodes(locked_nodes)
+            raise
+
     def _updateNodeRequest(self, request, deleted):
         # Return False to indicate that we should stop watching the
         # node.
@@ -50,10 +78,45 @@
             return False
 
         if request.state == 'fulfilled':
+            self.log.info("Node request %s fulfilled" % (request,))
+
+            # Give our results to the scheduler.
             self.sched.onNodesProvisioned(request)
             del self.requests[request.uid]
+
+            # Stop watching this request node.
             return False
+        # TODOv3(jeblair): handle allocation failure
         elif deleted:
             self.log.debug("Resubmitting lost node request %s" % (request,))
             self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
         return True
+
+    def acceptNodes(self, request):
+        # Called by the scheduler when it wants to accept and lock
+        # nodes for (potential) use.
+
+        self.log.debug("Accepting node request: %s" % (request,))
+
+        # First, try to lock the nodes.
+        locked = False
+        try:
+            self.lockNodeset(request.nodeset)
+            locked = True
+        except Exception:
+            self.log.exception("Error locking nodes:")
+            request.failed = True
+
+        # Regardless of whether locking succeeded, delete the
+        # request.
+        self.log.debug("Deleting node request: %s" % (request,))
+        try:
+            self.sched.zk.deleteNodeRequest(request)
+        except Exception:
+            self.log.exception("Error deleting node request:")
+            request.failed = True
+            # If deleting the request failed, and we did lock the
+            # nodes, unlock the nodes since we're not going to use
+            # them.
+            if locked:
+                self.unlockNodeset(request.nodeset)
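
The comment in _lockNodes describes an all-or-nothing acquisition: either every node in the set ends up locked, or anything acquired so far is released before the error propagates. A standalone sketch of the same pattern written directly against kazoo locks; the paths and client handling are placeholders, since in the change itself this goes through zk.lockNode and unlockNode:

    from kazoo.recipe.lock import Lock


    def lock_all_or_none(client, lock_paths, timeout=10):
        # Acquire every lock, releasing any already held if one
        # acquisition fails, so the caller never holds a partial set.
        held = []
        try:
            for path in lock_paths:
                lock = Lock(client, path)
                if not lock.acquire(blocking=True, timeout=timeout):
                    raise RuntimeError("Could not lock %s" % path)
                held.append(lock)
            return held
        except Exception:
            for lock in held:
                lock.release()
            raise

    # Usage (assuming a started KazooClient):
    #   locks = lock_all_or_none(client, ['/nodepool/nodes/0000000000/lock'])
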
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 4a6cc93..270e055 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -800,6 +800,14 @@
     def _doNodesProvisionedEvent(self, event):
         request = event.request
         build_set = request.build_set
+
+        try:
+            self.nodepool.acceptNodes(request)
+        except Exception:
+            self.log.exception("Unable to accept nodes from request %s:"
+                               % (request,))
+            return
+
         if build_set is not build_set.item.current_build_set:
             self.log.warning("Build set %s is not current" % (build_set,))
             self.nodepool.returnNodes(request.nodes, used=False)
diff --git a/zuul/zk.py b/zuul/zk.py
index 190d7b4..4f7d736 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,6 +17,8 @@
 import six
 import time
 from kazoo.client import KazooClient, KazooState
+from kazoo import exceptions as kze
+from kazoo.recipe.lock import Lock
 
 # States:
 # We are building this node but it is not ready for use.
@@ -29,6 +31,10 @@
 STATES = set([BUILDING, READY, DELETING])
 
 
+class LockException(Exception):
+    pass
+
+
 class ZooKeeperConnectionConfig(object):
     '''
     Represents the connection parameters for a ZooKeeper server.
@@ -178,6 +184,7 @@
     log = logging.getLogger("zuul.zk.ZooKeeper")
 
     REQUEST_ROOT = '/nodepool/requests'
+    NODE_ROOT = '/nodepool/nodes'
 
     def __init__(self):
         '''
@@ -300,7 +307,69 @@
             if data:
                 data = self._strToDict(data)
                 node_request.updateFromDict(data)
+                request_nodes = node_request.nodeset.getNodes()
+                for i, nodeid in enumerate(data.get('nodes', [])):
+                    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
+                    node_data, node_stat = self.client.get(node_path)
+                    node_data = self._strToDict(node_data)
+                    request_nodes[i].id = nodeid
+                    request_nodes[i].updateFromDict(node_data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)
 
         self.client.DataWatch(path, callback)
+
+    def deleteNodeRequest(self, node_request):
+        '''
+        Delete a request for nodes.
+
+        :param NodeRequest node_request: A NodeRequest with the
+            contents of the request.
+        '''
+
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        try:
+            self.client.delete(path)
+        except kze.NoNodeError:
+            pass
+
+    def lockNode(self, node, blocking=True, timeout=None):
+        '''
+        Lock a node.
+
+        This should be called as soon as a request is fulfilled and
+        the lock held for as long as the node is in use.  It can be
+        used by nodepool to detect if Zuul has gone offline and the
+        node should be reclaimed.
+
+        :param Node node: The node which should be locked.
+        '''
+
+        lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id)
+        try:
+            lock = Lock(self.client, lock_path)
+            have_lock = lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise LockException(
+                "Timeout trying to acquire lock %s" % lock_path)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise LockException("Did not get lock on %s" % lock_path)
+
+        node.lock = lock
+
+    def unlockNode(self, node):
+        '''
+        Unlock a node.
+
+        The node must already have been locked.
+
+        :param Node node: The node which should be unlocked.
+        '''
+
+        if node.lock is None:
+            raise LockException("Node %s does not hold a lock" % (node,))
+        node.lock.release()
+        node.lock = None
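
Putting the zk.py additions together: a node's lock lives at /nodepool/nodes/<id>/lock, lockNode acquires it (raising LockException on timeout or contention), and unlockNode releases it. A minimal usage sketch; with_locked_node is a hypothetical helper, and it assumes zk is a connected zuul.zk.ZooKeeper and node is a zuul.model.Node whose id refers to an existing node znode:

    from zuul.zk import LockException


    def with_locked_node(zk, node, func):
        """Hypothetical helper: run func(node) while holding the node's lock."""
        try:
            zk.lockNode(node, blocking=True, timeout=30)
        except LockException:
            return None  # lock timed out or is held by someone else
        try:
            return func(node)
        finally:
            zk.unlockNode(node)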