Merge "Add support for disabling bad pipelines"
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index bcdfabb..a3b3b20 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -607,6 +607,18 @@
   do when a change is added to the pipeline manager.  This can be used,
   for example, to reset the value of the Verified review category.
 
+**disabled**
+  Uses the same syntax as **success**, but describes what Zuul should
+  do when a pipeline is disabled.
+  See ``disable-after-consecutive-failures``.
+
+**disable-after-consecutive-failures**
+  If set, a pipeline can enter a *disabled* state if too many changes
+  in a row fail. Once this many consecutive failures have been reported,
+  the pipeline will stop reporting to any of the ``success``, ``failure``
+  or ``merge-failure`` reporters and instead only report to the
+  ``disabled`` reporters. No ``start`` reports are made while disabled.
+
 **precedence**
   Indicates how the build scheduler should prioritize jobs for
   different pipelines.  Each pipeline may have one precedence, jobs
diff --git a/tests/fixtures/layout-disable-at.yaml b/tests/fixtures/layout-disable-at.yaml
new file mode 100644
index 0000000..a2b2526
--- /dev/null
+++ b/tests/fixtures/layout-disable-at.yaml
@@ -0,0 +1,21 @@
+pipelines:
+  - name: check
+    manager: IndependentPipelineManager
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+    disabled:
+      smtp:
+        to: you@example.com
+    disable-after-consecutive-failures: 3
+
+projects:
+  - name: org/project
+    check:
+      - project-test1
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 6276569..af3e488 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -4014,3 +4014,125 @@
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
         self.waitUntilSettled()
         self.assertEqual(self.history[-1].changes, '3,2 2,1 1,2')
