Avoid leaking dynamic change queues
On an independent pipeline manager, getChangeQueue dynamically
creates change queues (they are static on dependent pipelines).
It is possible that one might call getChangeQueue() and then
decide not to actually use the change queue which was created.
Rather than asking the programmer to remember to clean up in
that case, make the interface to getChangeQueue() a context
manager so that it is cleaned up automatically.
No existing code-path was found that needed this, but this seems
like a sensible precaution for the future.
Change-Id: Ia2805cdb4d882fafa2decaf00573f657f412d983
diff --git a/tests/base.py b/tests/base.py
index 773f926..afc676b 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -980,6 +980,10 @@
repos.append(obj)
self.assertEqual(len(repos), 0)
self.assertEmptyQueues()
+ for pipeline in self.sched.layout.pipelines.values():
+ if isinstance(pipeline.manager,
+ zuul.scheduler.IndependentPipelineManager):
+ self.assertEqual(len(pipeline.queues), 0)
def shutdown(self):
self.log.debug("Shutting down after tests")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 4f3efde..1bf74f7 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1134,20 +1134,17 @@
self.removeItem(item)
def reEnqueueItem(self, item, last_head):
- if last_head.queue:
- change_queue = last_head.queue
- else:
- change_queue = self.getChangeQueue(item.change)
- if change_queue:
- self.log.debug("Re-enqueing change %s in queue %s" %
- (item.change, change_queue))
- change_queue.enqueueItem(item)
- self.reportStats(item)
- return True
- else:
- self.log.error("Unable to find change queue for project %s" %
- item.change.project)
- return False
+ with self.getChangeQueue(item.change, last_head.queue) as change_queue:
+ if change_queue:
+ self.log.debug("Re-enqueing change %s in queue %s" %
+ (item.change, change_queue))
+ change_queue.enqueueItem(item)
+ self.reportStats(item)
+ return True
+ else:
+ self.log.error("Unable to find change queue for project %s" %
+ item.change.project)
+ return False
def addChange(self, change, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
@@ -1174,39 +1171,40 @@
"requirement %s" % (change, f))
return False
- if not change_queue:
- change_queue = self.getChangeQueue(change)
+ with self.getChangeQueue(change, change_queue) as change_queue:
if not change_queue:
self.log.debug("Unable to find change queue for "
"change %s in project %s" %
(change, change.project))
return False
- if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
- change_queue):
- self.log.debug("Failed to enqueue changes ahead of %s" % change)
- return False
+ if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
+ change_queue):
+ self.log.debug("Failed to enqueue changes "
+ "ahead of %s" % change)
+ return False
- if self.isChangeAlreadyInQueue(change, change_queue):
- self.log.debug("Change %s is already in queue, ignoring" % change)
+ if self.isChangeAlreadyInQueue(change, change_queue):
+ self.log.debug("Change %s is already in queue, "
+ "ignoring" % change)
+ return True
+
+ self.log.debug("Adding change %s to queue %s" %
+ (change, change_queue))
+ if not quiet:
+ if len(self.pipeline.start_actions) > 0:
+ self.reportStart(change)
+ item = change_queue.enqueueChange(change)
+ if enqueue_time:
+ item.enqueue_time = enqueue_time
+ item.live = live
+ self.reportStats(item)
+ self.enqueueChangesBehind(change, quiet, ignore_requirements,
+ change_queue)
+ self.sched.triggers['zuul'].onChangeEnqueued(item.change,
+ self.pipeline)
return True
- self.log.debug("Adding change %s to queue %s" %
- (change, change_queue))
- if not quiet:
- if len(self.pipeline.start_actions) > 0:
- self.reportStart(change)
- item = change_queue.enqueueChange(change)
- if enqueue_time:
- item.enqueue_time = enqueue_time
- item.live = live
- self.reportStats(item)
- self.enqueueChangesBehind(change, quiet, ignore_requirements,
- change_queue)
- self.sched.triggers['zuul'].onChangeEnqueued(item.change,
- self.pipeline)
- return True
-
def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
item.queue.dequeueItem(item)
@@ -1718,6 +1716,18 @@
self.log.exception("Exception reporting pipeline stats")
+class DynamicChangeQueueContextManager(object):
+ def __init__(self, change_queue):
+ self.change_queue = change_queue
+
+ def __enter__(self):
+ return self.change_queue
+
+ def __exit__(self, etype, value, tb):
+ if self.change_queue and not self.change_queue.queue:
+ self.change_queue.pipeline.removeQueue(self.change_queue.queue)
+
+
class IndependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.IndependentPipelineManager")
changes_merge = False
@@ -1725,14 +1735,16 @@
def _postConfig(self, layout):
super(IndependentPipelineManager, self)._postConfig(layout)
- def getChangeQueue(self, change):
+ def getChangeQueue(self, change, existing=None):
# creates a new change queue for every change
+ if existing:
+ return DynamicChangeQueueContextManager(existing)
if change.project not in self.pipeline.getProjects():
- return None
+ return DynamicChangeQueueContextManager(None)
change_queue = ChangeQueue(self.pipeline)
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
- return change_queue
+ return DynamicChangeQueueContextManager(change_queue)
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
@@ -1792,6 +1804,17 @@
self.pipeline.removeQueue(item.queue)
+class StaticChangeQueueContextManager(object):
+ def __init__(self, change_queue):
+ self.change_queue = change_queue
+
+ def __enter__(self):
+ return self.change_queue
+
+ def __exit__(self, etype, value, tb):
+ pass
+
+
class DependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.DependentPipelineManager")
changes_merge = True
@@ -1851,8 +1874,11 @@
new_change_queues.append(a)
return new_change_queues
- def getChangeQueue(self, change):
- return self.pipeline.getQueue(change.project)
+ def getChangeQueue(self, change, existing=None):
+ if existing:
+ return StaticChangeQueueContextManager(existing)
+ return StaticChangeQueueContextManager(
+ self.pipeline.getQueue(change.project))
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.source.canMerge(change,
@@ -1869,13 +1895,13 @@
self.log.debug(" Changeish does not support dependencies")
return
for other_change in change.needed_by_changes:
- other_change_queue = self.getChangeQueue(other_change)
- if other_change_queue != change_queue:
- self.log.debug(" Change %s in project %s can not be enqueued "
- "in the target queue %s" %
- (other_change, other_change.project,
- change_queue))
- continue
+ with self.getChangeQueue(other_change) as other_change_queue:
+ if other_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s can not be "
+ "enqueued in the target queue %s" %
+ (other_change, other_change.project,
+ change_queue))
+ continue
if self.pipeline.source.canMerge(other_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
@@ -1916,36 +1942,40 @@
self.log.debug(" No changes needed")
return True
changes_needed = []
- change_queue = self.getChangeQueue(change)
- for needed_change in change.needs_changes:
- self.log.debug(" Change %s needs change %s:" % (
- change, needed_change))
- if needed_change.is_merged:
- self.log.debug(" Needed change is merged")
- continue
- needed_change_queue = self.getChangeQueue(needed_change)
- if needed_change_queue != change_queue:
- self.log.debug(" Change %s in project %s does not share a "
- "change queue with %s in project %s" %
- (needed_change, needed_change.project,
- change, change.project))
- return False
- if not needed_change.is_current_patchset:
- self.log.debug(" Needed change is not the current patchset")
- return False
- if self.isChangeAlreadyInQueue(needed_change, change_queue):
- self.log.debug(" Needed change is already ahead in the queue")
- continue
- if self.pipeline.source.canMerge(needed_change,
- self.getSubmitAllowNeeds()):
- self.log.debug(" Change %s is needed" % needed_change)
- if needed_change not in changes_needed:
- changes_needed.append(needed_change)
+ # Ignore supplied change_queue
+ with self.getChangeQueue(change) as change_queue:
+ for needed_change in change.needs_changes:
+ self.log.debug(" Change %s needs change %s:" % (
+ change, needed_change))
+ if needed_change.is_merged:
+ self.log.debug(" Needed change is merged")
continue
- # The needed change can't be merged.
- self.log.debug(" Change %s is needed but can not be merged" %
- needed_change)
- return False
+ with self.getChangeQueue(needed_change) as needed_change_queue:
+ if needed_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s does not "
+ "share a change queue with %s "
+ "in project %s" %
+ (needed_change, needed_change.project,
+ change, change.project))
+ return False
+ if not needed_change.is_current_patchset:
+ self.log.debug(" Needed change is not the "
+ "current patchset")
+ return False
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
+ self.log.debug(" Needed change is already ahead "
+ "in the queue")
+ continue
+ if self.pipeline.source.canMerge(needed_change,
+ self.getSubmitAllowNeeds()):
+ self.log.debug(" Change %s is needed" % needed_change)
+ if needed_change not in changes_needed:
+ changes_needed.append(needed_change)
+ continue
+ # The needed change can't be merged.
+ self.log.debug(" Change %s is needed but can not be merged" %
+ needed_change)
+ return False
if changes_needed:
return changes_needed
return True