Idempotent scheduler and QueueItems

Make the scheduler idempotent.  The idea is that after any event,
the scheduler should be able to run and examine the state of every
item in the queue and act accordingly.  This is a change from the
current state where most events are dealt with in context.  This
should ease maintenance as it should facilitate reasoning about
the different actions Zuul might take -- centralizing major
decisions into one function.

Also add a new class QueueItem, which represents a Change(ish)
in a queue.  Currently, Change objects themselves are placed
in the queue, which conflates information about a change (for
instance: its number and patchset) with information about
the processing of that change in the queue (e.g., the build
history, current build set, merge status, etc.).

Change objects are now cached, which should reduce the number of
queries to Gerrit (except the current algorithm to update them is
very naive and queries Gerrit again on any event relating to a
change).  Changes are expired from the cache when they are not
present or related to any change currently in a pipeline.

There are now two things that need to be asserted at the end of
each test, so use addCleanup in setUp to call a method that
performs those assertions after the test method completes.  Also,
move the existing shutdown method to use addCleanup as well,
because testr experts say that's a best practice.

Change-Id: Id2bf4c484c9e681456c69d99787e7a5b3a247690
Reviewed-on: https://review.openstack.org/34653
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 938bb50..01c82d2 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -56,6 +56,10 @@
     return ret
 
 
+class MergeFailure(Exception):
+    pass
+
+
 class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
 
@@ -479,12 +483,13 @@
             change = event.getChange(project, self.trigger)
             if event.type == 'patchset-created':
                 pipeline.manager.removeOldVersionsOfChange(change)
-            if not pipeline.manager.eventMatches(event):
-                self.log.debug("Event %s ignored by %s" % (event, pipeline))
-                continue
-            self.log.info("Adding %s, %s to %s" %
-                          (project, change, pipeline))
-            pipeline.manager.addChange(change)
+            if pipeline.manager.eventMatches(event):
+                self.log.info("Adding %s, %s to %s" %
+                              (project, change, pipeline))
+                pipeline.manager.addChange(change)
+            while pipeline.manager.processQueue():
+                pass
+
         self.trigger_event_queue.task_done()
 
     def process_result_queue(self):
@@ -664,6 +669,20 @@
     def enqueueChangesBehind(self, change):
         return True
 
+    def checkForChangesNeededBy(self, change):
+        return True
+
+    def getDependentItems(self, item):
+        orig_item = item
+        items = []
+        while item.item_ahead:
+            items.append(item.item_ahead)
+            item = item.item_ahead
+        self.log.info("Change %s depends on changes %s" %
+                      (orig_item.change,
+                       [x.change for x in items]))
+        return items
+
     def findOldVersionOfChangeAlreadyInQueue(self, change):
         for c in self.pipeline.getChangesInQueue():
             if change.isUpdateOf(c):
@@ -678,7 +697,6 @@
             self.log.debug("Change %s is a new version of %s, removing %s" %
                            (change, old_change, old_change))
             self.removeChange(old_change)
-            self.launchJobs()
 
     def addChange(self, change):
         self.log.debug("Considering adding change %s" % change)
@@ -705,80 +723,168 @@
                            (change, change_queue))
             if self.start_action:
                 self.reportStart(change)
-            change_queue.enqueueChange(change)
-            self.reportStats(change)
+            item = change_queue.enqueueChange(change)
+            self.reportStats(item)
             self.enqueueChangesBehind(change)
         else:
             self.log.error("Unable to find change queue for project %s" %
                            change.project)
             return False
-        self.launchJobs()
 
-    def cancelJobs(self, change, prime=True):
-        self.log.debug("Cancel jobs for change %s" % change)
-        to_remove = []
-        for build, build_change in self.building_jobs.items():
-            if build_change == change:
-                self.log.debug("Found build %s for change %s to cancel" %
-                               (build, change))
-                try:
-                    self.sched.launcher.cancel(build)
-                except:
-                    self.log.exception("Exception while canceling build %s "
-                                       "for change %s" % (build, change))
-                to_remove.append(build)
-        for build in to_remove:
-            self.log.debug("Removing build %s from running builds" % build)
-            build.result = 'CANCELED'
-            del self.building_jobs[build]
+    def dequeueItem(self, item, keep_severed_heads=True):
+        self.log.debug("Removing change %s from queue" % item.change)
+        item_ahead = item.item_ahead
+        change_queue = self.pipeline.getQueue(item.change.project)
+        change_queue.dequeueItem(item)
+        if (keep_severed_heads and not item_ahead and
+            (item.change.is_reportable and not item.reported)):
+            self.log.debug("Adding %s as a severed head" % item.change)
+            change_queue.addSeveredHead(item)
+        self.maintainTriggerCache()
 
-    def dequeueChange(self, change):
-        self.log.debug("Removing change %s from queue" % change)
-        change_queue = self.pipeline.getQueue(change.project)
-        change_queue.dequeueChange(change)
+    def maintainTriggerCache(self):
+        relevant = set()
+        for item in self.pipeline.getAllItems():
+            relevant.add(item.change)
+            relevant.update(item.change.getRelatedChanges())
+        self.sched.trigger.maintainCache(relevant)
 
     def removeChange(self, change):
         # Remove a change from the queue, probably because it has been
         # superceded by another change.
-        self.log.debug("Canceling builds behind change: %s because it is "
-                       "being removed." % change)
-        self.cancelJobs(change)
-        self.dequeueChange(change)
+        for item in self.pipeline.getAllItems():
+            if item.change == change:
+                self.log.debug("Canceling builds behind change: %s "
+                               "because it is being removed." % item.change)
+                self.cancelJobs(item)
+                self.dequeueItem(item, keep_severed_heads=False)
 
-    def _launchJobs(self, change, jobs):
-        self.log.debug("Launching jobs for change %s" % change)
-        ref = change.current_build_set.ref
-        if hasattr(change, 'refspec') and not ref:
-            change.current_build_set.setConfiguration()
-            ref = change.current_build_set.ref
-            mode = model.MERGE_IF_NECESSARY
-            commit = self.sched.merger.mergeChanges([change], ref, mode=mode)
+    def prepareRef(self, item):
+        # Returns False on success.
+        # Returns True if we were unable to prepare the ref.
+        ref = item.current_build_set.ref
+        if hasattr(item.change, 'refspec') and not ref:
+            self.log.debug("Preparing ref for: %s" % item.change)
+            item.current_build_set.setConfiguration()
+            ref = item.current_build_set.ref
+            dependent_items = self.getDependentItems(item)
+            dependent_items.reverse()
+            all_items = dependent_items + [item]
+            if (dependent_items and
+                not dependent_items[-1].current_build_set.commit):
+                self.pipeline.setUnableToMerge(item)
+                return True
+            commit = self.sched.merger.mergeChanges(all_items, ref)
+            item.current_build_set.commit = commit
             if not commit:
-                self.log.info("Unable to merge change %s" % change)
-                self.pipeline.setUnableToMerge(change)
-                self.possiblyReportChange(change)
-                return
-            change.current_build_set.commit = commit
-        for job in self.pipeline.findJobsToRun(change):
-            self.log.debug("Found job %s for change %s" % (job, change))
+                self.log.info("Unable to merge change %s" % item.change)
+                self.pipeline.setUnableToMerge(item)
+                return True
+        return False
+
+    def _launchJobs(self, item, jobs):
+        self.log.debug("Launching jobs for change %s" % item.change)
+        dependent_items = self.getDependentItems(item)
+        for job in jobs:
+            self.log.debug("Found job %s for change %s" % (job, item.change))
             try:
-                build = self.sched.launcher.launch(job, change, self.pipeline)
-                self.building_jobs[build] = change
-                self.log.debug("Adding build %s of job %s to change %s" %
-                               (build, job, change))
-                change.addBuild(build)
+                build = self.sched.launcher.launch(job, item,
+                                                   self.pipeline,
+                                                   dependent_items)
+                self.building_jobs[build] = item
+                self.log.debug("Adding build %s of job %s to item %s" %
+                               (build, job, item))
+                item.addBuild(build)
             except:
                 self.log.exception("Exception while launching job %s "
-                                   "for change %s:" % (job, change))
+                                   "for change %s:" % (job, item.change))
 
-    def launchJobs(self, change=None):
-        if not change:
-            for change in self.pipeline.getAllChanges():
-                self.launchJobs(change)
-            return
-        jobs = self.pipeline.findJobsToRun(change)
+    def launchJobs(self, item):
+        jobs = self.pipeline.findJobsToRun(item)
         if jobs:
-            self._launchJobs(change, jobs)
+            self._launchJobs(item, jobs)
+
+    def cancelJobs(self, item, prime=True):
+        self.log.debug("Cancel jobs for change %s" % item.change)
+        canceled = False
+        to_remove = []
+        if prime and item.current_build_set.builds:
+            item.resetAllBuilds()
+        for build, build_item in self.building_jobs.items():
+            if build_item == item:
+                self.log.debug("Found build %s for change %s to cancel" %
+                               (build, item.change))
+                try:
+                    self.sched.launcher.cancel(build)
+                except:
+                    self.log.exception("Exception while canceling build %s "
+                                       "for change %s" % (build, item.change))
+                to_remove.append(build)
+                canceled = True
+        for build in to_remove:
+            self.log.debug("Removing build %s from running builds" % build)
+            build.result = 'CANCELED'
+            del self.building_jobs[build]
+        if item.item_behind:
+            self.log.debug("Canceling jobs for change %s, behind change %s" %
+                           (item.item_behind.change, item.change))
+            if self.cancelJobs(item.item_behind, prime=prime):
+                canceled = True
+        return canceled
+
+    def _processOneItem(self, item):
+        changed = False
+        item_ahead = item.item_ahead
+        item_behind = item.item_behind
+        if self.prepareRef(item):
+            changed = True
+        if self.checkForChangesNeededBy(item.change) is not True:
+            # It's not okay to enqueue this change, we should remove it.
+            self.log.info("Dequeuing change %s because "
+                          "it can no longer merge" % item.change)
+            self.cancelJobs(item)
+            self.dequeueItem(item, keep_severed_heads=False)
+            self.pipeline.setDequeuedNeedingChange(item)
+            try:
+                self.reportItem(item)
+            except MergeFailure:
+                pass
+            changed = True
+            return changed
+        if not item_ahead:
+            merge_failed = False
+            if self.pipeline.areAllJobsComplete(item):
+                try:
+                    self.reportItem(item)
+                except MergeFailure:
+                    merge_failed = True
+                self.dequeueItem(item)
+                changed = True
+            if merge_failed or self.pipeline.didAnyJobFail(item):
+                if item_behind:
+                    self.cancelJobs(item_behind)
+                    changed = True
+                    self.dequeueItem(item)
+        else:
+            if self.pipeline.didAnyJobFail(item):
+                if item_behind:
+                    if self.cancelJobs(item_behind, prime=False):
+                        changed = True
+                # don't restart yet; this change will eventually become
+                # the head
+        if self.launchJobs(item):
+            changed = True
+        return changed
+
+    def processQueue(self):
+        # Do whatever needs to be done for each change in the queue
+        self.log.debug("Starting queue processor: %s" % self.pipeline.name)
+        changed = False
+        for item in self.pipeline.getAllItems():
+            if self._processOneItem(item):
+                changed = True
+            self.reportStats(item)
+        return changed
 
     def updateBuildDescriptions(self, build_set):
         for build in build_set.getBuilds():
@@ -791,26 +897,24 @@
                 self.sched.launcher.setBuildDescription(build, desc)
 
     def onBuildStarted(self, build):
-        self.log.debug("Build %s started" % build)
         if build not in self.building_jobs:
-            self.log.debug("Build %s not found" % (build))
             # Or triggered externally, or triggered before zuul started,
             # or restarted
             return False
 
+        self.log.debug("Build %s started" % build)
         self.updateBuildDescriptions(build.build_set)
+        while self.processQueue():
+            pass
         return True
 
-    def handleFailedChange(self, change):
-        pass
-
     def onBuildCompleted(self, build):
-        self.log.debug("Build %s completed" % build)
         if build not in self.building_jobs:
-            self.log.debug("Build %s not found" % (build))
             # Or triggered externally, or triggered before zuul started,
             # or restarted
             return False
+
+        self.log.debug("Build %s completed" % build)
         change = self.building_jobs[build]
         self.log.debug("Found change %s which triggered completed build %s" %
                        (change, build))
@@ -820,97 +924,68 @@
         self.pipeline.setResult(change, build)
         self.log.info("Change %s status is now:\n %s" %
                       (change, self.pipeline.formatStatus(change)))
-
-        if self.pipeline.didAnyJobFail(change):
-            self.handleFailedChange(change)
-
-        self.reportChanges()
-        self.launchJobs()
         self.updateBuildDescriptions(build.build_set)
+        while self.processQueue():
+            pass
         return True
 
-    def reportChanges(self):
-        self.log.debug("Searching for changes to report")
-        reported = False
-        for change in self.pipeline.getAllChanges():
-            self.log.debug("  checking %s" % change)
-            if self.pipeline.areAllJobsComplete(change):
-                self.log.debug("  possibly reporting %s" % change)
-                if self.possiblyReportChange(change):
-                    reported = True
-        if reported:
-            self.reportChanges()
-        self.log.debug("Done searching for changes to report")
+    def reportItem(self, item):
+        if item.change.is_reportable and item.reported:
+            raise Exception("Already reported change %s" % item.change)
+        ret = self._reportItem(item)
+        if self.changes_merge:
+            succeeded = self.pipeline.didAllJobsSucceed(item)
+            merged = (not ret)
+            if merged:
+                merged = self.sched.trigger.isMerged(item.change,
+                                                     item.change.branch)
+            self.log.info("Reported change %s status: all-succeeded: %s, "
+                          "merged: %s" % (item.change, succeeded, merged))
+            if not (succeeded and merged):
+                self.log.debug("Reported change %s failed tests or failed "
+                               "to merge" % (item.change))
+                raise MergeFailure("Change %s failed to merge" % item.change)
 
-    def possiblyReportChange(self, change):
-        self.log.debug("Possibly reporting change %s" % change)
-        # Even if a change isn't reportable, keep going so that it
-        # gets dequeued in the normal manner.
-        if change.is_reportable and change.reported:
-            self.log.debug("Change %s already reported" % change)
+    def _reportItem(self, item):
+        if not item.change.is_reportable:
             return False
-        change_ahead = change.change_ahead
-        if not change_ahead:
-            self.log.debug("Change %s is at the front of the queue, "
-                           "reporting" % (change))
-            ret = self.reportChange(change)
-            if self.changes_merge:
-                succeeded = self.pipeline.didAllJobsSucceed(change)
-                merged = (not ret)
-                if merged:
-                    merged = self.sched.trigger.isMerged(change, change.branch)
-                self.log.info("Reported change %s status: all-succeeded: %s, "
-                              "merged: %s" % (change, succeeded, merged))
-                if not (succeeded and merged):
-                    self.log.debug("Reported change %s failed tests or failed "
-                                   "to merge" % (change))
-                    self.handleFailedChange(change)
-                    return True
-            self.log.debug("Removing reported change %s from queue" %
-                           change)
-            change_queue = self.pipeline.getQueue(change.project)
-            change_queue.dequeueChange(change)
-            self.reportStats(change)
-            return True
-
-    def reportChange(self, change):
-        if not change.is_reportable:
-            return False
-        if change.reported:
+        if item.change.is_reportable and item.reported:
             return 0
-        self.log.debug("Reporting change %s" % change)
+        self.log.debug("Reporting change %s" % item.change)
         ret = None
-        if self.pipeline.didAllJobsSucceed(change):
+        if self.pipeline.didAllJobsSucceed(item):
+            self.log.debug("success %s %s" % (self.success_action,
+                                              self.failure_action))
             action = self.success_action
-            change.setReportedResult('SUCCESS')
+            item.setReportedResult('SUCCESS')
         else:
             action = self.failure_action
-            change.setReportedResult('FAILURE')
-        report = self.formatReport(change)
-        change.reported = True
+            item.setReportedResult('FAILURE')
+        report = self.formatReport(item)
+        item.reported = True
         try:
             self.log.info("Reporting change %s, action: %s" %
-                          (change, action))
-            ret = self.sched.trigger.report(change, report, action)
+                          (item.change, action))
+            ret = self.sched.trigger.report(item.change, report, action)
             if ret:
                 self.log.error("Reporting change %s received: %s" %
-                               (change, ret))
+                               (item.change, ret))
         except:
             self.log.exception("Exception while reporting:")
-            change.setReportedResult('ERROR')
-        self.updateBuildDescriptions(change.current_build_set)
+            item.setReportedResult('ERROR')
+        self.updateBuildDescriptions(item.current_build_set)
         return ret
 
