Merge "Re-submit node requests on ZooKeeper disconnect" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index cbb06b7..9ee5838 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -883,6 +883,7 @@
hosts='%s:%s%s' % (host, port, chroot))
self.client.start()
self._running = True
+ self.paused = False
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
@@ -899,6 +900,8 @@
time.sleep(0.1)

def _run(self):
+ if self.paused:
+ return
for req in self.getNodeRequests():
self.fulfillRequest(req)
@@ -1501,6 +1504,8 @@
return True

def areAllNodeRequestsComplete(self):
+ if self.fake_nodepool.paused:
+ return True
if self.sched.nodepool.requests:
return False
return True
diff --git a/tests/test_nodepool.py b/tests/test_nodepool.py
index 6a14192..3fb0335 100644
--- a/tests/test_nodepool.py
+++ b/tests/test_nodepool.py
@@ -69,3 +69,20 @@
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
+
+ def test_node_request_disconnect(self):
+ # Test that node requests are re-submitted after disconnect
+
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+ nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+ self.fake_nodepool.paused = True
+ request = self.nodepool.requestNodes(None, job)
+ self.zk.client.stop()
+ self.zk.client.start()
+ self.fake_nodepool.paused = False
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index d4fd1d8..bee703f 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -4529,6 +4529,23 @@
self.assertEqual(A.reported, 1)
self.assertIn('RETRY_LIMIT', A.messages[0])

+ def test_zookeeper_disconnect(self):
+ "Test that jobs are launched after a ZooKeeper disconnect"
+
+ self.fake_nodepool.paused = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('code-review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('approved', 1))
+ self.waitUntilSettled()
+
+ self.zk.client.stop()
+ self.zk.client.start()
+ self.fake_nodepool.paused = False
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+
class TestDuplicatePipeline(ZuulTestCase):
tenant_config_file = 'config/duplicate-pipeline/main.yaml'
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 2fa74c8..aa730b1 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -27,8 +27,7 @@
self.requests[req.uid] = req
self.log.debug("Submitting node request: %s" % (req,))
- self.sched.zk.submitNodeRequest(req)
- self._updateNodeRequest(req)
+ self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
return req
@@ -39,16 +38,19 @@
def returnNodes(self, nodes, used=True):
pass

- def _updateNodeRequest(self, request):
+ def _updateNodeRequest(self, request, deleted):
+ # Return False to indicate that we should stop watching the
+ # request znode.
self.log.debug("Updating node request: %s" % (request,))
- def callback(event):
- self._updateNodeRequest(request)
- self.sched.zk.getNodeRequest(request, callback)
-
if request.uid not in self.requests:
- return
+ return False
if request.state == 'fulfilled':
self.sched.onNodesProvisioned(request)
del self.requests[request.uid]
+ return False
+ elif deleted:
+ self.log.debug("Resubmitting lost node request %s" % (request,))
+ self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+ return True
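
(Aside, illustrative only and not part of the patch: the convention that
_updateNodeRequest now follows, returning False to drop the watch and True to
keep watching and possibly re-submit, can be exercised in isolation. The
sketch below uses invented stand-ins, DummyZK and DummyRequest, rather than
Zuul's real scheduler and ZooKeeper classes.)

    class DummyRequest(object):
        def __init__(self, uid):
            self.uid = uid
            self.state = 'requested'

    class DummyZK(object):
        def __init__(self):
            self.submitted = []

        def submitNodeRequest(self, request, watcher):
            # Stand-in for zuul.zk.ZooKeeper.submitNodeRequest().
            self.submitted.append(request)

    def make_watcher(zk, active):
        def watcher(request, deleted):
            if request.uid not in active:
                return False   # request is no longer tracked, drop the watch
            if request.state == 'fulfilled':
                del active[request.uid]
                return False   # request is done, drop the watch
            if deleted:
                # The request znode vanished (for example after a lost
                # ZooKeeper session), so re-submit it and keep watching.
                zk.submitNodeRequest(request, watcher)
            return True
        return watcher

    zk = DummyZK()
    req = DummyRequest('100-0000000001')
    watch = make_watcher(zk, {req.uid: req})

    assert watch(req, deleted=True) is True    # lost request gets re-submitted
    assert zk.submitted == [req]
    req.state = 'fulfilled'
    assert watch(req, deleted=False) is False  # fulfilled request ends the watch
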
diff --git a/zuul/zk.py b/zuul/zk.py
index 2e87205..190d7b4 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,7 +17,6 @@
import six
import time
from kazoo.client import KazooClient, KazooState
-from kazoo import exceptions as kze
# States:
# We are building this node but it is not ready for use.
@@ -269,12 +268,21 @@
hosts = buildZooKeeperHosts(host_list)
self.client.set_hosts(hosts=hosts)
- def submitNodeRequest(self, node_request):
+ def submitNodeRequest(self, node_request, watcher):
'''
Submit a request for nodes to Nodepool.
:param NodeRequest node_request: A NodeRequest with the
contents of the request.
+
+ :param callable watcher: A callable object that will be
+ invoked each time the request is updated. It is called
+ with two arguments: (node_request, deleted) where
+ node_request is the same argument passed to this method,
+ and deleted is a boolean which is True if the request znode
+ no longer exists (notably, this will happen on disconnection
+ from ZooKeeper). The watcher should return False when
+ further updates are no longer necessary.
'''
priority = 100 # TODO(jeblair): integrate into nodereq
@@ -282,28 +290,17 @@
data['created_time'] = time.time()
path = '%s/%s-' % (self.REQUEST_ROOT, priority)
- self.log.debug(data)
path = self.client.create(path, self._dictToStr(data),
makepath=True,
sequence=True, ephemeral=True)
reqid = path.split("/")[-1]
node_request.id = reqid
- def getNodeRequest(self, node_request, watcher):
- '''
- Read the specified node request and update its values.
+ def callback(data, stat):
+ if data:
+ data = self._strToDict(data)
+ node_request.updateFromDict(data)
+ deleted = (data is None)  # data is None once the znode is gone
+ return watcher(node_request, deleted)
- :param NodeRequest node_request: A NodeRequest to be read. It
- will be updated with the results of the read.
- :param callable watcher: A watch function to be called when the
- node request is updated.
- '''
-
- path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
- try:
- data, stat = self.client.get(path, watch=watcher)
- except kze.NoNodeError:
- return
- data = self._strToDict(data)
- node_request.updateFromDict(data)
- # TODOv3(jeblair): re-register watches on disconnect
+ self.client.DataWatch(path, callback)
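
(Aside, illustrative only: a self-contained sketch of the kazoo DataWatch
behavior the new submitNodeRequest relies on. The callback receives
(data, stat); data is None once the znode no longer exists, which includes an
ephemeral znode dropped after a lost session; and returning False removes the
watch. This assumes a ZooKeeper reachable at 127.0.0.1:2181, and the /demo
path and payload are made up for the example.)

    import json
    import time

    from kazoo.client import KazooClient

    client = KazooClient(hosts='127.0.0.1:2181')
    client.start()

    # An ephemeral, sequenced znode, similar to what submitNodeRequest creates.
    path = client.create('/demo/request-',
                         json.dumps({'state': 'requested'}).encode('utf8'),
                         makepath=True, sequence=True, ephemeral=True)

    def watcher(data, stat):
        if data is None:
            # The znode is gone (deleted, or expired with the session);
            # this is the point where a caller would re-submit.
            print('znode deleted')
            return False   # returning False removes the watch
        print('updated: %s' % json.loads(data.decode('utf8')))

    client.DataWatch(path, watcher)

    client.set(path, json.dumps({'state': 'fulfilled'}).encode('utf8'))
    client.delete(path)
    time.sleep(1)   # give the watch callbacks time to fire
    client.stop()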