Merge "Use NNFI scheduler algorithm"
diff --git a/NEWS.rst b/NEWS.rst
index db269a4..c4901a8 100644
--- a/NEWS.rst
+++ b/NEWS.rst
@@ -25,12 +25,6 @@
   triggers later).  See the sample layout.yaml and Zuul section of the
   documentation.
 
-* The default behavior is now to immediately dequeue changes that have
-  merge conflicts, even those not at the head of the queue.  To enable
-  the old behavior (which would wait until the conflicting change was
-  at the head before dequeuing it), see the new "dequeue-on-conflict"
-  option.
-
 * Some statsd keys have changed in a backwards incompatible way:
   * The counters and timers of the form zuul.job.{name} is now split
     into several keys of the form:
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index f8e070c..6adfa30 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -346,14 +346,6 @@
   well.  To suppress this behavior (and allow jobs to continue
   running), set this to ``false``.  Default: ``true``.
 
-**dequeue-on-conflict**
-  Normally, if there is a merge conflict or similar error with a
-  change, Zuul will immediately remove it from the queue, even if the
-  error is only due to a change that happened to be enqueued ahead of
-  it.  If you would like to keep the change in the queue until it is
-  at the head to be certain that the merge conflict is intrinsic to
-  the change, set this to ``false``.  Default: ``true``.
-
 **success**
   Describes where Zuul should report to if all the jobs complete
   successfully.
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 1cb8688..dc659fb 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -75,7 +75,6 @@
         verified: -1
 
   - name: conflict
-    dequeue-on-conflict: false
     manager: DependentPipelineManager
     failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
     trigger:
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 70b68c5..395ff25 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1068,9 +1068,6 @@
                     print 'pipeline %s queue %s contents %s' % (
                         pipeline.name, queue.name, queue.queue)
                 self.assertEqual(len(queue.queue), 0)
-                if len(queue.severed_heads) != 0:
-                    print 'heads', queue.severed_heads
-                self.assertEqual(len(queue.severed_heads), 0)
 
     def assertReportedStat(self, key, value=None, kind=None):
         start = time.time()
@@ -1403,11 +1400,20 @@
         self.release(self.builds[2])
         self.waitUntilSettled()
 
-        # project-test1 and project-test2 for A, project-test2 for B
-        self.assertEqual(len(self.builds), 3)
+        # project-test1 and project-test2 for A
+        # project-test2 for B
+        # project-merge for C (without B)
+        self.assertEqual(len(self.builds), 4)
         self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2)
 
-        # check that build status of aborted jobs are masked ('CANCELED')
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # project-test1 and project-test2 for A
+        # project-test2 for B
+        # project-test1 and project-test2 for C
+        self.assertEqual(len(self.builds), 5)
+
         items = self.sched.layout.pipelines['gate'].getAllItems()
         builds = items[0].current_build_set.getBuilds()
         self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
@@ -1418,7 +1424,7 @@
         self.assertEqual(self.countJobResults(builds, None), 1)
         builds = items[2].current_build_set.getBuilds()
         self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
-        self.assertEqual(self.countJobResults(builds, 'CANCELED'), 2)
+        self.assertEqual(self.countJobResults(builds, None), 2)
 
         self.worker.hold_jobs_in_build = False
         self.worker.release()
@@ -1667,8 +1673,9 @@
         self.waitUntilSettled()
         self.gearman_server.release('.*-merge')
         self.waitUntilSettled()
-        queue = self.gearman_server.getQueue()
-        self.getParameter(queue[-1], 'ZUUL_REF')
+
+        self.assertEqual(len(self.history), 2)  # A and C merge jobs
+
         self.gearman_server.hold_jobs_in_queue = False
         self.gearman_server.release()
         self.waitUntilSettled()
