Check liveness of changes before removing
The checks to remove old versions or abandoned changes have been
updated to operate on items and consider whether those items are
live.
Change-Id: Ife3c4833a7d6a8f29014eabb33e50f82918051f4
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index fb90734..4ebc0da 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1497,35 +1497,178 @@
self.assertEqual(D.reported, 2)
self.assertEqual(len(self.history), 9) # 3 each for A, B, D.
- def test_abandoned_change_dequeues(self):
- "Test that an abandoned change is dequeued"
+ def test_new_patchset_check(self):
+ "Test a new patchset in check"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ check_pipeline = self.sched.layout.pipelines['check']
+
+ # Add two git-dependent changes
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
+
+ # A live item, and a non-live/live pair
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertEqual(items[0].change.patchset, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertEqual(items[1].change.patchset, '1')
+ self.assertTrue(items[1].live)
+
+ self.assertEqual(items[2].change.number, '1')
+ self.assertEqual(items[2].change.patchset, '1')
+ self.assertTrue(items[2].live)
+
+ # Add a new patchset to A
+ A.addPatchset()
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ # The live copy of A,1 should be gone, but the non-live and B
+ # should continue, and we should have a new A,2
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertEqual(items[0].change.patchset, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertEqual(items[1].change.patchset, '1')
+ self.assertTrue(items[1].live)
+
+ self.assertEqual(items[2].change.number, '1')
+ self.assertEqual(items[2].change.patchset, '2')
+ self.assertTrue(items[2].live)
+
+ # Add a new patchset to B
+ B.addPatchset()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ # The live copy of B,1 should be gone, and it's non-live copy of A,1
+ # but we should have a new B,2 (still based on A,1)
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertEqual(items[0].change.patchset, '2')
+ self.assertTrue(items[0].live)
+
+ self.assertEqual(items[1].change.number, '1')
+ self.assertEqual(items[1].change.patchset, '1')
+ self.assertFalse(items[1].live)
+
+ self.assertEqual(items[2].change.number, '2')
+ self.assertEqual(items[2].change.patchset, '2')
+ self.assertTrue(items[2].live)
+
+ self.builds[0].release()
+ self.waitUntilSettled()
+ self.builds[0].release()
+ self.waitUntilSettled()
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+ self.assertEqual(self.history[0].result, 'ABORTED')
+ self.assertEqual(self.history[0].changes, '1,1')
+ self.assertEqual(self.history[1].result, 'ABORTED')
+ self.assertEqual(self.history[1].changes, '1,1 2,1')
+ self.assertEqual(self.history[2].result, 'SUCCESS')
+ self.assertEqual(self.history[2].changes, '1,2')
+ self.assertEqual(self.history[3].result, 'SUCCESS')
+ self.assertEqual(self.history[3].changes, '1,1 2,2')
+
+ def test_abandoned_gate(self):
+ "Test that an abandoned change is dequeued from gate"
+
+ self.worker.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('CRVW', 2)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
self.assertEqual(len(self.builds), 1, "One job being built (on hold)")
self.assertEqual(self.builds[0].name, 'project-merge')
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
- # For debugging purposes...
- # for pipeline in self.sched.layout.pipelines.values():
- # for queue in pipeline.queues:
- # self.log.info("pipepline %s queue %s contents %s" % (
- # pipeline.name, queue.name, queue.queue))
-
self.worker.release('.*-merge')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0, "No job running")
- self.assertEmptyQueues()
self.assertEqual(len(self.history), 1, "Only one build in history")
self.assertEqual(self.history[0].result, 'ABORTED',
+ "Build should have been aborted")
+ self.assertEqual(A.reported, 1,
+ "Abandoned gate change should report only start")
+
+ def test_abandoned_check(self):
+ "Test that an abandoned change is dequeued from check"
+
+ self.worker.hold_jobs_in_build = True
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ check_pipeline = self.sched.layout.pipelines['check']
+
+ # Add two git-dependent changes
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ # A live item, and a non-live/live pair
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 3)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertTrue(items[1].live)
+
+ self.assertEqual(items[2].change.number, '1')
+ self.assertTrue(items[2].live)
+
+ # Abandon A
+ self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
+ self.waitUntilSettled()
+
+ # The live copy of A should be gone, but the non-live and B
+ # should continue
+ items = check_pipeline.getAllItems()
+ self.assertEqual(len(items), 2)
+
+ self.assertEqual(items[0].change.number, '1')
+ self.assertFalse(items[0].live)
+
+ self.assertEqual(items[1].change.number, '2')
+ self.assertTrue(items[1].live)
+
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.history), 4)
+ self.assertEqual(self.history[0].result, 'ABORTED',
'Build should have been aborted')
self.assertEqual(A.reported, 0, "Abandoned change should not report")
+ self.assertEqual(B.reported, 1, "Change should report")
def test_zuul_url_return(self):
"Test if ZUUL_URL is returning when zuul_url is set in zuul.conf"
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 0bc171e..4f3efde 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1109,23 +1109,29 @@
return None
def findOldVersionOfChangeAlreadyInQueue(self, change):
- for c in self.pipeline.getChangesInQueue():
- if change.isUpdateOf(c):
- return c
+ for item in self.pipeline.getAllItems():
+ if not item.live:
+ continue
+ if change.isUpdateOf(item.change):
+ return item
return None
def removeOldVersionsOfChange(self, change):
if not self.pipeline.dequeue_on_new_patchset:
return
- old_change = self.findOldVersionOfChangeAlreadyInQueue(change)
- if old_change:
+ old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
+ if old_item:
self.log.debug("Change %s is a new version of %s, removing %s" %
- (change, old_change, old_change))
- self.removeChange(old_change)
+ (change, old_item.change, old_item))
+ self.removeItem(old_item)
def removeAbandonedChange(self, change):
self.log.debug("Change %s abandoned, removing." % change)
- self.removeChange(change)
+ for item in self.pipeline.getAllItems():
+ if not item.live:
+ continue
+ if item.change.equals(change):
+ self.removeItem(item)
def reEnqueueItem(self, item, last_head):
if last_head.queue:
@@ -1205,16 +1211,14 @@
self.log.debug("Removing change %s from queue" % item.change)
item.queue.dequeueItem(item)
- def removeChange(self, change):
- # Remove a change from the queue, probably because it has been
+ def removeItem(self, item):
+ # Remove an item from the queue, probably because it has been
# superseded by another change.
- for item in self.pipeline.getAllItems():
- if item.change == change:
- self.log.debug("Canceling builds behind change: %s "
- "because it is being removed." % item.change)
- self.cancelJobs(item)
- self.dequeueItem(item)
- self.reportStats(item)
+ self.log.debug("Canceling builds behind change: %s "
+ "because it is being removed." % item.change)
+ self.cancelJobs(item)
+ self.dequeueItem(item)
+ self.reportStats(item)
def _makeMergerItem(self, item):
# Create a dictionary with all info about the item needed by