Dequeue changes when new patchset created.
When a new patchset is created for a change that is in a pipeline,
cancel running builds and dequeue that change (and possibly
dependent changes that can no longer merge).
Make this optional (and document the option).
Fixes bug 1022643.
Change-Id: I8f591956cf86645443e4b6075b8cdfc95a939e4f
Reviewed-on: https://review.openstack.org/20948
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index f1578ac..9fdf04d 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -277,6 +277,13 @@
containing 'retrigger' somewhere in the comment text are added to a
change.
+**dequeue-on-new-patchset**
+ Normally, if a new patchset is uploaded to a change that is in a
+ pipeline, the existing entry in the pipeline will be removed (with
+ jobs canceled and any dependent changes that can no longer merge as
+ well. To suppress this behavior (and allow jobs to continue
+ running), set this to ``false``. Default: ``true``.
+
**success**
Describes what Zuul should do if all the jobs complete successfully.
This section is optional; if it is omitted, Zuul will run jobs and
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 586e2a5..cab97b9 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -31,6 +31,7 @@
- name: unused
manager: IndependentPipelineManager
+ dequeue-on-new-patchset: false
trigger:
- event: comment-added
approval:
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 20ad4ec..aa7001e 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1332,7 +1332,7 @@
self.assertEmptyQueues()
def test_head_is_dequeued_once(self):
- "Test that if a change at the head fails it is dequeud only once"
+ "Test that if a change at the head fails it is dequeued only once"
# If it's dequeued more than once, we should see extra
# aborted jobs.
self.fake_jenkins.hold_jobs_in_build = True
@@ -1669,3 +1669,189 @@
jobs = self.fake_jenkins.job_history
assert len(jobs) == 0
self.assertEmptyQueues()
+
+ def test_new_patchset_dequeues_old(self):
+ "Test that a new patchset causes the old to be dequeued"
+ # D -> C (depends on B) -> B (depends on A) -> A -> M
+ self.fake_jenkins.hold_jobs_in_build = True
+
+ M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
+ M.setMerged()
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+ D.addApproval('CRVW', 2)
+
+ C.setDependsOn(B, 1)
+ B.setDependsOn(A, 1)
+ A.setDependsOn(M, 1)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ B.addPatchset()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ self.fake_jenkins.hold_jobs_in_build = False
+ self.fake_jenkins.fakeRelease()
+ self.waitUntilSettled()
+
+ jobs = self.fake_jenkins.all_jobs
+ finished_jobs = self.fake_jenkins.job_history
+
+ for x in jobs:
+ print x
+ for x in finished_jobs:
+ print x
+
+ assert A.data['status'] == 'MERGED'
+ assert A.reported == 2
+ assert B.data['status'] == 'NEW'
+ assert B.reported == 2
+ assert C.data['status'] == 'NEW'
+ assert C.reported == 2
+ assert D.data['status'] == 'MERGED'
+ assert D.reported == 2
+ assert len(finished_jobs) == 9 # 3 each for A, B, D.
+ self.assertEmptyQueues()
+
+ def test_new_patchset_dequeues_old_on_head(self):
+ "Test that a new patchset causes the old to be dequeued (at head)"
+ # D -> C (depends on B) -> B (depends on A) -> A -> M
+ self.fake_jenkins.hold_jobs_in_build = True
+
+ M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M')
+ M.setMerged()
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+ D.addApproval('CRVW', 2)
+
+ C.setDependsOn(B, 1)
+ B.setDependsOn(A, 1)
+ A.setDependsOn(M, 1)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ A.addPatchset()
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ self.fake_jenkins.hold_jobs_in_build = False
+ self.fake_jenkins.fakeRelease()
+ self.waitUntilSettled()
+
+ jobs = self.fake_jenkins.all_jobs
+ finished_jobs = self.fake_jenkins.job_history
+
+ for x in jobs:
+ print x
+ for x in finished_jobs:
+ print x
+
+ assert A.data['status'] == 'NEW'
+ assert A.reported == 2
+ assert B.data['status'] == 'NEW'
+ assert B.reported == 2
+ assert C.data['status'] == 'NEW'
+ assert C.reported == 2
+ assert D.data['status'] == 'MERGED'
+ assert D.reported == 2
+ assert len(finished_jobs) == 7
+ self.assertEmptyQueues()
+
+ def test_new_patchset_dequeues_old_without_dependents(self):
+ "Test that a new patchset causes only the old to be dequeued"
+ self.fake_jenkins.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ B.addPatchset()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ self.fake_jenkins.hold_jobs_in_build = False
+ self.fake_jenkins.fakeRelease()
+ self.waitUntilSettled()
+
+ jobs = self.fake_jenkins.all_jobs
+ finished_jobs = self.fake_jenkins.job_history
+
+ for x in jobs:
+ print x
+ for x in finished_jobs:
+ print x
+
+ assert A.data['status'] == 'MERGED'
+ assert A.reported == 2
+ assert B.data['status'] == 'NEW'
+ assert B.reported == 2
+ assert C.data['status'] == 'MERGED'
+ assert C.reported == 2
+ assert len(finished_jobs) == 9
+ self.assertEmptyQueues()
+
+ def test_new_patchset_dequeues_old_independent_queue(self):
+ "Test that a new patchset causes the old to be dequeued (independent)"
+ self.fake_jenkins.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ B.addPatchset()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ self.fake_jenkins.hold_jobs_in_build = False
+ self.fake_jenkins.fakeRelease()
+ self.waitUntilSettled()
+
+ jobs = self.fake_jenkins.all_jobs
+ finished_jobs = self.fake_jenkins.job_history
+
+ for x in jobs:
+ print x
+ for x in finished_jobs:
+ print x
+
+ assert A.data['status'] == 'NEW'
+ assert A.reported == 1
+ assert B.data['status'] == 'NEW'
+ assert B.reported == 1
+ assert C.data['status'] == 'NEW'
+ assert C.reported == 1
+ assert len(finished_jobs) == 10
+ assert self.countJobResults(finished_jobs, 'ABORTED') == 1
+ self.assertEmptyQueues()
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 8d21773..e57cabb 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -47,6 +47,7 @@
'description': str,
'success-message': str,
'failure-message': str,
+ 'dequeue-on-new-patchset': bool,
'trigger': toList(trigger),
'success': variable_dict,
'failure': variable_dict,
diff --git a/zuul/model.py b/zuul/model.py
index 9a60049..eb47007 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -30,6 +30,7 @@
self.description = None
self.failure_message = None
self.success_message = None
+ self.dequeue_on_new_patchset = True
self.job_trees = {} # project -> JobTree
self.manager = None
self.queues = []
@@ -503,6 +504,9 @@
def equals(self, other):
raise NotImplementedError()
+ def isUpdateOf(self, other):
+ raise NotImplementedError()
+
def filterJobs(self, jobs):
return filter(lambda job: job.changeMatches(self), jobs)
@@ -547,6 +551,11 @@
return True
return False
+ def isUpdateOf(self, other):
+ if self.number == other.number and self.patchset > other.patchset:
+ return True
+ return False
+
def setReportedResult(self, result):
self.current_build_set.result = result
@@ -585,6 +594,9 @@
return True
return False
+ def isUpdateOf(self, other):
+ return False
+
class TriggerEvent(object):
def __init__(self):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index f8d607f..97937aa 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -100,6 +100,9 @@
"Build failed.")
pipeline.success_message = conf_pipeline.get('success-message',
"Build succeeded.")
+ pipeline.dequeue_on_new_patchset = conf_pipeline.get(
+ 'dequeue-on-new-patchset',
+ True)
manager = globals()[conf_pipeline['manager']](self, pipeline)
pipeline.setManager(manager)
@@ -401,10 +404,12 @@
return
for pipeline in self.pipelines.values():
+ change = event.getChange(project, self.trigger)
+ if event.type == 'patchset-created':
+ pipeline.manager.removeOldVersionsOfChange(change)
if not pipeline.manager.eventMatches(event):
self.log.debug("Event %s ignored by %s" % (event, pipeline))
continue
- change = event.getChange(project, self.trigger)
self.log.info("Adding %s, %s to %s" %
(project, change, pipeline))
pipeline.manager.addChange(change)
@@ -572,6 +577,22 @@
def enqueueChangesBehind(self, change):
return True
+ def findOldVersionOfChangeAlreadyInQueue(self, change):
+ for c in self.pipeline.getChangesInQueue():
+ if change.isUpdateOf(c):
+ return c
+ return None
+
+ def removeOldVersionsOfChange(self, change):
+ if not self.pipeline.dequeue_on_new_patchset:
+ return
+ old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
+ if old_change:
+ self.log.debug("Change %s is a new version of %s, removing %s" %
+ (change, old_change, old_change))
+ self.removeChange(old_change)
+ self.launchJobs()
+
def addChange(self, change):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
@@ -606,6 +627,37 @@
return False
self.launchJobs()
+ def cancelJobs(self, change, prime=True):
+ self.log.debug("Cancel jobs for change %s" % change)
+ to_remove = []
+ for build, build_change in self.building_jobs.items():
+ if build_change == change:
+ self.log.debug("Found build %s for change %s to cancel" %
+ (build, change))
+ try:
+ self.sched.launcher.cancel(build)
+ except:
+ self.log.exception("Exception while canceling build %s "
+ "for change %s" % (build, change))
+ to_remove.append(build)
+ for build in to_remove:
+ self.log.debug("Removing build %s from running builds" % build)
+ build.result = 'CANCELED'
+ del self.building_jobs[build]
+
+ def dequeueChange(self, change):
+ self.log.debug("Removing change %s from queue" % change)
+ change_queue = self.pipeline.getQueue(change.project)
+ change_queue.dequeueChange(change)
+
+ def removeChange(self, change):
+ # Remove a change from the queue, probably because it has been
+ # superceded by another change.
+ self.log.debug("Canceling builds behind change: %s because it is "
+ "being removed." % change)
+ self.cancelJobs(change)
+ self.dequeueChange(change)
+
def _launchJobs(self, change, jobs):
self.log.debug("Launching jobs for change %s" % change)
ref = change.current_build_set.ref
@@ -1126,6 +1178,15 @@
(change.change_behind, change))
self.cancelJobs(change.change_behind, prime=prime)
+ def removeChange(self, change):
+ # Remove a change from the queue (even the middle), probably
+ # because it has been superceded by another change (or
+ # otherwise will not merge).
+ self.log.debug("Canceling builds behind change: %s because it is "
+ "being removed." % change)
+ self.cancelJobs(change, prime=False)
+ self.dequeueChange(change, keep_severed_heads=False)
+
def handleFailedChange(self, change):
# A build failed. All changes behind this change will need to
# be retested. To free up resources cancel the builds behind
@@ -1145,13 +1206,13 @@
"failure." % change)
self.cancelJobs(change_behind, prime=False)
- def dequeueChange(self, change):
+ def dequeueChange(self, change, keep_severed_heads=True):
self.log.debug("Removing change %s from queue" % change)
change_ahead = change.change_ahead
change_behind = change.change_behind
change_queue = self.pipeline.getQueue(change.project)
change_queue.dequeueChange(change)
- if not change_ahead and not change.reported:
+ if keep_severed_heads and not change_ahead and not change.reported:
self.log.debug("Adding %s as a severed head" % change)
change_queue.addSeveredHead(change)
self.dequeueDependentChanges(change_behind)