Merge "Handle double node locking snafu" into feature/zuulv3
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d51898b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -76,7 +76,7 @@
         self.assertEqual(request.state, 'fulfilled')
 
         # Accept the nodes
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request.id)
         nodeset = request.nodeset
 
         for node in nodeset.getNodes():
@@ -125,3 +125,47 @@
 
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 0)
+
+    def test_accept_nodes_resubmitted(self):
+        # Test that a resubmitted request would not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        # Accept the nodes, passing a different ID
+        self.nodepool.acceptNodes(request, "invalid")
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
+    def test_accept_nodes_lost_request(self):
+        # Test that a lost request would not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        self.zk.deleteNodeRequest(request)
+
+        # Accept the nodes
+        self.nodepool.acceptNodes(request, request.id)
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9a125ce..f4c850d 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,16 +111,19 @@
             except Exception:
                 self.log.exception("Error unlocking node:")
 
-    def lockNodeSet(self, nodeset):
-        self._lockNodes(nodeset.getNodes())
+    def lockNodeSet(self, nodeset, request_id):
+        self._lockNodes(nodeset.getNodes(), request_id)
 
-    def _lockNodes(self, nodes):
+    def _lockNodes(self, nodes, request_id):
         # Try to lock all of the supplied nodes.  If any lock fails,
         # try to unlock any which have already been locked before
         # re-raising the error.
         locked_nodes = []
         try:
             for node in nodes:
+                if node.allocated_to != request_id:
+                    raise Exception("Node %s allocated to %s, not %s" %
+                                    (node.id, node.allocated_to, request_id))
                 self.log.debug("Locking node %s" % (node,))
                 self.sched.zk.lockNode(node, timeout=30)
                 locked_nodes.append(node)
@@ -141,7 +144,12 @@
             del self.requests[request.uid]
             return False
 
-        if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+        # TODOv3(jeblair): handle allocation failure
+        if deleted:
+            self.log.debug("Resubmitting lost node request %s" % (request,))
+            request.id = None
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
             self.log.info("Node request %s %s" % (request, request.state))
 
             # Give our results to the scheduler.
@@ -150,18 +158,29 @@
 
             # Stop watching this request node.
             return False
-        # TODOv3(jeblair): handle allocation failure
-        elif deleted:
-            self.log.debug("Resubmitting lost node request %s" % (request,))
-            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
         return True
 
-    def acceptNodes(self, request):
+    def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.
 
         self.log.info("Accepting node request %s" % (request,))
 
+        if request_id != request.id:
+            self.log.info("Skipping node accept for %s (resubmitted as %s)",
+                          request_id, request.id)
+            return
+
+        # Make sure the request still exists. It's possible it could have
+        # disappeared if we lost the ZK session between when the fulfillment
+        # response was added to our queue, and when we actually get around to
+        # processing it. Nodepool will automatically reallocate the assigned
+        # nodes in that situation.
+        if not self.sched.zk.nodeRequestExists(request):
+            self.log.info("Request %s no longer exists", request.id)
+            return
+
         if request.canceled:
             self.log.info("Ignoring canceled node request %s" % (request,))
             # The request was already deleted when it was canceled
@@ -171,7 +190,7 @@
         if request.fulfilled:
             # If the request suceeded, try to lock the nodes.
             try:
-                self.lockNodeSet(request.nodeset)
+                self.lockNodeSet(request.nodeset, request.id)
                 locked = True
             except Exception:
                 self.log.exception("Error locking nodes:")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a926f6e..ab147ba 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -164,6 +164,7 @@
 
     def __init__(self, request):
         self.request = request
+        self.request_id = request.id
 
 
 def toList(item):
@@ -889,9 +890,10 @@
 
     def _doNodesProvisionedEvent(self, event):
         request = event.request
+        request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request_id)
         if request.canceled:
             return
 
diff --git a/zuul/zk.py b/zuul/zk.py
index a3efef2..41d0429 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -187,6 +187,19 @@
         except kze.NoNodeError:
             pass
 
+    def nodeRequestExists(self, node_request):
+        '''
+        See if a NodeRequest exists in ZooKeeper.
+
+        :param NodeRequest node_request: A NodeRequest to verify.
+
+        :returns: True if the request exists, False otherwise.
+        '''
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        if self.client.exists(path):
+            return True
+        return False
+
     def storeNode(self, node):
         '''Store the node.