-    def formatReport(self, changeish):
+    def formatReport(self, item):
         ret = ''
-        if self.pipeline.didAllJobsSucceed(changeish):
+        if self.pipeline.didAllJobsSucceed(item):
             ret += self.pipeline.success_message + '\n\n'
         else:
             ret += self.pipeline.failure_message + '\n\n'
 
-        if changeish.dequeued_needing_change:
+        if item.dequeued_needing_change:
             ret += "This change depends on a change that failed to merge."
-        elif changeish.current_build_set.unable_to_merge:
+        elif item.current_build_set.unable_to_merge:
             ret += "This change was unable to be automatically merged "\
                    "with the current state of the repository. Please "\
                    "rebase your change and upload a new patchset."
@@ -919,8 +994,8 @@
                 url_pattern = self.sched.config.get('zuul', 'url_pattern')
             else:
                 url_pattern = None
-            for job in self.pipeline.getJobs(changeish):
-                build = changeish.current_build_set.getBuild(job.name)
+            for job in self.pipeline.getJobs(item.change):
+                build = item.current_build_set.getBuild(job.name)
                 result = build.result
                 pattern = url_pattern
                 if result == 'SUCCESS':
@@ -934,7 +1009,7 @@
                     if job.failure_pattern:
                         pattern = job.failure_pattern
                 if pattern:
-                    url = pattern.format(change=changeish,
+                    url = pattern.format(change=item.change,
                                          pipeline=self.pipeline,
                                          job=job,
                                          build=build)
@@ -969,7 +1044,7 @@
               {change.number},{change.patchset}</a></li>'.format(
                 change=change)
 
-        change = build.build_set.change
+        change = build.build_set.item.change
 
         for build in build.build_set.getBuilds():
             if build.url:
@@ -1059,30 +1134,30 @@
         ret = ret.format(**locals())
         return ret
 
-    def reportStats(self, change):
+    def reportStats(self, item):
         if not statsd:
             return
         try:
-            # Update the guage on enqueue and dequeue, but timers only
+            # Update the gauge on enqueue and dequeue, but timers only
             # when dequeing.
-            if change.dequeue_time:
-                dt = int((change.dequeue_time - change.enqueue_time) * 1000)
+            if item.dequeue_time:
+                dt = int((item.dequeue_time - item.enqueue_time) * 1000)
             else:
                 dt = None
-            changes = len(self.pipeline.getAllChanges())
+            items = len(self.pipeline.getAllItems())
 
             # stats.timers.zuul.pipeline.NAME.resident_time
             # stats_counts.zuul.pipeline.NAME.total_changes
             # stats.gauges.zuul.pipeline.NAME.current_changes
             key = 'zuul.pipeline.%s' % self.pipeline.name
-            statsd.gauge(key + '.current_changes', changes)
+            statsd.gauge(key + '.current_changes', items)
             if dt:
                 statsd.timing(key + '.resident_time', dt)
                 statsd.incr(key + '.total_changes')
 
             # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
             # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
-            project_name = change.project.name.replace('/', '.')
+            project_name = item.change.project.name.replace('/', '.')
             key += '.%s' % project_name
             if dt:
                 statsd.timing(key + '.resident_time', dt)
@@ -1206,133 +1281,3 @@
         self.log.debug("  Change %s is needed but can not be merged" %
                        change.needs_change)
         return False
