Merge "Add Executor Merger and Ansible execution statsd counters"
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
index 95ab870..ba35eb0 100644
--- a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
+++ b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
@@ -1,6 +1,7 @@
 - hosts: localhost
   tasks:
     - command: dd if=/dev/zero of=toobig bs=1M count=2
+    - command: sync
     - wait_for:
         delay: 10
         path: /
diff --git a/tests/unit/test_disk_accountant.py b/tests/unit/test_disk_accountant.py
index 7081b53..e12846d 100644
--- a/tests/unit/test_disk_accountant.py
+++ b/tests/unit/test_disk_accountant.py
@@ -10,6 +10,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import fixtures
 import os
 import tempfile
 import time
@@ -32,6 +33,10 @@
 
 
 class TestDiskAccountant(BaseTestCase):
+    def setUp(self):
+        super(TestDiskAccountant, self).setUp()
+        self.useFixture(fixtures.NestedTempfile())
+
     def test_disk_accountant(self):
         jobs_dir = tempfile.mkdtemp(
             dir=os.environ.get("ZUUL_TEST_ROOT", None))
@@ -47,6 +52,8 @@
             testfile = os.path.join(jobdir, 'tfile')
             with open(testfile, 'w') as tf:
                 tf.write(2 * 1024 * 1024 * '.')
+                tf.flush()
+                os.fsync(tf.fileno())
 
             # da should catch over-limit dir within 5 seconds
             for i in range(0, 50):
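
The flush()/fsync() pair added above serves the same purpose as the `sync` command added to the dd playbook earlier: write() only fills Python's userspace buffer, flush() hands the data to the kernel, and fsync() forces it onto disk so the blocks are actually allocated. Without that, a du-style disk accountant can under-count a freshly written file until background writeback runs. A minimal sketch of the effect, outside of Zuul and using only the standard library (behaviour depends on the filesystem; with delayed allocation, st_blocks is typically near zero before the fsync):

    import os
    import tempfile

    with tempfile.NamedTemporaryFile(delete=False) as tf:
        tf.write(b'.' * (2 * 1024 * 1024))
        tf.flush()                           # userspace buffer -> page cache
        before = os.fstat(tf.fileno()).st_blocks
        os.fsync(tf.fileno())                # page cache -> disk allocation
        after = os.fstat(tf.fileno()).st_blocks

    # 'before' may be much smaller than 'after'; a du-style check taken
    # before the fsync sees only the smaller figure.
    print(before, after)
    os.unlink(tf.name)
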
diff --git a/tests/unit/test_encryption.py b/tests/unit/test_encryption.py
index b424769..0a5c0a4 100644
--- a/tests/unit/test_encryption.py
+++ b/tests/unit/test_encryption.py
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import fixtures
 import os
 import subprocess
 import tempfile
@@ -26,6 +27,10 @@
     def setUp(self):
         super(TestEncryption, self).setUp()
         self.private, self.public = encryption.generate_rsa_keypair()
+        # Because we set delete to False when using NamedTemporaryFile
+        # below, we wrap our temporary files in the NestedTempfile fixture
+        # to ensure everything is cleaned up when the test is done.
+        self.useFixture(fixtures.NestedTempfile())
 
     def test_serialization(self):
         "Verify key serialization"
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9a2eb28..9b54084 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -37,6 +37,7 @@
     ZuulTestCase,
     repack_repo,
     simple_layout,
+    iterate_timeout,
 )
 
 
@@ -4397,6 +4398,54 @@
         self.assertEqual(A.data['status'], 'MERGED')
         self.assertEqual(A.reported, 2)
 
+    def test_zookeeper_disconnect2(self):
+        "Test that jobs are executed after a zookeeper disconnect"
+
+        # This tests receiving a ZK disconnect between the arrival of
+        # a fulfilled request and when we accept its nodes.
+        self.fake_nodepool.paused = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # We're waiting on the nodepool request to complete.  Stop the
+        # scheduler from processing further events, then fulfill the
+        # nodepool request.
+        self.sched.run_handler_lock.acquire()
+
+        # Fulfill the nodepool request.
+        self.fake_nodepool.paused = False
+        requests = list(self.sched.nodepool.requests.values())
+        self.assertEqual(1, len(requests))
+        request = requests[0]
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        id1 = request.id
+
+        # The request is fulfilled, but the scheduler hasn't processed
+        # it yet.  Reconnect ZK.
+        self.zk.client.stop()
+        self.zk.client.start()
+
+        # Allow the scheduler to continue and process the (now
+        # out-of-date) notification that nodes are ready.
+        self.sched.run_handler_lock.release()
+
+        # It should resubmit the request; once it's fulfilled, we can
+        # wait for it to run jobs and settle.
+        for x in iterate_timeout(30, 'fulfill request'):
+            if request.fulfilled:
+                break
+        self.waitUntilSettled()
+
+        id2 = request.id
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        # Make sure it was resubmitted (the ids should be different).
+        self.assertNotEqual(id1, id2)
+
     def test_nodepool_failure(self):
         "Test that jobs are reported after a nodepool failure"
 
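
iterate_timeout, newly imported from the test base above, captures the poll-until-done-or-give-up idiom used twice in the new test. A sketch of what such a helper looks like (illustrative; the actual tests/base.py implementation may differ in details such as the sleep interval):

    import time


    def iterate_timeout(max_seconds, purpose):
        # Yield an increasing counter until the deadline passes, then
        # raise so the caller's loop cannot hang forever.
        start = time.time()
        count = 0
        while time.time() < start + max_seconds:
            count += 1
            yield count
            time.sleep(0.1)
        raise Exception("Timeout waiting for %s" % purpose)

    # Usage, mirroring the test above:
    #     for x in iterate_timeout(30, 'fulfill request'):
    #         if request.fulfilled:
    #             break
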
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b96d1ca..6e7064c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -165,6 +165,7 @@
         self.log.debug("Updating node request %s" % (request,))
 
         if request.uid not in self.requests:
+            self.log.debug("Request %s is unknown" % (request.uid,))
             return False
 
         if request.canceled:
@@ -193,14 +194,21 @@
 
     def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
-        # nodes for (potential) use.
+        # nodes for (potential) use.  Return False if there is a
+        # problem with the request (canceled or retrying), True if it
+        # is ready to be acted upon (success or failure).
 
         self.log.info("Accepting node request %s" % (request,))
 
         if request_id != request.id:
             self.log.info("Skipping node accept for %s (resubmitted as %s)",
                           request_id, request.id)
-            return
+            return False
+
+        if request.canceled:
+            self.log.info("Ignoring canceled node request %s" % (request,))
+            # The request was already deleted when it was canceled
+            return False
 
         # Make sure the request still exists. It's possible it could have
         # disappeared if we lost the ZK session between when the fulfillment
@@ -208,13 +216,13 @@
         # processing it. Nodepool will automatically reallocate the assigned
         # nodes in that situation.
         if not self.sched.zk.nodeRequestExists(request):
-            self.log.info("Request %s no longer exists", request.id)
-            return
-
-        if request.canceled:
-            self.log.info("Ignoring canceled node request %s" % (request,))
-            # The request was already deleted when it was canceled
-            return
+            self.log.info("Request %s no longer exists, resubmitting",
+                          request.id)
+            request.id = None
+            request.state = model.STATE_REQUESTED
+            self.requests[request.uid] = request
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+            return False
 
         locked = False
         if request.fulfilled:
@@ -239,3 +247,4 @@
             # them.
             if locked:
                 self.unlockNodeSet(request.nodeset)
+        return True
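
The important detail in the resubmission branch above is that a request keeps its uid (the key into self.requests) across resubmission, while ZooKeeper hands out a new id each time the request is submitted; that is exactly what the new test checks with id1 != id2. A condensed, illustrative sketch of the accept-or-resubmit outcomes (toy function, not Zuul's real signature):

    def accept_or_resubmit(request, request_id, exists_in_zk, resubmit):
        # Toy version of the outcome logic in acceptNodes() above.
        if request_id != request.id:
            # The event refers to an id we have since resubmitted; skip it.
            return False
        if request.canceled:
            # The request was already deleted when it was canceled.
            return False
        if not exists_in_zk:
            # A ZK session loss removed the request; submit it again and
            # wait for a fresh fulfillment notification.
            resubmit(request)
            return False
        # Fulfilled or failed: either way the caller can act on it.
        return True

Returning False for every not-actionable case lets the scheduler bail out with a single check instead of re-testing request.canceled, which is what the scheduler.py hunk below switches to.
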
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2130ede..14ca029 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1035,8 +1035,8 @@
         request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request, request_id)
-        if request.canceled:
+        ready = self.nodepool.acceptNodes(request, request_id)
+        if not ready:
             return
 
         if build_set is not build_set.item.current_build_set:
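
Assuming this hunk sits in the scheduler's nodes-provisioned event handler (the method name is not shown in the hunk), the resulting shape is roughly the sketch below; the key point is that a request resubmitted by acceptNodes() will generate another provisioned event once it is fulfilled again, so returning early here is safe:

    def _doNodesProvisionedEvent(self, event):   # name assumed, not in hunk
        request = event.request
        request_id = event.request_id
        build_set = request.build_set

        ready = self.nodepool.acceptNodes(request, request_id)
        if not ready:
            # Canceled, stale, or just resubmitted after a ZK session
            # loss.  For the resubmission case nodepool will enqueue a
            # fresh event once the new request is fulfilled, and this
            # handler will run again with the new request id.
            return
        # ... the remainder of the handler (build set currency check and
        # so on) continues unchanged from here.
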