@@ -1679,32 +1686,7 @@
         self.assertEqual(A.reported, 2)
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
-
-    def test_dequeue_conflict(self):
-        "Test that the option to dequeue merge conflicts works"
-
-        self.gearman_server.hold_jobs_in_queue = True
-        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        A.addPatchset(['conflict'])
-        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
-        B.addPatchset(['conflict'])
-        A.addApproval('CRVW', 2)
-        B.addApproval('CRVW', 2)
-        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
-        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
-        self.waitUntilSettled()
-
-        self.assertEqual(A.reported, 1)
-        self.assertEqual(B.reported, 2)
-
-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
-        self.waitUntilSettled()
-
-        self.assertEqual(A.data['status'], 'MERGED')
-        self.assertEqual(B.data['status'], 'NEW')
-        self.assertEqual(A.reported, 2)
-        self.assertEqual(B.reported, 2)
+        self.assertEqual(len(self.history), 6)
 
     def test_post(self):
         "Test that post jobs run"
@@ -1888,6 +1870,66 @@
         self.assertEqual(C.reported, 2)
         self.assertEqual(len(self.history), 1)
 
+    def test_failing_dependent_changes(self):
+        "Test that failing dependent patches are taken out of stream"
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+        E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+        D.addApproval('CRVW', 2)
+        E.addApproval('CRVW', 2)
+
+        # E, D -> C -> B, A
+
+        D.setDependsOn(C, 1)
+        C.setDependsOn(B, 1)
+
+        self.worker.addFailTest('project-test1', B)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(E.addApproval('APRV', 1))
+
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.worker.hold_jobs_in_build = False
+        for build in self.builds:
+            if build.parameters['ZUUL_CHANGE'] != '1':
+                build.release()
+                self.waitUntilSettled()
+
+        self.worker.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(C.data['status'], 'NEW')
+        self.assertEqual(C.reported, 2)
+        self.assertEqual(D.data['status'], 'NEW')
+        self.assertEqual(D.reported, 2)
+        self.assertEqual(E.data['status'], 'MERGED')
+        self.assertEqual(E.reported, 2)
+        self.assertEqual(len(self.history), 18)
+
     def test_head_is_dequeued_once(self):
         "Test that if a change at the head fails it is dequeued only once"
         # If it's dequeued more than once, we should see extra
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 00900a0..0d08f1b 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -64,7 +64,6 @@
                 'success-message': str,
                 'failure-message': str,
                 'dequeue-on-new-patchset': bool,
-                'dequeue-on-conflict': bool,
                 'trigger': trigger,
                 'success': report_actions,
                 'failure': report_actions,
diff --git a/zuul/model.py b/zuul/model.py
index d68ac91..ffbad01 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -51,7 +51,6 @@
         self.failure_message = None
         self.success_message = None
         self.dequeue_on_new_patchset = True
-        self.dequeue_on_conflict = True
         self.job_trees = {}  # project -> JobTree
         self.manager = None
         self.queues = []
@@ -169,6 +168,7 @@
                 return True
             if build.result != 'SUCCESS':
                 return True
+
         if not item.item_ahead:
             return False
         return self.isHoldingFollowingChanges(item.item_ahead)
@@ -212,7 +212,6 @@
         items = []
         for shared_queue in self.queues:
             items.extend(shared_queue.queue)
-            items.extend(shared_queue.severed_heads)
         return items
 
     def formatStatusHTML(self):
@@ -222,8 +221,8 @@
                 s = 'Change queue: %s' % queue.name
                 ret += s + '\n'
                 ret += '-' * len(s) + '\n'
-            for head in queue.getHeads():
-                ret += self.formatStatus(head, html=True)
+            for item in queue.queue:
+                ret += self.formatStatus(item, html=True)
         return ret
 
     def formatStatusJSON(self):
@@ -235,18 +234,21 @@
             j_queue = dict(name=queue.name)
             j_queues.append(j_queue)
             j_queue['heads'] = []
