Merge "Remove push_change_refs from documentation"
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 8719c76..fcf1161 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -116,7 +116,7 @@
         if errors:
             job.sendWorkException(errors.encode('utf8'))
         else:
-            self.sched.addEvent(event)
+            self.sched.enqueue(event)
             job.sendWorkComplete()
 
     def handle_promote(self, job):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 9316a8c..15520ac 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -109,6 +109,18 @@
         self.change_ids = change_ids
 
 
+class EnqueueEvent(ManagementEvent):
+    """Enqueue a change into a pipeline
+
+    :arg TriggerEvent trigger_event: a TriggerEvent describing the
+        trigger, pipeline, and change to enqueue
+    """
+
+    def __init__(self, trigger_event):
+        super(EnqueueEvent, self).__init__()
+        self.trigger_event = trigger_event
+
+
 class ResultEvent(object):
     """An event that needs to modify the pipeline state due to a
     result from an external system."""
@@ -522,6 +534,14 @@
         event.wait()
         self.log.debug("Promotion complete")
 
+    def enqueue(self, trigger_event):
+        event = EnqueueEvent(trigger_event)
+        self.management_event_queue.put(event)
+        self.wake_event.set()
+        self.log.debug("Waiting for enqueue")
+        event.wait()
+        self.log.debug("Enqueue complete")
+
     def exit(self):
         self.log.debug("Prepare to exit")
         self._pause = True
@@ -688,7 +708,19 @@
             pipeline.manager.addChange(
                 item.change,
                 enqueue_time=item.enqueue_time,
-                quiet=True)
+                quiet=True,
+                ignore_requirements=True)
+
+    def _doEnqueueEvent(self, event):
+        project = self.layout.projects.get(event.project_name)
+        pipeline = self.layout.pipelines[event.forced_pipeline]
+        trigger = self.triggers.get(event.trigger_name)
+        change = event.getChange(project, trigger)
+        self.log.debug("Event %s for change %s was directly assigned "
+                       "to pipeline %s" % (event, change, self))
+        self.log.info("Adding %s, %s to %s" %
+                      (project, change, pipeline))
+        pipeline.manager.addChange(change, ignore_requirements=True)
 
     def _areAllBuildsComplete(self):
         self.log.debug("Checking if all builds are complete")
@@ -799,6 +831,8 @@
                 self._doReconfigureEvent(event)
             elif isinstance(event, PromoteEvent):
                 self._doPromoteEvent(event)
+            elif isinstance(event, EnqueueEvent):
+                self._doEnqueueEvent(event.trigger_event)
             else:
                 self.log.error("Unable to handle event %s" % event)
             event.done()
@@ -1020,10 +1054,10 @@
     def isChangeReadyToBeEnqueued(self, change):
         return True
 
-    def enqueueChangesAhead(self, change, quiet):
+    def enqueueChangesAhead(self, change, quiet, ignore_requirements):
         return True
 
-    def enqueueChangesBehind(self, change, quiet):
+    def enqueueChangesBehind(self, change, quiet, ignore_requirements):
         return True
 
     def checkForChangesNeededBy(self, change):
@@ -1081,7 +1115,8 @@
                            item.change.project)
             return False
 
-    def addChange(self, change, quiet=False, enqueue_time=None):
+    def addChange(self, change, quiet=False, enqueue_time=None,
+                  ignore_requirements=False):
         self.log.debug("Considering adding change %s" % change)
         if self.isChangeAlreadyInQueue(change):
             self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -1092,13 +1127,14 @@
                            change)
             return False
 
-        for f in self.changeish_filters:
-            if not f.matches(change):
-                self.log.debug("Change %s does not match pipeline "
-                               "requirement %s" % (change, f))
-                return False
+        if not ignore_requirements:
+            for f in self.changeish_filters:
+                if not f.matches(change):
+                    self.log.debug("Change %s does not match pipeline "
+                                   "requirement %s" % (change, f))
+                    return False
 
-        if not self.enqueueChangesAhead(change, quiet):
+        if not self.enqueueChangesAhead(change, quiet, ignore_requirements):
             self.log.debug("Failed to enqueue changes ahead of %s" % change)
             return False
 
@@ -1117,7 +1153,7 @@
             if enqueue_time:
                 item.enqueue_time = enqueue_time
             self.reportStats(item)
-            self.enqueueChangesBehind(change, quiet)
+            self.enqueueChangesBehind(change, quiet, ignore_requirements)
         else:
             self.log.error("Unable to find change queue for project %s" %
                            change.project)
@@ -1708,7 +1744,7 @@
             return False
         return True
 
-    def enqueueChangesBehind(self, change, quiet):
+    def enqueueChangesBehind(self, change, quiet, ignore_requirements):
         to_enqueue = []
         self.log.debug("Checking for changes needing %s:" % change)
         if not hasattr(change, 'needed_by_changes'):
@@ -1724,15 +1760,17 @@
             self.log.debug("  No changes need %s" % change)
 
         for other_change in to_enqueue:
-            self.addChange(other_change, quiet)
+            self.addChange(other_change, quiet=quiet,
+                           ignore_requirements=ignore_requirements)
 
-    def enqueueChangesAhead(self, change, quiet):
+    def enqueueChangesAhead(self, change, quiet, ignore_requirements):
         ret = self.checkForChangesNeededBy(change)
         if ret in [True, False]:
             return ret
         self.log.debug("  Change %s must be merged ahead of %s" %
                        (ret, change))
-        return self.addChange(ret, quiet)
+        return self.addChange(ret, quiet=quiet,
+                              ignore_requirements=ignore_requirements)
 
     def checkForChangesNeededBy(self, change):
         self.log.debug("Checking for changes needed by %s:" % change)