Handle double node locking snafu

If our queue processing is slow, and we lose the ZooKeeper session
after a node request has been fulfilled, but before we actually accept
the nodes, we need to be aware of this and not try to use the nodes
given to us.

Also pass the request ID along in the event queue since the actual
request object can have its ID changed out from underneath us on a
resubmit. Compare this ID with the request ID, and also verify that
the actual request still exists.

Furthermore, when we lock nodes, let's make sure that they are actually
allocated to the request we are processing.

Change-Id: Id89f6542afcf3f5d4a0b392b5cb8cf21ec3f6865
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d51898b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -76,7 +76,7 @@
         self.assertEqual(request.state, 'fulfilled')
 
         # Accept the nodes
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request.id)
         nodeset = request.nodeset
 
         for node in nodeset.getNodes():
@@ -125,3 +125,47 @@
 
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 0)
+
+    def test_accept_nodes_resubmitted(self):
+        # Test that a resubmitted request would not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        # Accept the nodes, passing a different ID
+        self.nodepool.acceptNodes(request, "invalid")
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
+    def test_accept_nodes_lost_request(self):
+        # Test that a lost request would not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        self.zk.deleteNodeRequest(request)
+
+        # Accept the nodes
+        self.nodepool.acceptNodes(request, request.id)
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9a125ce..f4c850d 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,16 +111,19 @@
             except Exception:
                 self.log.exception("Error unlocking node:")
 
-    def lockNodeSet(self, nodeset):
-        self._lockNodes(nodeset.getNodes())
+    def lockNodeSet(self, nodeset, request_id):
+        self._lockNodes(nodeset.getNodes(), request_id)
 
-    def _lockNodes(self, nodes):
+    def _lockNodes(self, nodes, request_id):
         # Try to lock all of the supplied nodes.  If any lock fails,
         # try to unlock any which have already been locked before
         # re-raising the error.
         locked_nodes = []
         try:
             for node in nodes:
+                if node.allocated_to != request_id:
+                    raise Exception("Node %s allocated to %s, not %s" %
+                                    (node.id, node.allocated_to, request_id))
                 self.log.debug("Locking node %s" % (node,))
                 self.sched.zk.lockNode(node, timeout=30)
                 locked_nodes.append(node)
@@ -141,7 +144,12 @@
             del self.requests[request.uid]
             return False
 
-        if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+        # TODOv3(jeblair): handle allocation failure
+        if deleted:
+            self.log.debug("Resubmitting lost node request %s" % (request,))
+            request.id = None
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
             self.log.info("Node request %s %s" % (request, request.state))
 
             # Give our results to the scheduler.
@@ -150,18 +158,29 @@
 
             # Stop watching this request node.
             return False
-        # TODOv3(jeblair): handle allocation failure
-        elif deleted:
-            self.log.debug("Resubmitting lost node request %s" % (request,))
-            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
         return True
 
-    def acceptNodes(self, request):
+    def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.
 
         self.log.info("Accepting node request %s" % (request,))
 
+        if request_id != request.id:
+            self.log.info("Skipping node accept for %s (resubmitted as %s)",
+                          request_id, request.id)
+            return
+
+        # Make sure the request still exists. It's possible it could have
+        # disappeared if we lost the ZK session between when the fulfillment
+        # response was added to our queue, and when we actually get around to
+        # processing it. Nodepool will automatically reallocate the assigned
+        # nodes in that situation.
+        if not self.sched.zk.nodeRequestExists(request):
+            self.log.info("Request %s no longer exists", request.id)
+            return
+
         if request.canceled:
             self.log.info("Ignoring canceled node request %s" % (request,))
             # The request was already deleted when it was canceled
@@ -171,7 +190,7 @@
         if request.fulfilled:
             # If the request suceeded, try to lock the nodes.
             try:
-                self.lockNodeSet(request.nodeset)
+                self.lockNodeSet(request.nodeset, request.id)
                 locked = True
             except Exception:
                 self.log.exception("Error locking nodes:")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a926f6e..ab147ba 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -164,6 +164,7 @@
 
     def __init__(self, request):
         self.request = request
+        self.request_id = request.id
 
 
 def toList(item):
@@ -889,9 +890,10 @@
 
     def _doNodesProvisionedEvent(self, event):
         request = event.request
+        request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request_id)
         if request.canceled:
             return
 
diff --git a/zuul/zk.py b/zuul/zk.py
index a3efef2..41d0429 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -187,6 +187,19 @@
         except kze.NoNodeError:
             pass
 
+    def nodeRequestExists(self, node_request):
+        '''
+        See if a NodeRequest exists in ZooKeeper.
+
+        :param NodeRequest node_request: A NodeRequest to verify.
+
+        :returns: True if the request exists, False otherwise.
+        '''
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        if self.client.exists(path):
+            return True
+        return False
+
     def storeNode(self, node):
         '''Store the node.