Add rate limiting to dependent pipeline queues

When changes report failure reduce the total number of changes that will
be tested concurrently in the pipeline queue the reported change
belonged too. Increase the number of changes that will be tested when
changes report success. This implements simple rate limiting which
should reduce resource thrash when tests are unstable.

Change-Id: Id092446c83649b3916751c4e4665d2adc75d0458
diff --git a/tests/fixtures/layout-rate-limit.yaml b/tests/fixtures/layout-rate-limit.yaml
new file mode 100644
index 0000000..9f6748c
--- /dev/null
+++ b/tests/fixtures/layout-rate-limit.yaml
@@ -0,0 +1,32 @@
+pipelines:
+  - name: gate
+    manager: DependentPipelineManager
+    failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - approved: 1
+    start:
+      gerrit:
+        verified: 0
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    window: 2
+    window-floor: 1
+    window-increase-type: linear
+    window-increase-factor: 1
+    window-decrease-type: exponential
+    window-decrease-factor: 2
+
+projects:
+  - name: org/project
+    gate:
+      - project-merge:
+        - project-test1
+        - project-test2
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 0b9e5a8..0ce0f88 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -3301,3 +3301,167 @@
         self.worker.hold_jobs_in_build = False
         self.worker.release()
         self.waitUntilSettled()
+
+    def test_queue_rate_limiting(self):
+        "Test that DependentPipelines are rate limited with dep across window"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-rate-limit.yaml')
+        self.sched.reconfigure(self.config)
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        C.setDependsOn(B, 1)
+        self.worker.addFailTest('project-test1', A)
+
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        # Only A and B will have their merge jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+        self.assertEqual(self.builds[1].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only A and B will have their test jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        queue = self.sched.layout.pipelines['gate'].queues[0]
+        # A failed so window is reduced by 1 to 1.
+        self.assertEqual(queue.window, 1)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(A.data['status'], 'NEW')
+
+        # Gate is reset and only B's merge job is queued because
+        # window shrunk to 1.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only B's test jobs are queued because window is still 1.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # B was successfully merged so window is increased to 2.
+        self.assertEqual(queue.window, 2)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(B.data['status'], 'MERGED')
+
+        # Only C is left and its merge job is queued.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # After successful merge job the test jobs for C are queued.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # C successfully merged so window is bumped to 3.
+        self.assertEqual(queue.window, 3)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(C.data['status'], 'MERGED')
+
+    def test_queue_rate_limiting_dependent(self):
+        "Test that DependentPipelines are rate limited with dep in window"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-rate-limit.yaml')
+        self.sched.reconfigure(self.config)
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+
+        B.setDependsOn(A, 1)
+
+        self.worker.addFailTest('project-test1', A)
+
+        A.addApproval('CRVW', 2)
+        B.addApproval('CRVW', 2)
+        C.addApproval('CRVW', 2)
+
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+        self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        # Only A and B will have their merge jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+        self.assertEqual(self.builds[1].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only A and B will have their test jobs queued because
+        # window is 2.
+        self.assertEqual(len(self.builds), 4)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        queue = self.sched.layout.pipelines['gate'].queues[0]
+        # A failed so window is reduced by 1 to 1.
+        self.assertEqual(queue.window, 1)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(B.data['status'], 'NEW')
+
+        # Gate is reset and only C's merge job is queued because
+        # window shrunk to 1 and A and B were dequeued.
+        self.assertEqual(len(self.builds), 1)
+        self.assertEqual(self.builds[0].name, 'project-merge')
+
+        self.worker.release('.*-merge')
+        self.waitUntilSettled()
+
+        # Only C's test jobs are queued because window is still 1.
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+
+        self.worker.release('project-.*')
+        self.waitUntilSettled()
+
+        # C was successfully merged so window is increased to 2.
+        self.assertEqual(queue.window, 2)
+        self.assertEqual(queue.window_floor, 1)
+        self.assertEqual(C.data['status'], 'MERGED')