Merge "Fix stuck node requests across ZK reconnection"
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9a2eb28..9b54084 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -37,6 +37,7 @@
     ZuulTestCase,
     repack_repo,
     simple_layout,
+    iterate_timeout,
 )
 
 
@@ -4397,6 +4398,54 @@
         self.assertEqual(A.data['status'], 'MERGED')
         self.assertEqual(A.reported, 2)
 
+    def test_zookeeper_disconnect2(self):
+        "Test that jobs are executed after a zookeeper disconnect"
+
+        # This tests receiving a ZK disconnect between the arrival of
+        # a fulfilled request and when we accept its nodes.
+        self.fake_nodepool.paused = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # We're waiting on the nodepool request to complete.  Stop the
+        # scheduler from processing further events, then fulfill the
+        # nodepool request.
+        self.sched.run_handler_lock.acquire()
+
+        # Fulfill the nodepool request.
+        self.fake_nodepool.paused = False
+        requests = list(self.sched.nodepool.requests.values())
+        self.assertEqual(1, len(requests))
+        request = requests[0]
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        id1 = request.id
+
+        # The request is fulfilled, but the scheduler hasn't processed
+        # it yet.  Reconnect ZK.
+        self.zk.client.stop()
+        self.zk.client.start()
+
+        # Allow the scheduler to continue and process the (now
+        # out-of-date) notification that nodes are ready.
+        self.sched.run_handler_lock.release()
+
+        # It should resubmit the request.  Once it's fulfilled, we can
+        # wait for it to run jobs and settle.
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        self.waitUntilSettled()
+
+        id2 = request.id
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        # Make sure it was resubmitted (the ids should be different).
+        self.assertNotEqual(id1, id2)
+
     def test_nodepool_failure(self):
         "Test that jobs are reported after a nodepool failure"
 
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b96d1ca..6e7064c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -165,6 +165,7 @@
         self.log.debug("Updating node request %s" % (request,))
 
         if request.uid not in self.requests:
+            self.log.debug("Request %s is unknown" % (request.uid,))
             return False
 
         if request.canceled:
@@ -193,14 +194,21 @@
 
     def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
-        # nodes for (potential) use.
+        # nodes for (potential) use.  Return False if there is a
+        # problem with the request (canceled or retrying), True if it
+        # is ready to be acted upon (success or failure).
 
         self.log.info("Accepting node request %s" % (request,))
 
         if request_id != request.id:
             self.log.info("Skipping node accept for %s (resubmitted as %s)",
                           request_id, request.id)
-            return
+            return False
+
+        if request.canceled:
+            self.log.info("Ignoring canceled node request %s" % (request,))
+            # The request was already deleted when it was canceled
+            return False
 
         # Make sure the request still exists. It's possible it could have
         # disappeared if we lost the ZK session between when the fulfillment
@@ -208,13 +216,13 @@
         # processing it. Nodepool will automatically reallocate the assigned
         # nodes in that situation.
         if not self.sched.zk.nodeRequestExists(request):
-            self.log.info("Request %s no longer exists", request.id)
-            return
-
-        if request.canceled:
-            self.log.info("Ignoring canceled node request %s" % (request,))
-            # The request was already deleted when it was canceled
-            return
+            self.log.info("Request %s no longer exists, resubmitting",
+                          request.id)
+            request.id = None
+            request.state = model.STATE_REQUESTED
+            self.requests[request.uid] = request
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+            return False
 
         locked = False
         if request.fulfilled:
@@ -239,3 +247,4 @@
             # them.
             if locked:
                 self.unlockNodeSet(request.nodeset)
+        return True
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2130ede..14ca029 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1035,8 +1035,8 @@
         request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request, request_id)
-        if request.canceled:
+        ready = self.nodepool.acceptNodes(request, request_id)
+        if not ready:
             return
 
         if build_set is not build_set.item.current_build_set: