Stop jobs when executor stops

If the executor stops while jobs are running, those jobs are not
explicitly aborted.  In production, the process exit would cause
all of the jobs to terminate and the Gearman disconnection would
report a failure.  In tests, however, the Python process may continue
and the Ansible threads would essentially leak into a subsequent
test.  This is especially likely to happen if a test holds jobs in
build, and then fails while those jobs are still held.  Those threads
will continue to wait to be released while further tests continue
to run.  Because all tests assert that git.Repo objects are not
leaked, the outstanding reference that the leaked threads have
to a git.Repo object trips that assertion and all subsequent tests
in the same test runner fail.

This adds code to the executor shutdown to stop all jobs at the start
of the shutdown process.  It also adds a test which shuts down the
executor while jobs are held and asserts that, after shutdown, the
job threads are stopped and no git.Repo objects are leaked.

Change-Id: I9d73775a13c289ef922c27b29162efcfca3950a9
diff --git a/tests/base.py b/tests/base.py
index 0a2c71a..3d2f4c1 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -769,6 +769,11 @@
                 build.release()
         super(RecordingExecutorServer, self).stopJob(job)
 
+    def stop(self):
+        for build in self.running_builds:
+            build.release()
+        super(RecordingExecutorServer, self).stop()
+
 
 class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
     def doMergeChanges(self, items):
@@ -1398,15 +1403,14 @@
         self.webapp.start()
         self.rpc.start()
         self.executor_client.gearman.waitForServer()
+        # Cleanups are run in reverse order
+        self.addCleanup(self.assertCleanShutdown)
         self.addCleanup(self.shutdown)
+        self.addCleanup(self.assertFinalState)
 
         self.sched.reconfigure(self.config)
         self.sched.resume()
 
-    def tearDown(self):
-        super(ZuulTestCase, self).tearDown()
-        self.assertFinalState()
-
     def configure_connections(self):
         # Set up gerrit related fakes
         # Set a changes database so multiple FakeGerrit's can report back to
@@ -1545,6 +1549,9 @@
                     self.assertEqual(test_key, f.read())
 
     def assertFinalState(self):
+        self.log.debug("Assert final state")
+        # Make sure no jobs are running
+        self.assertEqual({}, self.executor_server.job_workers)
         # Make sure that git.Repo objects have been garbage collected.
         repos = []
         gc.collect()
@@ -1585,6 +1592,9 @@
             self.log.error("More than one thread is running: %s" % threads)
         self.printHistory()
 
+    def assertCleanShutdown(self):
+        pass
+
     def init_repo(self, project):
         parts = project.split('/')
         path = os.path.join(self.upstream_root, *parts[:-1])
@@ -1675,7 +1685,9 @@
 
     def areAllBuildsWaiting(self):
         builds = self.executor_client.builds.values()
+        seen_builds = set()
         for build in builds:
+            seen_builds.add(build.uuid)
             client_job = None
             for conn in self.executor_client.gearman.active_connections:
                 for j in conn.related_jobs.values():
@@ -1713,6 +1725,11 @@
             else:
                 self.log.debug("%s is unassigned" % server_job)
                 return False
+        for (build_uuid, job_worker) in \
+            self.executor_server.job_workers.items():
+            if build_uuid not in seen_builds:
+                self.log.debug("%s is not finalized" % build_uuid)
+                return False
         return True
 
     def areAllNodeRequestsComplete(self):