Remove the scheduler queue lock.

Instead, use queue task_done calls to indicate that the scheduler
has finished processing events.  This lets the tests know when
the queues are both empty and all requests have been handled.

Add a lock around reporting complete events in fake jenkins jobs
so that waitUntilSettled can be assured that no new events will
arrive.

Directly report LOST builds when a job doesn't exist, rather
than spawning a new thread (which was only done to work around
the lock).

Change-Id: I32ad46648c82d7458fb5be779c62ac5b57857674
Reviewed-on: https://review.openstack.org/19330
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index c199345..20ad4ec 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -459,6 +459,7 @@
 
         self.jenkins.fakeAddHistory(name=self.name, number=self.number,
                                     result=result)
+        self.jenkins.lock.acquire()
         self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name,
                                                         self.number,
                                                         self.parameters,
@@ -470,6 +471,7 @@
                                                         'FINISHED',
                                                         result))
         self.jenkins.all_jobs.remove(self)
+        self.jenkins.lock.release()
 
 
 class FakeJenkins(object):
@@ -485,6 +487,7 @@
         self.hold_jobs_in_build = False
         self.fail_tests = {}
         self.nonexistent_jobs = []
+        self.lock = threading.Lock()
 
     def fakeEnqueue(self, job):
         self.queue.append(job)
@@ -691,17 +694,22 @@
                 print self.sched.result_event_queue.empty(),
                 print self.fake_gerrit.event_queue.empty(),
                 raise Exception("Timeout waiting for Zuul to settle")
+            # Make sure our fake jenkins doesn't end any jobs
+            # (and therefore, emit events) while we're checking
+            self.fake_jenkins.lock.acquire()
+            # Join ensures that the queue is empty _and_ events have been
+            # processed
             self.fake_gerrit.event_queue.join()
-            self.sched.queue_lock.acquire()
+            self.sched.trigger_event_queue.join()
+            self.sched.result_event_queue.join()
             if (self.sched.trigger_event_queue.empty() and
                 self.sched.result_event_queue.empty() and
                 self.fake_gerrit.event_queue.empty() and
-                len(self.jenkins.lost_threads) == 0 and
                 self.fake_jenkins.fakeAllWaiting()):
-                self.sched.queue_lock.release()
+                self.fake_jenkins.lock.release()
                 self.log.debug("...settled.")
                 return
-            self.sched.queue_lock.release()
+            self.fake_jenkins.lock.release()
             self.sched.wake_event.wait(0.1)
 
     def countJobResults(self, jobs, result):
diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py
index ee73f8a..0a79e82 100644
--- a/zuul/launcher/jenkins.py
+++ b/zuul/launcher/jenkins.py
@@ -219,9 +219,6 @@
         self.callback_thread.start()
         self.cleanup_thread = JenkinsCleanup(self)
         self.cleanup_thread.start()
-        # Keep track of threads that will report a lost build in the future,
-        # in aid of testing
-        self.lost_threads = []
 
     def stop(self):
         self.cleanup_thread.stop()
@@ -327,18 +324,9 @@
                                "declaring lost" % build)
                 # To keep the queue moving, declare this as a lost build
                 # so that the change will get dropped.
-                t = threading.Thread(target=self.declareBuildLost,
-                                     args=(build,))
-                self.lost_threads.append(t)
-                t.start()
+                self.onBuildCompleted(build.uuid, 'LOST', None, None)
         return build
 
-    def declareBuildLost(self, build):
-        # Call this from a new thread to invoke onBuildCompleted from
-        # a thread that has the queue lock.
-        self.onBuildCompleted(build.uuid, 'LOST', None, None)
-        self.lost_threads.remove(threading.currentThread())
-
     def findBuildInQueue(self, build):
         for item in self.jenkins.get_queue_info():
             if 'actions' not in item:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8ed369d..e180355 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -37,7 +37,6 @@
         threading.Thread.__init__(self)
         self.wake_event = threading.Event()
         self.reconfigure_complete_event = threading.Event()
-        self.queue_lock = threading.Lock()
         self._pause = False
         self._reconfigure = False
         self._exit = False
@@ -223,18 +222,14 @@
                 statsd.incr('gerrit.event.%s' % event.type)
         except:
             self.log.exception("Exception reporting event stats")
-        self.queue_lock.acquire()
         self.trigger_event_queue.put(event)
-        self.queue_lock.release()
         self.wake_event.set()
         self.log.debug("Done adding trigger event: %s" % event)
 
     def onBuildStarted(self, build):
         self.log.debug("Adding start event for build: %s" % build)
         build.start_time = time.time()
-        self.queue_lock.acquire()
         self.result_event_queue.put(('started', build))
-        self.queue_lock.release()
         self.wake_event.set()
         self.log.debug("Done adding start event for build: %s" % build)
 
@@ -250,9 +245,7 @@
                 statsd.incr(key)
         except:
             self.log.exception("Exception reporting runtime stats")
-        self.queue_lock.acquire()
         self.result_event_queue.put(('completed', build))
-        self.queue_lock.release()
         self.wake_event.set()
         self.log.debug("Done adding complete event for build: %s" % build)
 
@@ -360,7 +353,6 @@
             if self._stopped:
                 return
             self.log.debug("Run handler awake")
-            self.queue_lock.acquire()
             try:
                 if not self._pause:
                     if not self.trigger_event_queue.empty():
@@ -381,7 +373,6 @@
                         self.wake_event.set()
             except:
                 self.log.exception("Exception in run handler:")
-            self.queue_lock.release()
 
     def process_event_queue(self):
         self.log.debug("Fetching trigger event")
@@ -390,6 +381,7 @@
         project = self.projects.get(event.project_name)
         if not project:
             self.log.warning("Project %s not found" % event.project_name)
+            self.trigger_event_queue.task_done()
             return
 
         for pipeline in self.pipelines.values():
@@ -400,6 +392,7 @@
             self.log.info("Adding %s, %s to %s" %
                           (project, change, pipeline))
             pipeline.manager.addChange(change)
+        self.trigger_event_queue.task_done()
 
     def process_result_queue(self):
         self.log.debug("Fetching result event")
@@ -408,11 +401,14 @@
         for pipeline in self.pipelines.values():
             if event_type == 'started':
                 if pipeline.manager.onBuildStarted(build):
+                    self.result_event_queue.task_done()
                     return
             elif event_type == 'completed':
                 if pipeline.manager.onBuildCompleted(build):
+                    self.result_event_queue.task_done()
                     return
         self.log.warning("Build %s not found by any queue manager" % (build))
+        self.result_event_queue.task_done()
 
     def formatStatusHTML(self):
         ret = '<html><pre>'