-            for head in queue.getHeads():
-                j_changes = []
-                e = head
-                while e:
-                    j_changes.append(self.formatItemJSON(e))
-                    if (len(j_changes) > 1 and
-                        (j_changes[-2]['remaining_time'] is not None) and
-                        (j_changes[-1]['remaining_time'] is not None)):
-                        j_changes[-1]['remaining_time'] = max(
-                            j_changes[-2]['remaining_time'],
-                            j_changes[-1]['remaining_time'])
-                    e = e.item_behind
+
+            j_changes = []
+            for e in queue.queue:
+                if not e.item_ahead:
+                    if j_changes:
+                        j_queue['heads'].append(j_changes)
+                    j_changes = []
+                j_changes.append(self.formatItemJSON(e))
+                if (len(j_changes) > 1 and
+                    (j_changes[-2]['remaining_time'] is not None) and
+                    (j_changes[-1]['remaining_time'] is not None)):
+                    j_changes[-1]['remaining_time'] = max(
+                        j_changes[-2]['remaining_time'],
+                        j_changes[-1]['remaining_time'])
+            if j_changes:
                 j_queue['heads'].append(j_changes)
         return j_pipeline
 
@@ -261,9 +263,11 @@
                 changeish.url,
                 changeish._id())
         else:
-            ret += '%sProject %s change %s\n' % (indent_str,
-                                                 changeish.project.name,
-                                                 changeish._id())
+            ret += '%sProject %s change %s based on %s\n' % (
+                indent_str,
+                changeish.project.name,
+                changeish._id(),
+                item.item_ahead)
         for job in self.getJobs(changeish):
             build = item.current_build_set.getBuild(job.name)
             if build:
@@ -284,9 +288,6 @@
                     job_name = '<a href="%s">%s</a>' % (url, job_name)
             ret += '%s  %s: %s%s' % (indent_str, job_name, result, voting)
             ret += '\n'
-        if item.item_behind:
-            ret += '%sFollowed by:\n' % (indent_str)
-            ret += self.formatStatus(item.item_behind, indent + 2, html)
         return ret
 
     def formatItemJSON(self, item):
@@ -333,19 +334,7 @@
                     result=result,
                     voting=job.voting))
         if self.haveAllJobsStarted(item):
-            # if a change ahead has failed, we are unknown.
-            item_ahead_failed = False
-            i = item.item_ahead
-            while i:
-                if self.didAnyJobFail(i):
-                    item_ahead_failed = True
-                    i = None  # safe to stop looking
-                else:
-                    i = i.item_ahead
-            if item_ahead_failed:
-                ret['remaining_time'] = None
-            else:
-                ret['remaining_time'] = max_remaining
+            ret['remaining_time'] = max_remaining
         else:
             ret['remaining_time'] = None
         return ret
@@ -385,7 +374,6 @@
         self.projects = []
         self._jobs = set()
         self.queue = []
-        self.severed_heads = []
         self.dependent = dependent
 
     def __repr__(self):
@@ -411,45 +399,44 @@
     def enqueueItem(self, item):
         if self.dependent and self.queue:
             item.item_ahead = self.queue[-1]
-            item.item_ahead.item_behind = item
+            item.item_ahead.items_behind.append(item)
         self.queue.append(item)
 
     def dequeueItem(self, item):
         if item in self.queue:
             self.queue.remove(item)
-        if item in self.severed_heads:
-            self.severed_heads.remove(item)
         if item.item_ahead:
-            item.item_ahead.item_behind = item.item_behind
-        if item.item_behind:
-            item.item_behind.item_ahead = item.item_ahead
+            item.item_ahead.items_behind.remove(item)
+        for item_behind in item.items_behind:
+            if item.item_ahead:
+                item.item_ahead.items_behind.append(item_behind)
+            item_behind.item_ahead = item.item_ahead
         item.item_ahead = None
-        item.item_behind = None
+        item.items_behind = []
         item.dequeue_time = time.time()
 
-    def addSeveredHead(self, item):
-        self.severed_heads.append(item)
+    def moveItem(self, item, item_ahead):
+        if not self.dependent:
+            return False
+        if item.item_ahead == item_ahead:
+            return False
+        # Remove from current location
+        if item.item_ahead:
+            item.item_ahead.items_behind.remove(item)
+        for item_behind in item.items_behind:
+            if item.item_ahead:
+                item.item_ahead.items_behind.append(item_behind)
+            item_behind.item_ahead = item.item_ahead
+        # Add to new location
+        item.item_ahead = item_ahead
+        if item.item_ahead:
+            item.item_ahead.items_behind.append(item)
+        return True
 
     def mergeChangeQueue(self, other):
         for project in other.projects:
             self.addProject(project)
 
-    def getHead(self):
-        if not self.queue:
-            return None
-        return self.queue[0]
-
-    def getHeads(self):
-        heads = []
-        if self.dependent:
-            h = self.getHead()
-            if h:
-                heads.append(h)
-        else:
-            heads.extend(self.queue)
-        heads.extend(self.severed_heads)
-        return heads
-
 
 class Project(object):
     def __init__(self, name):
@@ -592,6 +579,7 @@
         self.commit = None
         self.unable_to_merge = False
         self.unable_to_merge_message = None
+        self.failing_reasons = []
 
     def setConfiguration(self):
         # The change isn't enqueued until after it's created
@@ -632,11 +620,19 @@
         self.current_build_set = BuildSet(self)
         self.build_sets.append(self.current_build_set)
         self.item_ahead = None
-        self.item_behind = None
+        self.items_behind = []
         self.enqueue_time = None
         self.dequeue_time = None
         self.reported = False
 
+    def __repr__(self):
+        if self.pipeline:
+            pipeline = self.pipeline.name
+        else:
+            pipeline = None
+        return '<QueueItem 0x%x for %s in %s>' % (
+            id(self), self.change, pipeline)
+
     def resetAllBuilds(self):
         old = self.current_build_set
         self.current_build_set.result = 'CANCELED'
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8a4d942..514be2f 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -133,8 +133,6 @@
                                                          "Build succeeded.")
             pipeline.dequeue_on_new_patchset = conf_pipeline.get(
                 'dequeue-on-new-patchset', True)
-            pipeline.dequeue_on_conflict = conf_pipeline.get(
-                'dequeue-on-conflict', True)
 
             action_reporters = {}
             for action in ['start', 'success', 'failure']:
@@ -456,10 +454,9 @@
                                name)
                 items_to_remove = []
                 for shared_queue in old_pipeline.queues:
-                    for item in (shared_queue.queue +
-                                 shared_queue.severed_heads):
+                    for item in shared_queue.queue:
                         item.item_ahead = None
-                        item.item_behind = None
+                        item.items_behind = []
                         item.pipeline = None
                         project = layout.projects.get(item.change.project.name)
                         if not project:
@@ -470,9 +467,7 @@
                             items_to_remove.append(item)
                             continue
                         item.change.project = project
-                        severed = item in shared_queue.severed_heads
-                        if not new_pipeline.manager.reEnqueueItem(
-                            item, severed=severed):
+                        if not new_pipeline.manager.reEnqueueItem(item):
                             items_to_remove.append(item)
                 builds_to_remove = []
                 for build, item in old_pipeline.manager.building_jobs.items():
@@ -794,6 +789,9 @@
     def checkForChangesNeededBy(self, change):
         return True
 
+    def getFailingDependentItem(self, item):
+        return None
+
     def getDependentItems(self, item):
         orig_item = item
         items = []
@@ -805,6 +803,12 @@
                        [x.change for x in items]))
         return items
 
+    def getItemForChange(self, change):
+        for item in self.pipeline.getAllItems():
+            if item.change.equals(change):
+                return item
+        return None
+
     def findOldVersionOfChangeAlreadyInQueue(self, change):
         for c in self.pipeline.getChangesInQueue():
             if change.isUpdateOf(c):
@@ -820,15 +824,12 @@
                            (change, old_change, old_change))
             self.removeChange(old_change)
 
-    def reEnqueueItem(self, item, severed=False):
+    def reEnqueueItem(self, item):
         change_queue = self.pipeline.getQueue(item.change.project)
         if change_queue:
             self.log.debug("Re-enqueing change %s in queue %s" %
                            (item.change, change_queue))
-            if severed:
-                change_queue.addSeveredHead(item)
-            else:
-                change_queue.enqueueItem(item)
+            change_queue.enqueueItem(item)
             self.reportStats(item)
             return True
         else:
@@ -869,15 +870,10 @@
                            change.project)
             return False
 
-    def dequeueItem(self, item, keep_severed_heads=True):
+    def dequeueItem(self, item):
         self.log.debug("Removing change %s from queue" % item.change)
-        item_ahead = item.item_ahead
         change_queue = self.pipeline.getQueue(item.change.project)
         change_queue.dequeueItem(item)
-        if (keep_severed_heads and not item_ahead and
-            (item.change.is_reportable and not item.reported)):
-            self.log.debug("Adding %s as a severed head" % item.change)
-            change_queue.addSeveredHead(item)
         self.sched._maintain_trigger_cache = True
 
     def removeChange(self, change):
@@ -888,7 +884,7 @@
                 self.log.debug("Canceling builds behind change: %s "
                                "because it is being removed." % item.change)
                 self.cancelJobs(item)
-                self.dequeueItem(item, keep_severed_heads=False)
+                self.dequeueItem(item)
                 self.reportStats(item)
 
     def prepareRef(self, item):
@@ -901,29 +897,14 @@
             ref = item.current_build_set.ref
             dependent_items = self.getDependentItems(item)
             dependent_items.reverse()
-            dependent_str = ', '.join(
-                ['%s' % i.change.number for i in dependent_items
-                 if i.change.project == item.change.project])
-            if dependent_str:
-                msg = \
-                    "This change was unable to be automatically merged "\
-                    "with the current state of the repository and the "\
-                    "following changes which were enqueued ahead of it: "\
-                    "%s. Please rebase your change and upload a new "\
-                    "patchset." % dependent_str
-            else:
-                msg = "This change was unable to be automatically merged "\
-                    "with the current state of the repository. Please "\
-                    "rebase your change and upload a new patchset."
             all_items = dependent_items + [item]
-            if (dependent_items and
-                not dependent_items[-1].current_build_set.commit):
-                self.pipeline.setUnableToMerge(item, msg)
-                return True
             commit = self.sched.merger.mergeChanges(all_items, ref)
             item.current_build_set.commit = commit
             if not commit:
                 self.log.info("Unable to merge change %s" % item.change)
+                msg = ("This change was unable to be automatically merged "
+                       "with the current state of the repository. Please "
+                       "rebase your change and upload a new patchset.")
                 self.pipeline.setUnableToMerge(item, msg)
                 return True
         return False
@@ -971,74 +952,94 @@
             self.log.debug("Removing build %s from running builds" % build)
             build.result = 'CANCELED'
             del self.building_jobs[build]
-        if item.item_behind:
+        for item_behind in item.items_behind:
             self.log.debug("Canceling jobs for change %s, behind change %s" %
-                           (item.item_behind.change, item.change))
-            if self.cancelJobs(item.item_behind, prime=prime):
+                           (item_behind.change, item.change))
+            if self.cancelJobs(item_behind, prime=prime):
                 canceled = True
         return canceled
 
-    def _processOneItem(self, item):
+    def _processOneItem(self, item, nnfi):
         changed = False
         item_ahead = item.item_ahead
-        item_behind = item.item_behind
-        if self.prepareRef(item):
-            changed = True
-            if self.pipeline.dequeue_on_conflict:
-                self.log.info("Dequeuing change %s because "
-                              "of a git merge error" % item.change)
-                self.dequeueItem(item, keep_severed_heads=False)
-                try:
-                    self.reportItem(item)
-                except MergeFailure:
-                    pass
-                return changed
+        change_queue = self.pipeline.getQueue(item.change.project)
+        failing_reasons = []  # Reasons this item is failing
+
         if self.checkForChangesNeededBy(item.change) is not True:
             # It's not okay to enqueue this change, we should remove it.
             self.log.info("Dequeuing change %s because "
                           "it can no longer merge" % item.change)
             self.cancelJobs(item)
-            self.dequeueItem(item, keep_severed_heads=False)
+            self.dequeueItem(item)
             self.pipeline.setDequeuedNeedingChange(item)
             try:
                 self.reportItem(item)
             except MergeFailure:
                 pass
-            changed = True
-            return changed
-        if not item_ahead:
-            merge_failed = False
-            if self.pipeline.areAllJobsComplete(item):
-                try:
-                    self.reportItem(item)
-                except MergeFailure:
-                    merge_failed = True
-                self.dequeueItem(item)
-                changed = True
-            if merge_failed or self.pipeline.didAnyJobFail(item):
-                if item_behind:
-                    self.cancelJobs(item_behind)
-                    changed = True
-                    self.dequeueItem(item)
+            return (True, nnfi)
+        dep_item = self.getFailingDependentItem(item)
+        if dep_item:
+            failing_reasons.append('a needed change is failing')
+            self.cancelJobs(item, prime=False)
         else:
-            if self.pipeline.didAnyJobFail(item):
-                if item_behind:
-                    if self.cancelJobs(item_behind, prime=False):
-                        changed = True
-                # don't restart yet; this change will eventually become
-                # the head
+            if (item_ahead and item_ahead != nnfi and
+                not item_ahead.change.is_merged):
+                # Our current base is different than what we expected,
+                # and it's not because our current base merged.  Something
+                # ahead must have failed.
+                self.log.info("Resetting builds for change %s because the "
+                              "item ahead, %s, is not the nearest non-failing "
+                              "item, %s" % (item.change, item_ahead, nnfi))
+                change_queue.moveItem(item, nnfi)
+                changed = True
+                self.cancelJobs(item)
+            self.prepareRef(item)
+            if item.current_build_set.unable_to_merge:
+                failing_reasons.append("merge conflict")
         if self.launchJobs(item):
             changed = True
-        return changed
+        if self.pipeline.didAnyJobFail(item):
+            failing_reasons.append("at least one job failed")
+        if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
+            try:
+                self.reportItem(item)
+            except MergeFailure:
+                failing_reasons.append("did not merge")
+                for item_behind in item.items_behind:
+                    self.log.info("Resetting builds for change %s because the "
+                                  "item ahead, %s, failed to merge" %
+                                  (item_behind.change, item))
+                    self.cancelJobs(item_behind)
+            self.dequeueItem(item)
+            changed = True
+        elif not failing_reasons:
+            nnfi = item
+        item.current_build_set.failing_reasons = failing_reasons
+        if failing_reasons:
+            self.log.debug("%s is a failing item because %s" %
+                           (item, failing_reasons))
+        return (changed, nnfi)
 
     def processQueue(self):
         # Do whatever needs to be done for each change in the queue
         self.log.debug("Starting queue processor: %s" % self.pipeline.name)
         changed = False
-        for item in self.pipeline.getAllItems():
-            if self._processOneItem(item):
+        for queue in self.pipeline.queues:
+            queue_changed = False
+            nnfi = None  # Nearest non-failing item
+            for item in queue.queue[:]:
+                item_changed, nnfi = self._processOneItem(item, nnfi)
+                if item_changed:
+                    queue_changed = True
+                self.reportStats(item)
+            if queue_changed:
                 changed = True
-            self.reportStats(item)
+                status = ''
+                for item in queue.queue:
+                    status += self.pipeline.formatStatus(item)
+                if status:
+                    self.log.debug("Queue %s status is now:\n %s" %
+                                   (queue.name, status))
         self.log.debug("Finished queue processor: %s (changed: %s)" %
                        (self.pipeline.name, changed))
         return changed
@@ -1079,8 +1080,8 @@
         del self.building_jobs[build]
 
         self.pipeline.setResult(change, build)
-        self.log.info("Change %s status is now:\n %s" %
-                      (change, self.pipeline.formatStatus(change)))
+        self.log.debug("Change %s status is now:\n %s" %
+                       (change, self.pipeline.formatStatus(change)))
         self.updateBuildDescriptions(build.build_set)
         while self.processQueue():
             pass
@@ -1444,3 +1445,15 @@
         self.log.debug("  Change %s is needed but can not be merged" %
                        change.needs_change)
         return False
+
+    def getFailingDependentItem(self, item):
+        if not hasattr(item.change, 'needs_change'):
+            return None
+        if not item.change.needs_change:
+            return None
+        needs_item = self.getItemForChange(item.change.needs_change)
+        if not needs_item:
+            return None
+        if needs_item.current_build_set.failing_reasons:
+            return needs_item
+        return None