Use NNFI scheduler algorithm
Update the scheduler algorithm to NNFI -- Nearest Non-Failing Item.
A stateless description of the algorithm is that jobs for every
item should always be run based on the repository state(s) set by
the nearest non-failing item ahead of it in its change queue.
This means that should an item fail (for any reason -- failure to
merge, a merge conflict, or job failure) changes after it will
have their builds canceled and restarted with the assumption that
the failed change will not merge but the nearest non-failing
change ahead will merge.
This should mean that dependent queues will always be running
jobs and no longer need to wait for a failing change to merge or
not merge before restarting jobs.
This removes the dequeue-on-conflict behavior because there is
now no cost to keeping an item that cannot merge in the queue.
The documentation and associated test for this are removed.
This also removes the concept of severed heads because a failing
change at the head will not prevent other changes from proceeding
with their tests. If the jobs for the change at the head run
longer than following changes, it could still impact them while
it completes, but the reduction in code complexity is worth this
minor de-optimization.
The debugging representation of QueueItem is changed to make it
more useful.
Change-Id: I0d2d416fb0dd88647490ec06ed69deae71d39374
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 70b68c5..395ff25 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1068,9 +1068,6 @@
print 'pipeline %s queue %s contents %s' % (
pipeline.name, queue.name, queue.queue)
self.assertEqual(len(queue.queue), 0)
- if len(queue.severed_heads) != 0:
- print 'heads', queue.severed_heads
- self.assertEqual(len(queue.severed_heads), 0)
def assertReportedStat(self, key, value=None, kind=None):
start = time.time()
@@ -1403,11 +1400,20 @@
self.release(self.builds[2])
self.waitUntilSettled()
- # project-test1 and project-test2 for A, project-test2 for B
- self.assertEqual(len(self.builds), 3)
+ # project-test1 and project-test2 for A
+ # project-test2 for B
+ # project-merge for C (without B)
+ self.assertEqual(len(self.builds), 4)
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2)
- # check that build status of aborted jobs are masked ('CANCELED')
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ # project-test1 and project-test2 for A
+ # project-test2 for B
+ # project-test1 and project-test2 for C
+ self.assertEqual(len(self.builds), 5)
+
items = self.sched.layout.pipelines['gate'].getAllItems()
builds = items[0].current_build_set.getBuilds()
self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
@@ -1418,7 +1424,7 @@
self.assertEqual(self.countJobResults(builds, None), 1)
builds = items[2].current_build_set.getBuilds()
self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
- self.assertEqual(self.countJobResults(builds, 'CANCELED'), 2)
+ self.assertEqual(self.countJobResults(builds, None), 2)
self.worker.hold_jobs_in_build = False
self.worker.release()
@@ -1667,8 +1673,9 @@
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
self.waitUntilSettled()
- queue = self.gearman_server.getQueue()
- self.getParameter(queue[-1], 'ZUUL_REF')
+
+ self.assertEqual(len(self.history), 2) # A and C merge jobs
+
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
@@ -1679,32 +1686,7 @@
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
-
- def test_dequeue_conflict(self):
- "Test that the option to dequeue merge conflicts works"
-
- self.gearman_server.hold_jobs_in_queue = True
- A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- A.addPatchset(['conflict'])
- B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
- B.addPatchset(['conflict'])
- A.addApproval('CRVW', 2)
- B.addApproval('CRVW', 2)
- self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
- self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
- self.waitUntilSettled()
-
- self.assertEqual(A.reported, 1)
- self.assertEqual(B.reported, 2)
-
- self.gearman_server.hold_jobs_in_queue = False
- self.gearman_server.release()
- self.waitUntilSettled()
-
- self.assertEqual(A.data['status'], 'MERGED')
- self.assertEqual(B.data['status'], 'NEW')
- self.assertEqual(A.reported, 2)
- self.assertEqual(B.reported, 2)
+ self.assertEqual(len(self.history), 6)
def test_post(self):
"Test that post jobs run"
@@ -1888,6 +1870,66 @@
self.assertEqual(C.reported, 2)
self.assertEqual(len(self.history), 1)
+ def test_failing_dependent_changes(self):
+ "Test that failing dependent patches are taken out of stream"
+ self.worker.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+ D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+ E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+ C.addApproval('CRVW', 2)
+ D.addApproval('CRVW', 2)
+ E.addApproval('CRVW', 2)
+
+ # E, D -> C -> B, A
+
+ D.setDependsOn(C, 1)
+ C.setDependsOn(B, 1)
+
+ self.worker.addFailTest('project-test1', B)
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.fake_gerrit.addEvent(E.addApproval('APRV', 1))
+
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+
+ self.worker.hold_jobs_in_build = False
+ for build in self.builds:
+ if build.parameters['ZUUL_CHANGE'] != '1':
+ build.release()
+ self.waitUntilSettled()
+
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(B.reported, 2)
+ self.assertEqual(C.data['status'], 'NEW')
+ self.assertEqual(C.reported, 2)
+ self.assertEqual(D.data['status'], 'NEW')
+ self.assertEqual(D.reported, 2)
+ self.assertEqual(E.data['status'], 'MERGED')
+ self.assertEqual(E.reported, 2)
+ self.assertEqual(len(self.history), 18)
+
def test_head_is_dequeued_once(self):
"Test that if a change at the head fails it is dequeued only once"
# If it's dequeued more than once, we should see extra