Stop jobs when executor stops

If the executor stops while jobs are running, those jobs are not
explicitly aborted.  In production, the process exit would cause
all of the jobs to terminate and the gearman disconnection would
report a failure.  In tests, however, the Python process may continue
and the Ansible threads would essentially leak into a subsequent
test.  This is especially likely to happen if a test holds jobs in
build and then fails while those jobs are still held.  Those threads
will continue to wait to be released while further tests continue
to run.  Because all tests assert that git.Repo objects are not
leaked, the outstanding reference that the leaked threads have
to a git.Repo object trips that assertion and all subsequent tests
in the same test runner fail.

This adds code to the executor shutdown to stop all jobs at the start
of the shutdown process.  It also adds a test which shuts down the
executor while jobs are held and asserts that, after shutdown, those
threads are stopped and no git.Repo objects are leaked.

Change-Id: I9d73775a13c289ef922c27b29162efcfca3950a9
diff --git a/tests/base.py b/tests/base.py
index 0a2c71a..3d2f4c1 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -769,6 +769,11 @@
                 build.release()
         super(RecordingExecutorServer, self).stopJob(job)
 
+    def stop(self):
+        for build in self.running_builds:
+            build.release()
+        super(RecordingExecutorServer, self).stop()
+
 
 class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
     def doMergeChanges(self, items):
@@ -1398,15 +1403,14 @@
         self.webapp.start()
         self.rpc.start()
         self.executor_client.gearman.waitForServer()
+        # Cleanups are run in reverse order
+        self.addCleanup(self.assertCleanShutdown)
         self.addCleanup(self.shutdown)
+        self.addCleanup(self.assertFinalState)
 
         self.sched.reconfigure(self.config)
         self.sched.resume()
 
-    def tearDown(self):
-        super(ZuulTestCase, self).tearDown()
-        self.assertFinalState()
-
     def configure_connections(self):
         # Set up gerrit related fakes
         # Set a changes database so multiple FakeGerrit's can report back to
@@ -1545,6 +1549,9 @@
                     self.assertEqual(test_key, f.read())
 
     def assertFinalState(self):
+        self.log.debug("Assert final state")
+        # Make sure no jobs are running
+        self.assertEqual({}, self.executor_server.job_workers)
         # Make sure that git.Repo objects have been garbage collected.
         repos = []
         gc.collect()
@@ -1585,6 +1592,9 @@
             self.log.error("More than one thread is running: %s" % threads)
         self.printHistory()
 
+    def assertCleanShutdown(self):
+        pass
+
     def init_repo(self, project):
         parts = project.split('/')
         path = os.path.join(self.upstream_root, *parts[:-1])
@@ -1675,7 +1685,9 @@
 
     def areAllBuildsWaiting(self):
         builds = self.executor_client.builds.values()
+        seen_builds = set()
         for build in builds:
+            seen_builds.add(build.uuid)
             client_job = None
             for conn in self.executor_client.gearman.active_connections:
                 for j in conn.related_jobs.values():
@@ -1713,6 +1725,11 @@
             else:
                 self.log.debug("%s is unassigned" % server_job)
                 return False
+        for (build_uuid, job_worker) in \
+            self.executor_server.job_workers.items():
+            if build_uuid not in seen_builds:
+                self.log.debug("%s is not finalized" % build_uuid)
+                return False
         return True
 
     def areAllNodeRequestsComplete(self):
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index d3e6094..4a295c1 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -14,6 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import gc
 import json
 import textwrap
 
@@ -4638,6 +4639,39 @@
         self.assertIn('project-test2 : SKIPPED', A.messages[1])
 
 
+class TestExecutor(ZuulTestCase):
+    tenant_config_file = 'config/single-tenant/main.yaml'
+
+    def assertFinalState(self):
+        # In this test, we expect to shut down in a non-final state,
+        # so skip these checks.
+        pass
+
+    def assertCleanShutdown(self):
+        self.log.debug("Assert clean shutdown")
+
+        # After shutdown, make sure no jobs are running
+        self.assertEqual({}, self.executor_server.job_workers)
+
+        # Make sure that git.Repo objects have been garbage collected.
+        repos = []
+        gc.collect()
+        for obj in gc.get_objects():
+            if isinstance(obj, git.Repo):
+                self.log.debug("Leaked git repo object: %s" % repr(obj))
+                repos.append(obj)
+        self.assertEqual(len(repos), 0)
+
+    def test_executor_shutdown(self):
+        "Test that the executor can shut down with jobs running"
+
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('code-review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('approved', 1))
+        self.waitUntilSettled()
+
+
 class TestDependencyGraph(ZuulTestCase):
     tenant_config_file = 'config/dependency-graph/main.yaml'
 
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 0adb6de..2658556 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -344,10 +344,17 @@
     def stop(self):
         self.log.debug("Stopping")
         self._running = False
-        self.worker.shutdown()
         self._command_running = False
         self.command_socket.stop()
         self.update_queue.put(None)
+
+        for job_worker in self.job_workers.values():
+            try:
+                job_worker.stop()
+            except Exception:
+                self.log.exception("Exception sending stop command "
+                                   "to worker:")
+        self.worker.shutdown()
         self.log.debug("Stopped")
 
     def pause(self):