-
-    def _getDependentChanges(self, change):
-        orig_change = change
-        changes = []
-        while change.change_ahead:
-            changes.append(change.change_ahead)
-            change = change.change_ahead
-        self.log.info("Change %s depends on changes %s" % (orig_change,
-                                                           changes))
-        return changes
-
-    def _unableToMerge(self, change, all_changes):
-        self.log.info("Unable to merge changes %s" % all_changes)
-        self.pipeline.setUnableToMerge(change)
-        self.possiblyReportChange(change)
-
-    def _launchJobs(self, change, jobs):
-        self.log.debug("Launching jobs for change %s" % change)
-        ref = change.current_build_set.ref
-        if hasattr(change, 'refspec') and not ref:
-            change.current_build_set.setConfiguration()
-            ref = change.current_build_set.ref
-            dependent_changes = self._getDependentChanges(change)
-            dependent_changes.reverse()
-            all_changes = dependent_changes + [change]
-            if (dependent_changes and
-                not dependent_changes[-1].current_build_set.commit):
-                self._unableToMerge(change, all_changes)
-                return
-            commit = self.sched.merger.mergeChanges(all_changes, ref)
-            change.current_build_set.commit = commit
-            if not commit:
-                self._unableToMerge(change, all_changes)
-                return
-        #TODO: remove this line after GERRIT_CHANGES is gone
-        dependent_changes = self._getDependentChanges(change)
-        for job in jobs:
-            self.log.debug("Found job %s for change %s" % (job, change))
-            try:
-                #TODO: remove dependent_changes after GERRIT_CHANGES is gone
-                build = self.sched.launcher.launch(job, change, self.pipeline,
-                                                   dependent_changes)
-                self.building_jobs[build] = change
-                self.log.debug("Adding build %s of job %s to change %s" %
-                               (build, job, change))
-                change.addBuild(build)
-            except:
-                self.log.exception("Exception while launching job %s "
-                                   "for change %s:" % (job, change))
-
-    def cancelJobs(self, change, prime=True):
-        self.log.debug("Cancel jobs for change %s" % change)
-        to_remove = []
-        if prime:
-            change.resetAllBuilds()
-        for build, build_change in self.building_jobs.items():
-            if build_change == change:
-                self.log.debug("Found build %s for change %s to cancel" %
-                               (build, change))
-                try:
-                    self.sched.launcher.cancel(build)
-                except:
-                    self.log.exception("Exception while canceling build %s "
-                                       "for change %s" % (build, change))
-                to_remove.append(build)
-        for build in to_remove:
-            self.log.debug("Removing build %s from running builds" % build)
-            build.result = 'CANCELED'
-            del self.building_jobs[build]
-        if change.change_behind:
-            self.log.debug("Canceling jobs for change %s, behind change %s" %
-                           (change.change_behind, change))
-            self.cancelJobs(change.change_behind, prime=prime)
-
-    def removeChange(self, change):
-        # Remove a change from the queue (even the middle), probably
-        # because it has been superceded by another change (or
-        # otherwise will not merge).
-        self.log.debug("Canceling builds behind change: %s because it is "
-                       "being removed." % change)
-        self.cancelJobs(change)
-        self.dequeueChange(change, keep_severed_heads=False)
-
-    def handleFailedChange(self, change):
-        # A build failed. All changes behind this change will need to
-        # be retested. To free up resources cancel the builds behind
-        # this one as they will be rerun anyways.
-        change_ahead = change.change_ahead
-        change_behind = change.change_behind
-        if not change_ahead:
-            # If we're at the head of the queue, allow changes to relaunch
-            if change_behind:
-                self.log.info("Canceling/relaunching jobs for change %s "
-                              "behind failed change %s" %
-                              (change_behind, change))
-                self.cancelJobs(change_behind)
-            self.dequeueChange(change)
-        elif change_behind:
-            self.log.debug("Canceling builds behind change: %s due to "
-                           "failure." % change)
-            self.cancelJobs(change_behind, prime=False)
-
-    def dequeueChange(self, change, keep_severed_heads=True):
-        self.log.debug("Removing change %s from queue" % change)
-        change_ahead = change.change_ahead
-        change_behind = change.change_behind
-        change_queue = self.pipeline.getQueue(change.project)
-        change_queue.dequeueChange(change)
-        if keep_severed_heads and not change_ahead and not change.reported:
-            self.log.debug("Adding %s as a severed head" % change)
-            change_queue.addSeveredHead(change)
-        self.dequeueDependentChanges(change_behind)
-
-    def dequeueDependentChanges(self, change):
-        # When a change is dequeued after failing, dequeue any changes that
-        # depend on it.
-        while change:
-            change_behind = change.change_behind
-            if self.checkForChangesNeededBy(change) is not True:
-                # It's not okay to enqueue this change, we should remove it.
-                self.log.info("Dequeuing change %s because "
-                              "it can no longer merge" % change)
-                change_queue = self.pipeline.getQueue(change.project)
-                change_queue.dequeueChange(change)
-                self.pipeline.setDequeuedNeedingChange(change)
-                self.reportChange(change)
-                # We don't need to recurse, because any changes that might
-                # be affected by the removal of this change are behind us
-                # in the queue, so we can continue walking backwards.
-            change = change_behind