Clear project config cache later
When a config change lands, we clear the cached configuration for
that project before reconfiguring. However, the reconfiguration
is not synchronous; on a busy system, a long time can pass between
the change landing and the reconfiguration. Because we clear the
cache synchronously but perform the reconfiguration asynchronously,
any dynamic configurations created in the intervening time will
be missing data.
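
A rough sketch of the old, racy ordering (the names here are
illustrative only, not the real Zuul classes or methods):

    def on_config_change(sched, tenant_name, project_name):
        # The cache is cleared immediately, in the event-receiving
        # thread...
        sched.unparsed_config_cache.clear(project_name)
        # ...while the reconfiguration is only queued and may run much
        # later on a busy system.  Dynamic configurations built in the
        # meantime see an empty cache and are missing data.
        sched.reconfigure_event_queue.put((tenant_name, project_name))
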
Instead, keep track of the project that needs to be cleared as
an attribute of the async event, and clear the config only
immediately before reconfiguration. This happens within the
scheduler main loop, so no other configuration actions can now
happen between these two steps.
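
With the fix, the ordering looks roughly like this (again with
illustrative names only; the real implementation lives in the
scheduler):

    def on_config_change(sched, tenant_name, project_name):
        # Only record which project needs clearing; do not touch the
        # cache yet.
        sched.reconfigure_event_queue.put((tenant_name, project_name))

    def run_scheduler_loop(sched):
        while True:
            tenant_name, project_name = \
                sched.reconfigure_event_queue.get()
            # Both steps run in the main loop, so no other
            # configuration action can interleave between them.
            sched.unparsed_config_cache.clear(project_name)
            sched.reconfigure_tenant(tenant_name)
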
Note: this change also includes several changes to the tests.
I used them to create a test that illustrated the bug; however,
I was only able to do so by essentially re-creating the
scheduler-internal sequence in the test itself, so it exercised
only the prior erroneous behavior. While it was useful to confirm
the source of the problem, it is not useful to confirm the fix,
and it cannot be included in the test suite since it always fails.
The amount of control needed to mimic this sequence in a test is
significantly beyond our facilities at the moment. However, some
of the additional test facilities I created may be useful, so I'm
adding them along with this change.
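
As a rough example, a test using the new facilities might look like
this (purely illustrative; it only demonstrates holding and releasing
merge jobs, not the unreproducible bug scenario described above):

    def test_hold_merge_jobs(self):
        # New switch: queued merger jobs stay held in the gearman
        # queue instead of running immediately.
        self.gearman_server.hold_merge_jobs_in_queue = True
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        # With areAllMergeJobsWaiting(), waitUntilSettled treats held
        # merge jobs as settled instead of waiting for them.
        self.waitUntilSettled()
        # Release the held merge jobs and let the change finish.
        self.gearman_server.hold_merge_jobs_in_queue = False
        self.gearman_server.release()
        self.waitUntilSettled()
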
Change-Id: Id7fdd21f1646ee53986be33bdb5f1437558833ba
diff --git a/tests/base.py b/tests/base.py
index 2e3d682..abec813 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1518,6 +1518,7 @@
def __init__(self, use_ssl=False):
self.hold_jobs_in_queue = False
+ self.hold_merge_jobs_in_queue = False
if use_ssl:
ssl_ca = os.path.join(FIXTURE_DIR, 'gearman/root-ca.pem')
ssl_cert = os.path.join(FIXTURE_DIR, 'gearman/server.pem')
@@ -1537,6 +1538,8 @@
if not hasattr(job, 'waiting'):
if job.name.startswith(b'executor:execute'):
job.waiting = self.hold_jobs_in_queue
+ elif job.name.startswith(b'merger:'):
+ job.waiting = self.hold_merge_jobs_in_queue
else:
job.waiting = False
if job.waiting:
@@ -1562,10 +1565,15 @@
len(self.low_queue))
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
- if job.name != b'executor:execute':
- continue
- parameters = json.loads(job.arguments.decode('utf8'))
- if not regex or re.match(regex, parameters.get('job')):
+ match = False
+ if job.name == b'executor:execute':
+ parameters = json.loads(job.arguments.decode('utf8'))
+ if not regex or re.match(regex, parameters.get('job')):
+ match = True
+ if job.name == b'merger:merge':
+ if not regex:
+ match = True
+ if match:
self.log.debug("releasing queued job %s" %
job.unique)
job.waiting = False
@@ -2556,6 +2564,26 @@
return False
return True
+ def areAllMergeJobsWaiting(self):
+ for client_job in list(self.merge_client.jobs):
+ if not client_job.handle:
+ self.log.debug("%s has no handle" % client_job)
+ return False
+ server_job = self.gearman_server.jobs.get(client_job.handle)
+ if not server_job:
+ self.log.debug("%s is not known to the gearman server" %
+ client_job)
+ return False
+ if not hasattr(server_job, 'waiting'):
+ self.log.debug("%s is being enqueued" % server_job)
+ return False
+ if server_job.waiting:
+ self.log.debug("%s is waiting" % server_job)
+ continue
+ self.log.debug("%s is not waiting" % server_job)
+ return False
+ return True
+
def eventQueuesEmpty(self):
for event_queue in self.event_queues:
yield event_queue.empty()
@@ -2592,7 +2620,7 @@
# processed
self.eventQueuesJoin()
self.sched.run_handler_lock.acquire()
- if (not self.merge_client.jobs and
+ if (self.areAllMergeJobsWaiting() and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting() and
self.areAllNodeRequestsComplete() and