Dequeue changes when new patchset created.

When a new patchset is created for a change that is in a pipeline,
cancel running builds and dequeue that change (and possibly
dependent changes that can no longer merge).

Make this optional (and document the option).

Fixes bug 1022643.

Change-Id: I8f591956cf86645443e4b6075b8cdfc95a939e4f
Reviewed-on: https://review.openstack.org/20948
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index f1578ac..9fdf04d 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -277,6 +277,13 @@
   containing 'retrigger' somewhere in the comment text are added to a
   change.
 
+**dequeue-on-new-patchset**
+  Normally, if a new patchset is uploaded to a change that is in a
+  pipeline, the existing entry in the pipeline will be removed (with
+  jobs canceled and any dependent changes that can no longer merge
+  removed as well).  To suppress this behavior (and allow jobs to continue
+  running), set this to ``false``.  Default: ``true``.
+
 **success**
   Describes what Zuul should do if all the jobs complete successfully.
   This section is optional; if it is omitted, Zuul will run jobs and
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 586e2a5..cab97b9 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -31,6 +31,7 @@
 
   - name: unused
     manager: IndependentPipelineManager
+    dequeue-on-new-patchset: false
     trigger:
       - event: comment-added
         approval: 
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 20ad4ec..aa7001e 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1332,7 +1332,7 @@
         self.assertEmptyQueues()
 
     def test_head_is_dequeued_once(self):
-        "Test that if a change at the head fails it is dequeud only once"
+        "Test that if a change at the head fails it is dequeued only once"
         # If it's dequeued more than once, we should see extra
         # aborted jobs.
         self.fake_jenkins.hold_jobs_in_build = True
@@ -1669,3 +1669,189 @@
         jobs = self.fake_jenkins.job_history
         assert len(jobs) == 0
         self.assertEmptyQueues()
+
+    def test_new_patchset_dequeues_old(self):
+        "Test that a new patchset causes the old to be dequeued"
+        # D -> C (depends on B) -> B (depends on A) -> A -> M
+        self.fake_jenkins.hold_jobs_in_build = True
+
+        M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
+        M.setMerged()
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+        D.addApproval('CRVW', 2)
+
+        C.setDependsOn(B, 1)
+        B.setDependsOn(A, 1)
+        A.setDependsOn(M, 1)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        B.addPatchset()
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+        self.waitUntilSettled()
+
+        self.fake_jenkins.hold_jobs_in_build = False
+        self.fake_jenkins.fakeRelease()
+        self.waitUntilSettled()
+
+        jobs = self.fake_jenkins.all_jobs
+        finished_jobs = self.fake_jenkins.job_history
+
+        for x in jobs:
+            print x
+        for x in finished_jobs:
+            print x
+
+        assert A.data['status'] == 'MERGED'
+        assert A.reported == 2
+        assert B.data['status'] == 'NEW'
+        assert B.reported == 2
+        assert C.data['status'] == 'NEW'
+        assert C.reported == 2
+        assert D.data['status'] == 'MERGED'
+        assert D.reported == 2
+        assert len(finished_jobs) == 9  # 3 each for A, B, D.
+        self.assertEmptyQueues()
+
+    def test_new_patchset_dequeues_old_on_head(self):
+        "Test that a new patchset causes the old to be dequeued (at head)"
+        # D -> C (depends on B) -> B (depends on A) -> A -> M
+        self.fake_jenkins.hold_jobs_in_build = True
+
+        M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
+        M.setMerged()
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+        D.addApproval('CRVW', 2)
+
+        C.setDependsOn(B, 1)
+        B.setDependsOn(A, 1)
+        A.setDependsOn(M, 1)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        A.addPatchset()
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
+        self.waitUntilSettled()
+
+        self.fake_jenkins.hold_jobs_in_build = False
+        self.fake_jenkins.fakeRelease()
+        self.waitUntilSettled()
+
+        jobs = self.fake_jenkins.all_jobs
+        finished_jobs = self.fake_jenkins.job_history
+
+        for x in jobs:
+            print x
+        for x in finished_jobs:
+            print x
+
+        assert A.data['status'] == 'NEW'
+        assert A.reported == 2
+        assert B.data['status'] == 'NEW'
+        assert B.reported == 2
+        assert C.data['status'] == 'NEW'
+        assert C.reported == 2
+        assert D.data['status'] == 'MERGED'
+        assert D.reported == 2
+        assert len(finished_jobs) == 7
+        self.assertEmptyQueues()
+
+    def test_new_patchset_dequeues_old_without_dependents(self):
+        "Test that a new patchset causes only the old to be dequeued"
+        self.fake_jenkins.hold_jobs_in_build = True
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        B.addPatchset()
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+        self.waitUntilSettled()
+
+        self.fake_jenkins.hold_jobs_in_build = False
+        self.fake_jenkins.fakeRelease()
+        self.waitUntilSettled()
+
+        jobs = self.fake_jenkins.all_jobs
+        finished_jobs = self.fake_jenkins.job_history
+
+        for x in jobs:
+            print x
+        for x in finished_jobs:
+            print x
+
+        assert A.data['status'] == 'MERGED'
+        assert A.reported == 2
+        assert B.data['status'] == 'NEW'
+        assert B.reported == 2
+        assert C.data['status'] == 'MERGED'
+        assert C.reported == 2
+        assert len(finished_jobs) == 9
+        self.assertEmptyQueues()
+
+    def test_new_patchset_dequeues_old_independent_queue(self):
+        "Test that a new patchset causes the old to be dequeued (independent)"
+        self.fake_jenkins.hold_jobs_in_build = True
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        B.addPatchset()
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+        self.waitUntilSettled()
+
+        self.fake_jenkins.hold_jobs_in_build = False
+        self.fake_jenkins.fakeRelease()
+        self.waitUntilSettled()
+
+        jobs = self.fake_jenkins.all_jobs
+        finished_jobs = self.fake_jenkins.job_history
+
+        for x in jobs:
+            print x
+        for x in finished_jobs:
+            print x
+
+        assert A.data['status'] == 'NEW'
+        assert A.reported == 1
+        assert B.data['status'] == 'NEW'
+        assert B.reported == 1
+        assert C.data['status'] == 'NEW'
+        assert C.reported == 1
+        assert len(finished_jobs) == 10
+        assert self.countJobResults(finished_jobs, 'ABORTED') == 1
+        self.assertEmptyQueues()
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 8d21773..e57cabb 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -47,6 +47,7 @@
                 'description': str,
                 'success-message': str,
                 'failure-message': str,
+                'dequeue-on-new-patchset': bool,
                 'trigger': toList(trigger),
                 'success': variable_dict,
                 'failure': variable_dict,
diff --git a/zuul/model.py b/zuul/model.py
index 9a60049..eb47007 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -30,6 +30,7 @@
         self.description = None
         self.failure_message = None
         self.success_message = None
+        self.dequeue_on_new_patchset = True
         self.job_trees = {}  # project -> JobTree
         self.manager = None
         self.queues = []
@@ -503,6 +504,9 @@
     def equals(self, other):
         raise NotImplementedError()
 
+    def isUpdateOf(self, other):
+        raise NotImplementedError()
+
     def filterJobs(self, jobs):
         return filter(lambda job: job.changeMatches(self), jobs)
 
@@ -547,6 +551,11 @@
             return True
         return False
 
+    def isUpdateOf(self, other):
+        if self.number == other.number and self.patchset > other.patchset:
+            return True
+        return False
+
     def setReportedResult(self, result):
         self.current_build_set.result = result
 
@@ -585,6 +594,9 @@
             return True
         return False
 
+    def isUpdateOf(self, other):
+        return False
+
 
 class TriggerEvent(object):
     def __init__(self):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index f8d607f..97937aa 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -100,6 +100,9 @@
                                                          "Build failed.")
             pipeline.success_message = conf_pipeline.get('success-message',
                                                          "Build succeeded.")
+            pipeline.dequeue_on_new_patchset = conf_pipeline.get(
+                'dequeue-on-new-patchset',
+                True)
             manager = globals()[conf_pipeline['manager']](self, pipeline)
             pipeline.setManager(manager)
 
@@ -401,10 +404,12 @@
             return
 
         for pipeline in self.pipelines.values():
+            change = event.getChange(project, self.trigger)
+            if event.type == 'patchset-created':
+                pipeline.manager.removeOldVersionsOfChange(change)
             if not pipeline.manager.eventMatches(event):
                 self.log.debug("Event %s ignored by %s" % (event, pipeline))
                 continue
-            change = event.getChange(project, self.trigger)
             self.log.info("Adding %s, %s to %s" %
                           (project, change, pipeline))
             pipeline.manager.addChange(change)
@@ -572,6 +577,22 @@
     def enqueueChangesBehind(self, change):
         return True
 
+    def findOldVersionOfChangeAlreadyInQueue(self, change):
+        for c in self.pipeline.getChangesInQueue():
+            if change.isUpdateOf(c):
+                return c
+        return None
+
+    def removeOldVersionsOfChange(self, change):
+        if not self.pipeline.dequeue_on_new_patchset:
+            return
+        old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
+        if old_change:
+            self.log.debug("Change %s is a new version of %s, removing %s" %
+                           (change, old_change, old_change))
+            self.removeChange(old_change)
+            self.launchJobs()
+
     def addChange(self, change):
         self.log.debug("Considering adding change %s" % change)
         if self.isChangeAlreadyInQueue(change):
@@ -606,6 +627,37 @@
             return False
         self.launchJobs()
 
+    def cancelJobs(self, change, prime=True):
+        self.log.debug("Cancel jobs for change %s" % change)
+        to_remove = []
+        for build, build_change in self.building_jobs.items():
+            if build_change == change:
+                self.log.debug("Found build %s for change %s to cancel" %
+                               (build, change))
+                try:
+                    self.sched.launcher.cancel(build)
+                except:
+                    self.log.exception("Exception while canceling build %s "
+                                       "for change %s" % (build, change))
+                to_remove.append(build)
+        for build in to_remove:
+            self.log.debug("Removing build %s from running builds" % build)
+            build.result = 'CANCELED'
+            del self.building_jobs[build]
+
+    def dequeueChange(self, change):
+        self.log.debug("Removing change %s from queue" % change)
+        change_queue = self.pipeline.getQueue(change.project)
+        change_queue.dequeueChange(change)
+
+    def removeChange(self, change):
+        # Remove a change from the queue, probably because it has been
+        # superseded by another change.
+        self.log.debug("Canceling builds behind change: %s because it is "
+                       "being removed." % change)
+        self.cancelJobs(change)
+        self.dequeueChange(change)
+
     def _launchJobs(self, change, jobs):
         self.log.debug("Launching jobs for change %s" % change)
         ref = change.current_build_set.ref
@@ -1126,6 +1178,15 @@
                            (change.change_behind, change))
             self.cancelJobs(change.change_behind, prime=prime)
 
+    def removeChange(self, change):
+        # Remove a change from the queue (even the middle), probably
+        # because it has been superseded by another change (or
+        # otherwise will not merge).
+        self.log.debug("Canceling builds behind change: %s because it is "
+                       "being removed." % change)
+        self.cancelJobs(change, prime=False)
+        self.dequeueChange(change, keep_severed_heads=False)
+
     def handleFailedChange(self, change):
         # A build failed. All changes behind this change will need to
         # be retested. To free up resources cancel the builds behind
@@ -1145,13 +1206,13 @@
                            "failure." % change)
             self.cancelJobs(change_behind, prime=False)
 
-    def dequeueChange(self, change):
+    def dequeueChange(self, change, keep_severed_heads=True):
         self.log.debug("Removing change %s from queue" % change)
         change_ahead = change.change_ahead
         change_behind = change.change_behind
         change_queue = self.pipeline.getQueue(change.project)
         change_queue.dequeueChange(change)
-        if not change_ahead and not change.reported:
+        if keep_severed_heads and not change_ahead and not change.reported:
             self.log.debug("Adding %s as a severed head" % change)
             change_queue.addSeveredHead(change)
         self.dequeueDependentChanges(change_behind)