Merge "Re-submit node requests on ZooKeeper disconnect" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index cbb06b7..9ee5838 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -883,6 +883,7 @@
             hosts='%s:%s%s' % (host, port, chroot))
         self.client.start()
         self._running = True
+        self.paused = False
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
         self.thread.start()
@@ -899,6 +900,8 @@
             time.sleep(0.1)
 
     def _run(self):
+        if self.paused:
+            return
         for req in self.getNodeRequests():
             self.fulfillRequest(req)
 
@@ -1501,6 +1504,8 @@
         return True
 
     def areAllNodeRequestsComplete(self):
+        if self.fake_nodepool.paused:
+            return True
         if self.sched.nodepool.requests:
             return False
         return True
diff --git a/tests/test_nodepool.py b/tests/test_nodepool.py
index 6a14192..3fb0335 100644
--- a/tests/test_nodepool.py
+++ b/tests/test_nodepool.py
@@ -69,3 +69,20 @@
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
+
+    def test_node_request_disconnect(self):
+        # Test that node requests are re-submitted after disconnect
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        self.fake_nodepool.paused = True
+        request = self.nodepool.requestNodes(None, job)
+        self.zk.client.stop()
+        self.zk.client.start()
+        self.fake_nodepool.paused = False
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index d4fd1d8..bee703f 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -4529,6 +4529,23 @@
         self.assertEqual(A.reported, 1)
         self.assertIn('RETRY_LIMIT', A.messages[0])
 
+    def test_zookeeper_disconnect(self):
+        "Test that jobs are launched after a zookeeper disconnect"
+
+        self.fake_nodepool.paused = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('code-review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('approved', 1))
+        self.waitUntilSettled()
+
+        self.zk.client.stop()
+        self.zk.client.start()
+        self.fake_nodepool.paused = False
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+
 
 class TestDuplicatePipeline(ZuulTestCase):
     tenant_config_file = 'config/duplicate-pipeline/main.yaml'
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 2fa74c8..aa730b1 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -27,8 +27,7 @@
         self.requests[req.uid] = req
         self.log.debug("Submitting node request: %s" % (req,))
 
-        self.sched.zk.submitNodeRequest(req)
-        self._updateNodeRequest(req)
+        self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
 
         return req
 
@@ -39,16 +38,19 @@
     def returnNodes(self, nodes, used=True):
         pass
 
-    def _updateNodeRequest(self, request):
+    def _updateNodeRequest(self, request, deleted):
+        # Return False to stop watching the request's ZooKeeper node;
+        # return True to keep receiving updates.
         self.log.debug("Updating node request: %s" % (request,))
 
-        def callback(event):
-            self._updateNodeRequest(request)
-        self.sched.zk.getNodeRequest(request, callback)
-
         if request.uid not in self.requests:
-            return
+            return False
 
         if request.state == 'fulfilled':
             self.sched.onNodesProvisioned(request)
             del self.requests[request.uid]
+            return False
+        elif deleted:
+            self.log.debug("Resubmitting lost node request %s" % (request,))
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+        return True
diff --git a/zuul/zk.py b/zuul/zk.py
index 2e87205..190d7b4 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,7 +17,6 @@
 import six
 import time
 from kazoo.client import KazooClient, KazooState
-from kazoo import exceptions as kze
 
 # States:
 # We are building this node but it is not ready for use.
@@ -269,12 +268,21 @@
             hosts = buildZooKeeperHosts(host_list)
             self.client.set_hosts(hosts=hosts)
 
-    def submitNodeRequest(self, node_request):
+    def submitNodeRequest(self, node_request, watcher):
         '''
         Submit a request for nodes to Nodepool.
 
         :param NodeRequest node_request: A NodeRequest with the
             contents of the request.
+
+        :param callable watcher: A callable object that will be
+            invoked each time the request is updated.  It is called
+            with two arguments: (node_request, deleted) where
+            node_request is the same argument passed to this method,
+            and deleted is a boolean which is True if the node no
+            longer exists (notably, this will happen on disconnection
+            from ZooKeeper).  The watcher should return False when
+            further updates are no longer necessary.
         '''
         priority = 100  # TODO(jeblair): integrate into nodereq
 
@@ -282,28 +290,17 @@
         data['created_time'] = time.time()
 
         path = '%s/%s-' % (self.REQUEST_ROOT, priority)
-        self.log.debug(data)
         path = self.client.create(path, self._dictToStr(data),
                                   makepath=True,
                                   sequence=True, ephemeral=True)
         reqid = path.split("/")[-1]
         node_request.id = reqid
 
-    def getNodeRequest(self, node_request, watcher):
-        '''
-        Read the specified node request and update its values.
+        def callback(data, stat):
+            if data:
+                data = self._strToDict(data)
+                node_request.updateFromDict(data)
+            deleted = (data is None)  # data *are* none
+            return watcher(node_request, deleted)
 
-        :param NodeRequest node_request: A NodeRequest to be read.  It
-            will be updated with the results of the read.
-        :param callable watcher: A watch function to be called when the
-            node request is updated.
-        '''
-
-        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
-        try:
-            data, stat = self.client.get(path, watch=watcher)
-        except kze.NoNodeError:
-            return
-        data = self._strToDict(data)
-        node_request.updateFromDict(data)
-        # TODOv3(jeblair): re-register watches on disconnect
+        self.client.DataWatch(path, callback)