Remove the scheduler queue lock.
Instead, use Queue.task_done() calls to indicate that the scheduler
has finished processing events. This lets the tests know when
the queues are both empty and all requests have been handled.
Add a lock around reporting complete events in fake Jenkins jobs
so that waitUntilSettled can be assured that no new events will
arrive.
Directly report LOST builds when a job doesn't exist, rather
than spawning a new thread (which was only done to work around
the lock).
Change-Id: I32ad46648c82d7458fb5be779c62ac5b57857674
Reviewed-on: https://review.openstack.org/19330
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index c199345..20ad4ec 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -459,6 +459,7 @@
self.jenkins.fakeAddHistory(name=self.name, number=self.number,
result=result)
+ self.jenkins.lock.acquire()
self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name,
self.number,
self.parameters,
@@ -470,6 +471,7 @@
'FINISHED',
result))
self.jenkins.all_jobs.remove(self)
+ self.jenkins.lock.release()
class FakeJenkins(object):
@@ -485,6 +487,7 @@
self.hold_jobs_in_build = False
self.fail_tests = {}
self.nonexistent_jobs = []
+ self.lock = threading.Lock()
def fakeEnqueue(self, job):
self.queue.append(job)
@@ -691,17 +694,22 @@
print self.sched.result_event_queue.empty(),
print self.fake_gerrit.event_queue.empty(),
raise Exception("Timeout waiting for Zuul to settle")
+ # Make sure our fake jenkins doesn't end any jobs
+ # (and therefore, emit events) while we're checking
+ self.fake_jenkins.lock.acquire()
+ # Join ensures that the queue is empty _and_ events have been
+ # processed
self.fake_gerrit.event_queue.join()
- self.sched.queue_lock.acquire()
+ self.sched.trigger_event_queue.join()
+ self.sched.result_event_queue.join()
if (self.sched.trigger_event_queue.empty() and
self.sched.result_event_queue.empty() and
self.fake_gerrit.event_queue.empty() and
- len(self.jenkins.lost_threads) == 0 and
self.fake_jenkins.fakeAllWaiting()):
- self.sched.queue_lock.release()
+ self.fake_jenkins.lock.release()
self.log.debug("...settled.")
return
- self.sched.queue_lock.release()
+ self.fake_jenkins.lock.release()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py
index ee73f8a..0a79e82 100644
--- a/zuul/launcher/jenkins.py
+++ b/zuul/launcher/jenkins.py
@@ -219,9 +219,6 @@
self.callback_thread.start()
self.cleanup_thread = JenkinsCleanup(self)
self.cleanup_thread.start()
- # Keep track of threads that will report a lost build in the future,
- # in aid of testing
- self.lost_threads = []
def stop(self):
self.cleanup_thread.stop()
@@ -327,18 +324,9 @@
"declaring lost" % build)
# To keep the queue moving, declare this as a lost build
# so that the change will get dropped.
- t = threading.Thread(target=self.declareBuildLost,
- args=(build,))
- self.lost_threads.append(t)
- t.start()
+ self.onBuildCompleted(build.uuid, 'LOST', None, None)
return build
- def declareBuildLost(self, build):
- # Call this from a new thread to invoke onBuildCompleted from
- # a thread that has the queue lock.
- self.onBuildCompleted(build.uuid, 'LOST', None, None)
- self.lost_threads.remove(threading.currentThread())
-
def findBuildInQueue(self, build):
for item in self.jenkins.get_queue_info():
if 'actions' not in item:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8ed369d..e180355 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -37,7 +37,6 @@
threading.Thread.__init__(self)
self.wake_event = threading.Event()
self.reconfigure_complete_event = threading.Event()
- self.queue_lock = threading.Lock()
self._pause = False
self._reconfigure = False
self._exit = False
@@ -223,18 +222,14 @@
statsd.incr('gerrit.event.%s' % event.type)
except:
self.log.exception("Exception reporting event stats")
- self.queue_lock.acquire()
self.trigger_event_queue.put(event)
- self.queue_lock.release()
self.wake_event.set()
self.log.debug("Done adding trigger event: %s" % event)
def onBuildStarted(self, build):
self.log.debug("Adding start event for build: %s" % build)
build.start_time = time.time()
- self.queue_lock.acquire()
self.result_event_queue.put(('started', build))
- self.queue_lock.release()
self.wake_event.set()
self.log.debug("Done adding start event for build: %s" % build)
@@ -250,9 +245,7 @@
statsd.incr(key)
except:
self.log.exception("Exception reporting runtime stats")
- self.queue_lock.acquire()
self.result_event_queue.put(('completed', build))
- self.queue_lock.release()
self.wake_event.set()
self.log.debug("Done adding complete event for build: %s" % build)
@@ -360,7 +353,6 @@
if self._stopped:
return
self.log.debug("Run handler awake")
- self.queue_lock.acquire()
try:
if not self._pause:
if not self.trigger_event_queue.empty():
@@ -381,7 +373,6 @@
self.wake_event.set()
except:
self.log.exception("Exception in run handler:")
- self.queue_lock.release()
def process_event_queue(self):
self.log.debug("Fetching trigger event")
@@ -390,6 +381,7 @@
project = self.projects.get(event.project_name)
if not project:
self.log.warning("Project %s not found" % event.project_name)
+ self.trigger_event_queue.task_done()
return
for pipeline in self.pipelines.values():
@@ -400,6 +392,7 @@
self.log.info("Adding %s, %s to %s" %
(project, change, pipeline))
pipeline.manager.addChange(change)
+ self.trigger_event_queue.task_done()
def process_result_queue(self):
self.log.debug("Fetching result event")
@@ -408,11 +401,14 @@
for pipeline in self.pipelines.values():
if event_type == 'started':
if pipeline.manager.onBuildStarted(build):
+ self.result_event_queue.task_done()
return
elif event_type == 'completed':
if pipeline.manager.onBuildCompleted(build):
+ self.result_event_queue.task_done()
return
self.log.warning("Build %s not found by any queue manager" % (build))
+ self.result_event_queue.task_done()
def formatStatusHTML(self):
ret = '<html><pre>'