Merge "Verify nodes and requests are not leaked" into feature/zuulv3
diff --git a/tests/base.py b/tests/base.py
index f10157d..56c83f2 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -874,6 +874,7 @@
 
 class FakeNodepool(object):
     REQUEST_ROOT = '/nodepool/requests'
+    NODE_ROOT = '/nodepool/nodes'
 
     log = logging.getLogger("zuul.test.FakeNodepool")
 
@@ -918,6 +919,28 @@
             reqs.append(data)
         return reqs
 
+    def getNodes(self):
+        try:
+            nodeids = self.client.get_children(self.NODE_ROOT)
+        except kazoo.exceptions.NoNodeError:
+            return []
+        nodes = []
+        for oid in sorted(nodeids):
+            path = self.NODE_ROOT + '/' + oid
+            data, stat = self.client.get(path)
+            data = json.loads(data)
+            data['_oid'] = oid
+            try:
+                lockfiles = self.client.get_children(path + '/lock')
+            except kazoo.exceptions.NoNodeError:
+                lockfiles = []
+            if lockfiles:
+                data['_lock'] = True
+            else:
+                data['_lock'] = False
+            nodes.append(data)
+        return nodes
+
     def makeNode(self, request_id, node_type):
         now = time.time()
         path = '/nodepool/nodes/'
@@ -1257,9 +1280,12 @@
         self.rpc.start()
         self.launch_client.gearman.waitForServer()
 
-        self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)
 
+    def tearDown(self):
+        super(ZuulTestCase, self).tearDown()
+        self.assertFinalState()
+
     def configure_connections(self):
         # Register connections from the config
         self.smtp_messages = []
@@ -1366,6 +1392,17 @@
         self.addCommitToRepo(project, 'add content from fixture',
                              files, branch='master', tag='init')
 
+    def assertNodepoolState(self):
+        # Make sure there are no pending requests or locked nodes
+
+        requests = self.fake_nodepool.getNodeRequests()
+        self.assertEqual(len(requests), 0)
+
+        nodes = self.fake_nodepool.getNodes()
+        for node in nodes:
+            self.assertFalse(node['_lock'], "Node %s is locked" %
+                             (node['_oid'],))
+
     def assertFinalState(self):
         # Make sure that git.Repo objects have been garbage collected.
         repos = []
@@ -1375,6 +1412,7 @@
                 repos.append(obj)
         self.assertEqual(len(repos), 0)
         self.assertEmptyQueues()
+        self.assertNodepoolState()
         ipm = zuul.manager.independent.IndependentPipelineManager
         for tenant in self.sched.abide.tenants.values():
             for pipeline in tenant.layout.pipelines.values():
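
A minimal standalone sketch of the read pattern getNodes() uses above: list the children of a root znode with kazoo, decode each child's JSON payload, and treat a populated lock/ child as "locked". The host string and the standalone function are illustrative, not part of the test fixture.

    import json

    import kazoo.exceptions
    from kazoo.client import KazooClient

    def list_nodes(hosts='127.0.0.1:2181', root='/nodepool/nodes'):
        client = KazooClient(hosts=hosts)
        client.start()
        try:
            try:
                ids = client.get_children(root)
            except kazoo.exceptions.NoNodeError:
                return []
            nodes = []
            for oid in sorted(ids):
                path = root + '/' + oid
                data, _stat = client.get(path)
                node = json.loads(data)
                node['_oid'] = oid
                try:
                    # locked when the lock/ child has any contenders
                    node['_lock'] = bool(
                        client.get_children(path + '/lock'))
                except kazoo.exceptions.NoNodeError:
                    node['_lock'] = False
                nodes.append(node)
            return nodes
        finally:
            client.stop()
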
diff --git a/tests/test_webapp.py b/tests/test_webapp.py
index 41f09de..2211d1b 100644
--- a/tests/test_webapp.py
+++ b/tests/test_webapp.py
@@ -25,14 +25,8 @@
 class TestWebapp(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'
 
-    def _cleanup(self):
-        self.launch_server.hold_jobs_in_build = False
-        self.launch_server.release()
-        self.waitUntilSettled()
-
     def setUp(self):
         super(TestWebapp, self).setUp()
-        self.addCleanup(self._cleanup)
         self.launch_server.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('code-review', 2)
@@ -43,6 +37,12 @@
         self.waitUntilSettled()
         self.port = self.webapp.server.socket.getsockname()[1]
 
+    def tearDown(self):
+        self.launch_server.hold_jobs_in_build = False
+        self.launch_server.release()
+        self.waitUntilSettled()
+        super(TestWebapp, self).tearDown()
+
     def test_webapp_status(self):
         "Test that we can filter to only certain changes in the webapp."
 
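
The move from addCleanup() to tearDown() above matters because unittest runs tearDown() before any cleanups registered with addCleanup(). A minimal illustration of that ordering (the demo class is hypothetical):

    import unittest

    class OrderDemo(unittest.TestCase):
        events = []

        def setUp(self):
            super(OrderDemo, self).setUp()
            # cleanups run last, after tearDown()
            self.addCleanup(self.events.append, 'cleanup')

        def tearDown(self):
            self.events.append('tearDown')
            super(OrderDemo, self).tearDown()

        def test_order(self):
            self.events.append('test')

After a run, events is ['test', 'tearDown', 'cleanup']: held jobs are released in TestWebapp.tearDown() before ZuulTestCase.tearDown() asserts the final nodepool state, while shutdown (still registered with addCleanup) runs afterwards.
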
diff --git a/zuul/launcher/client.py b/zuul/launcher/client.py
index e17c83c..9e895ef 100644
--- a/zuul/launcher/client.py
+++ b/zuul/launcher/client.py
@@ -471,14 +471,13 @@
             data = getJobData(job)
             build.node_labels = data.get('node_labels', [])
             build.node_name = data.get('node_name')
-            if not build.canceled:
-                if result is None:
-                    result = data.get('result')
-                if result is None:
-                    build.retry = True
-                self.log.info("Build %s complete, result %s" %
-                              (job, result))
-                self.sched.onBuildCompleted(build, result)
+            if result is None:
+                result = data.get('result')
+            if result is None:
+                build.retry = True
+            self.log.info("Build %s complete, result %s" %
+                          (job, result))
+            self.sched.onBuildCompleted(build, result)
             # The test suite expects the build to be removed from the
             # internal dict after it's added to the report queue.
             del self.builds[job.unique]
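
With the canceled-build guard removed above, onBuildCompleted always reaches the scheduler; the result fallback chain reduces to a small pure function (a sketch with illustrative names, not the launcher's API):

    def resolve_result(reported_result, job_data):
        # an explicit result from the worker wins; otherwise fall back
        # to the result recorded in the returned job data
        result = reported_result
        if result is None:
            result = job_data.get('result')
        # if neither source produced a result, the build is retried
        return result, result is None
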
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 7f64986..0d11316 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -395,6 +395,7 @@
         for req in old_build_set.node_requests.values():
             self.sched.nodepool.cancelRequest(req)
         old_build_set.node_requests = {}
+        canceled_jobs = set()
         for build in old_build_set.getBuilds():
             was_running = False
             try:
@@ -405,13 +406,18 @@
             if not was_running:
                 try:
                     nodeset = build.build_set.getJobNodeSet(build.job.name)
-                    self.nodepool.returnNodeset(nodeset)
+                    self.sched.nodepool.returnNodeset(nodeset)
                 except Exception:
                     self.log.exception("Unable to return nodeset %s for "
                                        "canceled build request %s" %
                                        (nodeset, build))
             build.result = 'CANCELED'
             canceled = True
+            canceled_jobs.add(build.job.name)
+        for jobname, nodeset in list(old_build_set.nodesets.items()):
+            if jobname in canceled_jobs:
+                continue
+            self.sched.nodepool.returnNodeset(nodeset)
         for item_behind in item.items_behind:
             self.log.debug("Canceling jobs for change %s, behind change %s" %
                            (item_behind.change, item.change))
@@ -609,11 +615,15 @@
         self.log.debug("Item %s status is now:\n %s" %
                        (item, item.formatStatus()))
 
-        try:
-            nodeset = build.build_set.getJobNodeSet(build.job.name)
-            self.nodepool.returnNodeset(nodeset)
-        except Exception:
-            self.log.exception("Unable to return nodeset %s" % (nodeset,))
+        if build.retry:
+            build.build_set.removeJobNodeSet(build.job.name)
+
+        # If any jobs were skipped as a result of this build, return
+        # their nodes.
+        for other in build.build_set.getBuilds():
+            if other.result == 'SKIPPED':
+                nodeset = build.build_set.getJobNodeSet(other.job.name)
+                self.sched.nodepool.returnNodeset(nodeset)
 
         return True
 
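
The cancelJobs() change above returns every nodeset the old build set still holds, not only those attached to builds. A sketch of that leak-avoidance step, assuming nodesets maps job names to fulfilled NodeSets:

    def return_leftover_nodesets(nodepool, old_build_set, canceled_jobs):
        # nodesets tied to canceled builds were already returned in the
        # loop above; anything else left behind would otherwise leak
        for jobname, nodeset in list(old_build_set.nodesets.items()):
            if jobname in canceled_jobs:
                continue
            nodepool.returnNodeset(nodeset)
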
diff --git a/zuul/model.py b/zuul/model.py
index 1500661..af726dd 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -762,6 +762,11 @@
         # required
         return self.nodesets.get(job_name)
 
+    def removeJobNodeSet(self, job_name):
+        if job_name not in self.nodesets:
+            raise Exception("No job set for %s" % (job_name))
+        del self.nodesets[job_name]
+
     def setJobNodeRequest(self, job_name, req):
         if job_name in self.node_requests:
             raise Exception("Prior node request for %s" % (job_name))
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 4d0442f..dead411 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -28,14 +28,15 @@
         nodeset = job.nodeset.copy()
         req = NodeRequest(build_set, job, nodeset)
         self.requests[req.uid] = req
-        self.log.debug("Submitting node request: %s" % (req,))
 
         self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
+        # Logged after submission so that we have the request id
+        self.log.info("Submited node request %s" % (req,))
 
         return req
 
     def cancelRequest(self, request):
-        self.log.debug("Canceling node request: %s" % (request,))
+        self.log.info("Canceling node request %s" % (request,))
         if request.uid in self.requests:
             try:
                 self.sched.zk.deleteNodeRequest(request)
@@ -44,6 +45,7 @@
             del self.requests[request.uid]
 
     def useNodeset(self, nodeset):
+        self.log.info("Setting nodeset %s in use" % (nodeset,))
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
@@ -51,6 +53,7 @@
             self.sched.zk.storeNode(node)
 
     def returnNodeset(self, nodeset):
+        self.log.info("Returning nodeset %s" % (nodeset,))
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
@@ -79,7 +82,7 @@
         locked_nodes = []
         try:
             for node in nodes:
-                self.log.debug("Locking node: %s" % (node,))
+                self.log.debug("Locking node %s" % (node,))
                 self.sched.zk.lockNode(node)
                 locked_nodes.append(node)
         except Exception:
@@ -90,7 +93,7 @@
     def _updateNodeRequest(self, request, deleted):
         # Return False to indicate that we should stop watching the
         # node.
-        self.log.debug("Updating node request: %s" % (request,))
+        self.log.debug("Updating node request %s" % (request,))
 
         if request.uid not in self.requests:
             return False
@@ -114,7 +117,7 @@
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.
 
-        self.log.debug("Accepting node request: %s" % (request,))
+        self.log.info("Accepting node request %s" % (request,))
 
         # First, try to lock the nodes.
         locked = False
@@ -127,7 +130,7 @@
 
         # Regardless of whether locking succeeded, delete the
         # request.
-        self.log.debug("Deleting node request: %s" % (request,))
+        self.log.debug("Deleting node request %s" % (request,))
         try:
             self.sched.zk.deleteNodeRequest(request)
         except Exception:
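
The locking changes above sit on an all-or-nothing pattern: nodes are locked one at a time, and a failure part way through must release the locks already taken. A generic sketch of that pattern, assuming an unlock operation is available for the rollback:

    def lock_all_or_none(zk, nodes):
        locked = []
        try:
            for node in nodes:
                zk.lockNode(node)
                locked.append(node)
        except Exception:
            # roll back partial progress, then re-raise so the caller
            # can decline the request
            for node in locked:
                zk.unlockNode(node)
            raise
        return locked
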
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 4a0a9eb..5f51cbf 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -767,9 +767,19 @@
 
     def _doBuildCompletedEvent(self, event):
         build = event.build
+
+        # Regardless of any other conditions which might cause us not
+        # to pass this on to the pipeline manager, make sure we return
+        # the nodes to nodepool.
+        try:
+            nodeset = build.build_set.getJobNodeSet(build.job.name)
+            self.nodepool.returnNodeset(nodeset)
+        except Exception:
+            self.log.exception("Unable to return nodeset %s" % (nodeset,))
+
         if build.build_set is not build.build_set.item.current_build_set:
-            self.log.warning("Build %s is not in the current build set" %
-                             (build,))
+            self.log.debug("Build %s is not in the current build set" %
+                           (build,))
             return
         pipeline = build.build_set.item.pipeline
         if not pipeline:
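
Taken together, the scheduler change enforces the invariant this merge is about: a completed build's nodes are returned exactly once, before the staleness check can short-circuit reporting. A condensed sketch of that control flow (illustrative names):

    def on_build_completed(sched, build):
        # return the nodes first; a superseded build set must not
        # cause them to leak
        nodeset = None
        try:
            nodeset = build.build_set.getJobNodeSet(build.job.name)
            sched.nodepool.returnNodeset(nodeset)
        except Exception:
            sched.log.exception("Unable to return nodeset %s" % (nodeset,))
        if build.build_set is not build.build_set.item.current_build_set:
            # stale: nothing further to report
            return False
        return True
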