Fix stuck node requests across ZK reconnection

When a request is fulfilled by nodepool, we add it to the scheduler's
event queue, and later, the scheduler processes the event and accepts
the nodes.  If there is a ZooKeeper disconnection in the interim, then
we will have noticed it and not locked the nodes, however, the scheduler
will still pass on the request to the pipeline manager and we will
attempt to run jobs on the unlocked nodes, which will continually
fail.

This change extends the handling of a lost request so that if it happens,
we retry the request (which is what would happen if the request is lucky
enough to have been lost before fulfillment).

This extends the fix in 94e95886e2179f4a6aeecad687509bc7b1ab7fd3.

Change-Id: If81a790ed8b16594f4f9186d9256200b8d5e707e
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9a2eb28..9b54084 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -37,6 +37,7 @@
     ZuulTestCase,
     repack_repo,
     simple_layout,
+    iterate_timeout,
 )
 
 
@@ -4397,6 +4398,54 @@
         self.assertEqual(A.data['status'], 'MERGED')
         self.assertEqual(A.reported, 2)
 
+    def test_zookeeper_disconnect2(self):
+        "Test that jobs are executed after a zookeeper disconnect"
+
+        # This tests receiving a ZK disconnect between the arrival of
+        # a fulfilled request and when we accept its nodes.
+        self.fake_nodepool.paused = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # We're waiting on the nodepool request to complete.  Stop the
+        # scheduler from processing further events, then fulfill the
+        # nodepool request.
+        self.sched.run_handler_lock.acquire()
+
+        # Fulfill the nodepool request.
+        self.fake_nodepool.paused = False
+        requests = list(self.sched.nodepool.requests.values())
+        self.assertEqual(1, len(requests))
+        request = requests[0]
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        id1 = request.id
+
+        # The request is fulfilled, but the scheduler hasn't processed
+        # it yet.  Reconnect ZK.
+        self.zk.client.stop()
+        self.zk.client.start()
+
+        # Allow the scheduler to continue and process the (now
+        # out-of-date) notification that nodes are ready.
+        self.sched.run_handler_lock.release()
+
+        # It should resubmit the request, once it's fulfilled, we can
+        # wait for it to run jobs and settle.
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        self.waitUntilSettled()
+
+        id2 = request.id
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        # Make sure it was resubmitted (the id's should be different).
+        self.assertNotEqual(id1, id2)
+
     def test_nodepool_failure(self):
         "Test that jobs are reported after a nodepool failure"
 
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b96d1ca..6e7064c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -165,6 +165,7 @@
         self.log.debug("Updating node request %s" % (request,))
 
         if request.uid not in self.requests:
+            self.log.debug("Request %s is unknown" % (request.uid,))
             return False
 
         if request.canceled:
@@ -193,14 +194,21 @@
 
     def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
-        # nodes for (potential) use.
+        # nodes for (potential) use.  Return False if there is a
+        # problem with the request (canceled or retrying), True if it
+        # is ready to be acted upon (success or failure).
 
         self.log.info("Accepting node request %s" % (request,))
 
         if request_id != request.id:
             self.log.info("Skipping node accept for %s (resubmitted as %s)",
                           request_id, request.id)
-            return
+            return False
+
+        if request.canceled:
+            self.log.info("Ignoring canceled node request %s" % (request,))
+            # The request was already deleted when it was canceled
+            return False
 
         # Make sure the request still exists. It's possible it could have
         # disappeared if we lost the ZK session between when the fulfillment
@@ -208,13 +216,13 @@
         # processing it. Nodepool will automatically reallocate the assigned
         # nodes in that situation.
         if not self.sched.zk.nodeRequestExists(request):
-            self.log.info("Request %s no longer exists", request.id)
-            return
-
-        if request.canceled:
-            self.log.info("Ignoring canceled node request %s" % (request,))
-            # The request was already deleted when it was canceled
-            return
+            self.log.info("Request %s no longer exists, resubmitting",
+                          request.id)
+            request.id = None
+            request.state = model.STATE_REQUESTED
+            self.requests[request.uid] = request
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+            return False
 
         locked = False
         if request.fulfilled:
@@ -239,3 +247,4 @@
             # them.
             if locked:
                 self.unlockNodeSet(request.nodeset)
+        return True
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2130ede..14ca029 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1035,8 +1035,8 @@
         request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request, request_id)
-        if request.canceled:
+        ready = self.nodepool.acceptNodes(request, request_id)
+        if not ready:
             return
 
         if build_set is not build_set.item.current_build_set: