Merge "Remove push_change_refs from documentation"
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 8719c76..fcf1161 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -116,7 +116,7 @@
if errors:
job.sendWorkException(errors.encode('utf8'))
else:
- self.sched.addEvent(event)
+ self.sched.enqueue(event)
job.sendWorkComplete()
def handle_promote(self, job):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 9316a8c..15520ac 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -109,6 +109,18 @@
self.change_ids = change_ids
+class EnqueueEvent(ManagementEvent):
+ """Enqueue a change into a pipeline
+
+ :arg TriggerEvent trigger_event: a TriggerEvent describing the
+ trigger, pipeline, and change to enqueue
+ """
+
+ def __init__(self, trigger_event):
+ super(EnqueueEvent, self).__init__()
+ self.trigger_event = trigger_event
+
+
class ResultEvent(object):
"""An event that needs to modify the pipeline state due to a
result from an external system."""
@@ -522,6 +534,14 @@
event.wait()
self.log.debug("Promotion complete")
+ def enqueue(self, trigger_event):
+ event = EnqueueEvent(trigger_event)
+ self.management_event_queue.put(event)
+ self.wake_event.set()
+ self.log.debug("Waiting for enqueue")
+ event.wait()
+ self.log.debug("Enqueue complete")
+
def exit(self):
self.log.debug("Prepare to exit")
self._pause = True
@@ -688,7 +708,19 @@
pipeline.manager.addChange(
item.change,
enqueue_time=item.enqueue_time,
- quiet=True)
+ quiet=True,
+ ignore_requirements=True)
+
+ def _doEnqueueEvent(self, event):
+ project = self.layout.projects.get(event.project_name)
+ pipeline = self.layout.pipelines[event.forced_pipeline]
+ trigger = self.triggers.get(event.trigger_name)
+ change = event.getChange(project, trigger)
+ self.log.debug("Event %s for change %s was directly assigned "
+ "to pipeline %s" % (event, change, self))
+ self.log.info("Adding %s, %s to %s" %
+ (project, change, pipeline))
+ pipeline.manager.addChange(change, ignore_requirements=True)
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
@@ -799,6 +831,8 @@
self._doReconfigureEvent(event)
elif isinstance(event, PromoteEvent):
self._doPromoteEvent(event)
+ elif isinstance(event, EnqueueEvent):
+ self._doEnqueueEvent(event.trigger_event)
else:
self.log.error("Unable to handle event %s" % event)
event.done()
@@ -1020,10 +1054,10 @@
def isChangeReadyToBeEnqueued(self, change):
return True
- def enqueueChangesAhead(self, change, quiet):
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements):
return True
- def enqueueChangesBehind(self, change, quiet):
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements):
return True
def checkForChangesNeededBy(self, change):
@@ -1081,7 +1115,8 @@
item.change.project)
return False
- def addChange(self, change, quiet=False, enqueue_time=None):
+ def addChange(self, change, quiet=False, enqueue_time=None,
+ ignore_requirements=False):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -1092,13 +1127,14 @@
change)
return False
- for f in self.changeish_filters:
- if not f.matches(change):
- self.log.debug("Change %s does not match pipeline "
- "requirement %s" % (change, f))
- return False
+ if not ignore_requirements:
+ for f in self.changeish_filters:
+ if not f.matches(change):
+ self.log.debug("Change %s does not match pipeline "
+ "requirement %s" % (change, f))
+ return False
- if not self.enqueueChangesAhead(change, quiet):
+ if not self.enqueueChangesAhead(change, quiet, ignore_requirements):
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
@@ -1117,7 +1153,7 @@
if enqueue_time:
item.enqueue_time = enqueue_time
self.reportStats(item)
- self.enqueueChangesBehind(change, quiet)
+ self.enqueueChangesBehind(change, quiet, ignore_requirements)
else:
self.log.error("Unable to find change queue for project %s" %
change.project)
@@ -1708,7 +1744,7 @@
return False
return True
- def enqueueChangesBehind(self, change, quiet):
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
@@ -1724,15 +1760,17 @@
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
- self.addChange(other_change, quiet)
+ self.addChange(other_change, quiet=quiet,
+ ignore_requirements=ignore_requirements)
- def enqueueChangesAhead(self, change, quiet):
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
self.log.debug(" Change %s must be merged ahead of %s" %
(ret, change))
- return self.addChange(ret, quiet)
+ return self.addChange(ret, quiet=quiet,
+ ignore_requirements=ignore_requirements)
def checkForChangesNeededBy(self, change):
self.log.debug("Checking for changes needed by %s:" % change)