+
+    def test_disable_at(self):
+        "Test a pipeline only reports to the disabled reporters once disabled"
+
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-disable-at.yaml')
+        self.sched.reconfigure(self.config)
+
+        self.assertEqual(3, self.sched.layout.pipelines['check'].disable_at)
+        self.assertEqual(
+            0, self.sched.layout.pipelines['check']._consecutive_failures)
+        self.assertFalse(self.sched.layout.pipelines['check']._disabled)
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
+        E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
+        F = self.fake_gerrit.addFakeChange('org/project', 'master', 'F')
+        G = self.fake_gerrit.addFakeChange('org/project', 'master', 'G')
+        H = self.fake_gerrit.addFakeChange('org/project', 'master', 'H')
+        I = self.fake_gerrit.addFakeChange('org/project', 'master', 'I')
+        J = self.fake_gerrit.addFakeChange('org/project', 'master', 'J')
+        K = self.fake_gerrit.addFakeChange('org/project', 'master', 'K')
+
+        self.worker.addFailTest('project-test1', A)
+        self.worker.addFailTest('project-test1', B)
+        # Let C pass, resetting the counter
+        self.worker.addFailTest('project-test1', D)
+        self.worker.addFailTest('project-test1', E)
+        self.worker.addFailTest('project-test1', F)
+        self.worker.addFailTest('project-test1', G)
+        self.worker.addFailTest('project-test1', H)
+        # I also passes but should only report to the disabled reporters
+        self.worker.addFailTest('project-test1', J)
+        self.worker.addFailTest('project-test1', K)
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(
+            2, self.sched.layout.pipelines['check']._consecutive_failures)
+        self.assertFalse(self.sched.layout.pipelines['check']._disabled)
+
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(
+            0, self.sched.layout.pipelines['check']._consecutive_failures)
+        self.assertFalse(self.sched.layout.pipelines['check']._disabled)
+
+        self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(E.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(F.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        # We should be disabled now
+        self.assertEqual(
+            3, self.sched.layout.pipelines['check']._consecutive_failures)
+        self.assertTrue(self.sched.layout.pipelines['check']._disabled)
+
+        # We need to wait between each of these patches to make sure the
+        # smtp messages come back in an expected order
+        self.fake_gerrit.addEvent(G.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.fake_gerrit.addEvent(H.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.fake_gerrit.addEvent(I.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        # The first 6 (ABCDEF) jobs should have reported back to gerrit, thus
+        # leaving a message on each change
+        self.assertEqual(1, len(A.messages))
+        self.assertIn('Build failed.', A.messages[0])
+        self.assertEqual(1, len(B.messages))
+        self.assertIn('Build failed.', B.messages[0])
+        self.assertEqual(1, len(C.messages))
+        self.assertIn('Build succeeded.', C.messages[0])
+        self.assertEqual(1, len(D.messages))
+        self.assertIn('Build failed.', D.messages[0])
+        self.assertEqual(1, len(E.messages))
+        self.assertIn('Build failed.', E.messages[0])
+        self.assertEqual(1, len(F.messages))
+        self.assertIn('Build failed.', F.messages[0])
+
+        # The last 3 (GHI) would have only reported via smtp.
+        self.assertEqual(3, len(self.smtp_messages))
+        self.assertEqual(0, len(G.messages))
+        self.assertIn('Build failed.', self.smtp_messages[0]['body'])
+        self.assertIn('/7/1/check', self.smtp_messages[0]['body'])
+        self.assertEqual(0, len(H.messages))
+        self.assertIn('Build failed.', self.smtp_messages[1]['body'])
+        self.assertIn('/8/1/check', self.smtp_messages[1]['body'])
+        self.assertEqual(0, len(I.messages))
+        self.assertIn('Build succeeded.', self.smtp_messages[2]['body'])
+        self.assertIn('/9/1/check', self.smtp_messages[2]['body'])
+
+        # Now reload the configuration (simulate a HUP) to check the pipeline
+        # comes out of disabled
+        self.sched.reconfigure(self.config)
+
+        self.assertEqual(3, self.sched.layout.pipelines['check'].disable_at)
+        self.assertEqual(
+            0, self.sched.layout.pipelines['check']._consecutive_failures)
+        self.assertFalse(self.sched.layout.pipelines['check']._disabled)
+
+        self.fake_gerrit.addEvent(J.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(K.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(
+            2, self.sched.layout.pipelines['check']._consecutive_failures)
+        self.assertFalse(self.sched.layout.pipelines['check']._disabled)
+
+        # J and K went back to gerrit
+        self.assertEqual(1, len(J.messages))
+        self.assertIn('Build failed.', J.messages[0])
+        self.assertEqual(1, len(K.messages))
+        self.assertIn('Build failed.', K.messages[0])
+        # No more messages reported via smtp
+        self.assertEqual(3, len(self.smtp_messages))
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 88d10e2..78d5208 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -112,6 +112,9 @@
                 'failure': report_actions,
                 'merge-failure': report_actions,
                 'start': report_actions,
+                'disabled': report_actions,
+                'disable-after-consecutive-failures':
+                    v.All(int, v.Range(min=1)),
                 'window': window,
                 'window-floor': window_floor,
                 'window-increase-type': window_type,
diff --git a/zuul/model.py b/zuul/model.py
index 38628f9..0c12f88 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -82,6 +82,10 @@
         self.start_actions = None
         self.success_actions = None
         self.failure_actions = None
+        self.disabled_actions = None
+        self.disable_at = None
+        self._consecutive_failures = 0
+        self._disabled = False
         self.window = None
         self.window_floor = None
         self.window_increase_type = None
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2340800..333117c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -286,7 +286,8 @@
                 'ignore-dependencies', False)
 
             action_reporters = {}
-            for action in ['start', 'success', 'failure', 'merge-failure']:
+            for action in ['start', 'success', 'failure', 'merge-failure',
+                           'disabled']:
                 action_reporters[action] = []
                 if conf_pipeline.get(action):
                     for reporter_name, params \
@@ -300,12 +301,16 @@
             pipeline.start_actions = action_reporters['start']
             pipeline.success_actions = action_reporters['success']
             pipeline.failure_actions = action_reporters['failure']
+            pipeline.disabled_actions = action_reporters['disabled']
             if len(action_reporters['merge-failure']) > 0:
                 pipeline.merge_failure_actions = \
                     action_reporters['merge-failure']
             else:
                 pipeline.merge_failure_actions = action_reporters['failure']
 
+            pipeline.disable_at = conf_pipeline.get(
+                'disable-after-consecutive-failures', None)
+
             pipeline.window = conf_pipeline.get('window', 20)
             pipeline.window_floor = conf_pipeline.get('window-floor', 3)
             pipeline.window_increase_type = conf_pipeline.get(
@@ -1070,6 +1075,8 @@
         self.log.info("    %s" % self.pipeline.failure_actions)
         self.log.info("  On merge-failure:")
         self.log.info("    %s" % self.pipeline.merge_failure_actions)
+        self.log.info("  When disabled:")
+        self.log.info("    %s" % self.pipeline.disabled_actions)
 
     def getSubmitAllowNeeds(self):
         # Get a list of code review labels that are allowed to be
@@ -1111,19 +1118,20 @@
         return False
 
     def reportStart(self, change):
-        try:
-            self.log.info("Reporting start, action %s change %s" %
-                          (self.pipeline.start_actions, change))
-            msg = "Starting %s jobs." % self.pipeline.name
-            if self.sched.config.has_option('zuul', 'status_url'):
-                msg += "\n" + self.sched.config.get('zuul', 'status_url')
-            ret = self.sendReport(self.pipeline.start_actions,
-                                  change, msg)
-            if ret:
-                self.log.error("Reporting change start %s received: %s" %
-                               (change, ret))
-        except:
-            self.log.exception("Exception while reporting start:")
+        if not self.pipeline._disabled:
+            try:
+                self.log.info("Reporting start, action %s change %s" %
+                              (self.pipeline.start_actions, change))
+                msg = "Starting %s jobs." % self.pipeline.name
+                if self.sched.config.has_option('zuul', 'status_url'):
+                    msg += "\n" + self.sched.config.get('zuul', 'status_url')
+                ret = self.sendReport(self.pipeline.start_actions,
+                                      change, msg)
+                if ret:
+                    self.log.error("Reporting change start %s received: %s" %
+                                   (change, ret))
+            except:
+                self.log.exception("Exception while reporting start:")
 
     def sendReport(self, action_reporters, change, message):
         """Sends the built message off to configured reporters.
@@ -1569,12 +1577,22 @@
             self.log.debug("success %s" % (self.pipeline.success_actions))
             actions = self.pipeline.success_actions
             item.setReportedResult('SUCCESS')
+            self.pipeline._consecutive_failures = 0
         elif not self.pipeline.didMergerSucceed(item):
             actions = self.pipeline.merge_failure_actions
             item.setReportedResult('MERGER_FAILURE')
         else:
             actions = self.pipeline.failure_actions
             item.setReportedResult('FAILURE')
+            self.pipeline._consecutive_failures += 1
+        if self.pipeline._disabled:
+            actions = self.pipeline.disabled_actions
+        # Check whether to disable only after choosing the reporters above,
+        # so that the failure which reaches disable_at is still reported as
+        # normal and only later items use the disabled reporters.
+        if (self.pipeline.disable_at and not self.pipeline._disabled and
+            self.pipeline._consecutive_failures >= self.pipeline.disable_at):
+            self.pipeline._disabled = True
         if actions:
             report = self.formatReport(item)
             try: