Merge "Add Executor Merger and Ansible execution statsd counters"
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
index 95ab870..ba35eb0 100644
--- a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
+++ b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
@@ -1,6 +1,7 @@
- hosts: localhost
tasks:
- command: dd if=/dev/zero of=toobig bs=1M count=2
+ - command: sync
- wait_for:
delay: 10
path: /
diff --git a/tests/unit/test_disk_accountant.py b/tests/unit/test_disk_accountant.py
index 7081b53..e12846d 100644
--- a/tests/unit/test_disk_accountant.py
+++ b/tests/unit/test_disk_accountant.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import fixtures
import os
import tempfile
import time
@@ -32,6 +33,10 @@
class TestDiskAccountant(BaseTestCase):
+ def setUp(self):
+ super(TestDiskAccountant, self).setUp()
+ self.useFixture(fixtures.NestedTempfile())
+
def test_disk_accountant(self):
jobs_dir = tempfile.mkdtemp(
dir=os.environ.get("ZUUL_TEST_ROOT", None))
@@ -47,6 +52,8 @@
testfile = os.path.join(jobdir, 'tfile')
with open(testfile, 'w') as tf:
tf.write(2 * 1024 * 1024 * '.')
+ tf.flush()
+ os.fsync(tf.fileno())
# da should catch over-limit dir within 5 seconds
for i in range(0, 50):
diff --git a/tests/unit/test_encryption.py b/tests/unit/test_encryption.py
index b424769..0a5c0a4 100644
--- a/tests/unit/test_encryption.py
+++ b/tests/unit/test_encryption.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import fixtures
import os
import subprocess
import tempfile
@@ -26,6 +27,10 @@
def setUp(self):
super(TestEncryption, self).setUp()
self.private, self.public = encryption.generate_rsa_keypair()
+ # Because we set delete to False when using NamedTemporaryFile below,
+ # we need to wrap our usage of temporary files in the NestedTempfile
+ # fixture, ensuring everything gets cleaned up when it is done.
+ self.useFixture(fixtures.NestedTempfile())
def test_serialization(self):
"Verify key serialization"
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9a2eb28..9b54084 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -37,6 +37,7 @@
ZuulTestCase,
repack_repo,
simple_layout,
+ iterate_timeout,
)
@@ -4397,6 +4398,54 @@
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
+ def test_zookeeper_disconnect2(self):
+ "Test that jobs are executed after a zookeeper disconnect"
+
+ # This tests receiving a ZK disconnect between the arrival of
+ # a fulfilled request and when we accept its nodes.
+ self.fake_nodepool.paused = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ # We're waiting on the nodepool request to complete. Stop the
+ # scheduler from processing further events, then fulfill the
+ # nodepool request.
+ self.sched.run_handler_lock.acquire()
+
+ # Fulfill the nodepool request.
+ self.fake_nodepool.paused = False
+ requests = list(self.sched.nodepool.requests.values())
+ self.assertEqual(1, len(requests))
+ request = requests[0]
+ for x in iterate_timeout(30, 'fulfill request'):
+ if request.fulfilled:
+ break
+ id1 = request.id
+
+ # The request is fulfilled, but the scheduler hasn't processed
+ # it yet. Reconnect ZK.
+ self.zk.client.stop()
+ self.zk.client.start()
+
+ # Allow the scheduler to continue and process the (now
+ # out-of-date) notification that nodes are ready.
+ self.sched.run_handler_lock.release()
+
+ # It should resubmit the request; once it's fulfilled, we can
+ # wait for it to run jobs and settle.
+ for x in iterate_timeout(30, 'fulfill request'):
+ if request.fulfilled:
+ break
+ self.waitUntilSettled()
+
+ id2 = request.id
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ # Make sure it was resubmitted (the ids should be different).
+ self.assertNotEqual(id1, id2)
+
def test_nodepool_failure(self):
"Test that jobs are reported after a nodepool failure"
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b96d1ca..6e7064c 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -165,6 +165,7 @@
self.log.debug("Updating node request %s" % (request,))
if request.uid not in self.requests:
+ self.log.debug("Request %s is unknown" % (request.uid,))
return False
if request.canceled:
@@ -193,14 +194,21 @@
def acceptNodes(self, request, request_id):
# Called by the scheduler when it wants to accept and lock
- # nodes for (potential) use.
+ # nodes for (potential) use. Return False if there is a
+ # problem with the request (canceled or retrying), True if it
+ # is ready to be acted upon (success or failure).
self.log.info("Accepting node request %s" % (request,))
if request_id != request.id:
self.log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
- return
+ return False
+
+ if request.canceled:
+ self.log.info("Ignoring canceled node request %s" % (request,))
+ # The request was already deleted when it was canceled
+ return False
# Make sure the request still exists. It's possible it could have
# disappeared if we lost the ZK session between when the fulfillment
@@ -208,13 +216,13 @@
# processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation.
if not self.sched.zk.nodeRequestExists(request):
- self.log.info("Request %s no longer exists", request.id)
- return
-
- if request.canceled:
- self.log.info("Ignoring canceled node request %s" % (request,))
- # The request was already deleted when it was canceled
- return
+ self.log.info("Request %s no longer exists, resubmitting",
+ request.id)
+ request.id = None
+ request.state = model.STATE_REQUESTED
+ self.requests[request.uid] = request
+ self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+ return False
locked = False
if request.fulfilled:
@@ -239,3 +247,4 @@
# them.
if locked:
self.unlockNodeSet(request.nodeset)
+ return True
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2130ede..14ca029 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1035,8 +1035,8 @@
request_id = event.request_id
build_set = request.build_set
- self.nodepool.acceptNodes(request, request_id)
- if request.canceled:
+ ready = self.nodepool.acceptNodes(request, request_id)
+ if not ready:
return
if build_set is not build_set.item.current_build_set: