Fix independent pipeline CRD
If two git-dependent changes in the same project are added to an
independent pipeline, the first should be enqueued as a single
live item in its own change queue, and the second should be
enqueued as a live item behind a non-live copy of the first with
those two in their own change queue. However, the current
behavior is that they are enqueued as two live items each in
their own change queue.
Once the first is removed, the queue processor would determine
that a dependency of the second change had failed to merge, and
therefore the second failed tests (as if it were in a dependent
pipeline). This is because the queue processor looked for the
status of dependent changes throughout the pipeline, so any
failed dependent change would cause it to fail a change.
The queue processor now only looks ahead in a given item's change
queue (rather than the entire pipeline) to determine if a needed
change is missing.
Change-Id: Ieaa0cdbff59e6b77a11c82876f4fd5cb01fe950b
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 9901369..fb90734 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -3137,6 +3137,79 @@
self.assertEqual(self.history[0].changes, '2,1 1,1')
self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+ def test_crd_check_git_depends(self):
+ "Test single-repo dependencies in independent pipelines"
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+
+ # Add two git-dependent changes and make sure they both report
+ # success.
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+
+ self.assertEqual(self.history[0].changes, '1,1')
+ self.assertEqual(self.history[-1].changes, '1,1 2,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ self.assertIn('Build succeeded', A.messages[0])
+ self.assertIn('Build succeeded', B.messages[0])
+
+ def test_crd_check_duplicate(self):
+ "Test duplicate check in independent pipelines"
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+ check_pipeline = self.sched.layout.pipelines['check']
+
+ # Add two git-dependent changes...
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 2)
+
+ # ...make sure the live one is not duplicated...
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 2)
+
+ # ...but the non-live one is able to be.
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 3)
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release('.*-merge')
+ self.waitUntilSettled()
+ self.gearman_server.release('.*-merge')
+ self.waitUntilSettled()
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+
+ self.assertEqual(self.history[0].changes, '1,1 2,1')
+ self.assertEqual(self.history[1].changes, '1,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ self.assertIn('Build succeeded', A.messages[0])
+ self.assertIn('Build succeeded', B.messages[0])
+
def test_crd_check_reconfiguration(self):
"Test cross-repo dependencies re-enqueued in independent pipelines"
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 49e6698..0bc171e 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
@@ -1029,9 +1029,17 @@
return True
return False
- def isChangeAlreadyInQueue(self, change):
- for c in self.pipeline.getChangesInQueue():
- if change.equals(c):
+ def isChangeAlreadyInPipeline(self, change):
+ # Checks live items in the pipeline
+ for item in self.pipeline.getAllItems():
+ if item.live and change.equals(item.change):
+ return True
+ return False
+
+ def isChangeAlreadyInQueue(self, change, change_queue):
+ # Checks any item in the specified change queue
+ for item in change_queue.queue:
+ if change.equals(item.change):
return True
return False
@@ -1077,7 +1085,7 @@
change_queue):
return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
return True
def getFailingDependentItems(self, item):
@@ -1139,8 +1147,13 @@
ignore_requirements=False, live=True,
change_queue=None):
self.log.debug("Considering adding change %s" % change)
- if self.isChangeAlreadyInQueue(change):
- self.log.debug("Change %s is already in queue, ignoring" % change)
+
+ # If we are adding a live change, check if it's a live item
+ # anywhere in the pipeline. Otherwise, we will perform the
+ # duplicate check below on the specific change_queue.
+ if live and self.isChangeAlreadyInPipeline(change):
+ self.log.debug("Change %s is already in pipeline, "
+ "ignoring" % change)
return True
if not self.isChangeReadyToBeEnqueued(change):
@@ -1168,7 +1181,7 @@
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
- if self.isChangeAlreadyInQueue(change):
+ if self.isChangeAlreadyInQueue(change, change_queue):
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
@@ -1306,7 +1319,7 @@
change_queue = item.queue
failing_reasons = [] # Reasons this item is failing
- if self.checkForChangesNeededBy(item.change) is not True:
+ if self.checkForChangesNeededBy(item.change, change_queue) is not True:
# It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
@@ -1719,7 +1732,7 @@
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
- ret = self.checkForChangesNeededBy(change)
+ ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
@@ -1737,7 +1750,7 @@
return False
return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
@@ -1754,7 +1767,7 @@
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
continue
- if self.isChangeAlreadyInQueue(needed_change):
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead in the queue")
continue
self.log.debug(" Change %s is needed" % needed_change)
@@ -1875,7 +1888,7 @@
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
- ret = self.checkForChangesNeededBy(change)
+ ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
@@ -1888,7 +1901,7 @@
return False
return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
@@ -1916,7 +1929,7 @@
if not needed_change.is_current_patchset:
self.log.debug(" Needed change is not the current patchset")
return False
- if self.isChangeAlreadyInQueue(needed_change):
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead in the queue")
continue
if self.pipeline.source.canMerge(needed_change,