Fix stuck node requests across ZK reconnection

When a request is fulfilled by nodepool, we add it to the scheduler's
event queue; later, the scheduler processes the event and accepts the
nodes.  If there is a ZooKeeper disconnection in the interim, we will
have noticed it and not locked the nodes.  However, the scheduler will
still pass the request on to the pipeline manager, and we will attempt
to run jobs on the unlocked nodes, which will continually fail.

This change extends the handling of a lost request so that if a request
is lost after it has been fulfilled but before its nodes are accepted,
we retry the request (which is what would happen if the request were
lucky enough to have been lost before fulfillment).

This extends the fix in 94e95886e2179f4a6aeecad687509bc7b1ab7fd3.

Change-Id: If81a790ed8b16594f4f9186d9256200b8d5e707e
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9a2eb28..9b54084 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -37,6 +37,7 @@
     ZuulTestCase,
     repack_repo,
     simple_layout,
+    iterate_timeout,
 )
 
 
@@ -4397,6 +4398,54 @@
         self.assertEqual(A.data['status'], 'MERGED')
         self.assertEqual(A.reported, 2)
 
+    def test_zookeeper_disconnect2(self):
+        "Test that jobs are executed after a zookeeper disconnect"
+
+        # This tests a ZK disconnect that lands between the arrival of a
+        # fulfilled node request and the moment we accept its nodes.
+        self.fake_nodepool.paused = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # We're waiting on the nodepool request to complete.  Hold the
+        # run handler lock so the scheduler stops processing events.
+        # NOTE(review): a failure before release() leaves it held.
+        self.sched.run_handler_lock.acquire()
+
+        # Unpause the fake nodepool and wait for the request's fulfillment.
+        self.fake_nodepool.paused = False
+        requests = list(self.sched.nodepool.requests.values())
+        self.assertEqual(1, len(requests))
+        request = requests[0]
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        id1 = request.id
+
+        # The request is fulfilled, but the scheduler hasn't processed
+        # it yet.  Stop and restart the ZK client to force a reconnect.
+        self.zk.client.stop()
+        self.zk.client.start()
+
+        # Release the lock so the scheduler can continue and process the
+        # (now out-of-date) notification that nodes are ready.
+        self.sched.run_handler_lock.release()
+
+        # The scheduler should resubmit the request.  Once it is
+        # fulfilled again, wait for it to run jobs and settle.
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        self.waitUntilSettled()
+
+        id2 = request.id
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        # Make sure it was resubmitted (the ids should differ).
+        self.assertNotEqual(id1, id2)
+
     def test_nodepool_failure(self):
         "Test that jobs are reported after a nodepool failure"