Dequeue non-live items with errors

If a change depended on a change with a config error, the series
would be stuck in the check pipeline indefinitely because the
manager would wait for the live item to run jobs; however, it is
unable to run jobs with an invalid configuration.  To correct this,
dequeue non-live items with configuration errors.  This will cause
the live items behind them to be dequeued as well since their
dependency is no longer in the queue.

Also, detect and report configuration errors on pipelines.  This
wasn't originally implemented because pipeline reconfiguration is
not supported in dynamic reconfiguration; however, we perform a
dry-run reconfiguration that includes config repos before loading
the real dynamic config (without config repos).  This now includes
pipelines in the dry run.  That should be safe because the results
are immediately discarded, and we don't run the postConfig method
on the pipeline managers (which has driver interactions).

Change-Id: I4448f8e6e5664b62a6f135663d3716f640d8e716
diff --git a/tests/base.py b/tests/base.py
index c49e1ce..4214809 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -195,9 +195,16 @@
         if not large:
             for fn, content in files.items():
                 fn = os.path.join(path, fn)
-                with open(fn, 'w') as f:
-                    f.write(content)
-                repo.index.add([fn])
+                if content is None:
+                    os.unlink(fn)
+                    repo.index.remove([fn])
+                else:
+                    d = os.path.dirname(fn)
+                    if not os.path.exists(d):
+                        os.makedirs(d)
+                    with open(fn, 'w') as f:
+                        f.write(content)
+                    repo.index.add([fn])
         else:
             for fni in range(100):
                 fn = os.path.join(path, str(fni))
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 9d695aa..c681305 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -775,6 +775,76 @@
         # isn't this will raise an exception.
         tenant.layout.getJob('project-test2')
 
+    def test_pipeline_error(self):
+        with open(os.path.join(FIXTURE_DIR,
+                               'config/in-repo/git/',
+                               'common-config/zuul.yaml')) as f:
+            base_common_config = f.read()
+
+        in_repo_conf_A = textwrap.dedent(
+            """
+            - pipeline:
+                name: periodic
+                foo: error
+            """)
+
+        file_dict = {'zuul.yaml': None,
+                     'zuul.d/main.yaml': base_common_config,
+                     'zuul.d/test1.yaml': in_repo_conf_A}
+        A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
+                                           files=file_dict)
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(A.reported, 1,
+                         "A should report failure")
+        self.assertIn('syntax error',
+                      A.messages[0],
+                      "A should have an error reported")
+
+    def test_change_series_error(self):
+        with open(os.path.join(FIXTURE_DIR,
+                               'config/in-repo/git/',
+                               'common-config/zuul.yaml')) as f:
+            base_common_config = f.read()
+
+        in_repo_conf_A = textwrap.dedent(
+            """
+            - pipeline:
+                name: periodic
+                foo: error
+            """)
+
+        file_dict = {'zuul.yaml': None,
+                     'zuul.d/main.yaml': base_common_config,
+                     'zuul.d/test1.yaml': in_repo_conf_A}
+        A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
+                                           files=file_dict)
+
+        in_repo_conf_B = textwrap.dedent(
+            """
+            - job:
+                name: project-test2
+                foo: error
+            """)
+
+        file_dict = {'zuul.yaml': None,
+                     'zuul.d/main.yaml': base_common_config,
+                     'zuul.d/test1.yaml': in_repo_conf_A,
+                     'zuul.d/test2.yaml': in_repo_conf_B}
+        B = self.fake_gerrit.addFakeChange('common-config', 'master', 'B',
+                                           files=file_dict)
+        B.setDependsOn(A, 1)
+        C = self.fake_gerrit.addFakeChange('common-config', 'master', 'C')
+        C.setDependsOn(B, 1)
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(C.reported, 1,
+                         "C should report failure")
+        self.assertIn('depends on a change that failed to merge',
+                      C.messages[0],
+                      "C should have an error reported")
+
 
 class TestAnsible(AnsibleZuulTestCase):
     # A temporary class to hold new tests while others are disabled
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 94c0d2a..3b09623 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -1471,6 +1471,8 @@
 
     @staticmethod
     def _parseLayout(base, tenant, data, scheduler, connections):
+        # Don't call this method from dynamic reconfiguration because
+        # it interacts with drivers and connections.
         layout = model.Layout(tenant)
 
         TenantParser._parseLayoutItems(layout, tenant, data,
@@ -1582,7 +1584,8 @@
                     config.extend(incdata)
 
     def createDynamicLayout(self, tenant, files,
-                            include_config_projects=False):
+                            include_config_projects=False,
+                            scheduler=None, connections=None):
         if include_config_projects:
             config = model.UnparsedTenantConfig()
             for project in tenant.config_projects:
@@ -1594,22 +1597,29 @@
             self._loadDynamicProjectData(config, project, files, False, tenant)
 
         layout = model.Layout(tenant)
-        # NOTE: the actual pipeline objects (complete with queues and
-        # enqueued items) are copied by reference here.  This allows
-        # our shadow dynamic configuration to continue to interact
-        # with all the other changes, each of which may have their own
-        # version of reality.  We do not support creating, updating,
-        # or deleting pipelines in dynamic layout changes.
-        layout.pipelines = tenant.layout.pipelines
+        if not include_config_projects:
+            # NOTE: the actual pipeline objects (complete with queues
+            # and enqueued items) are copied by reference here.  This
+            # allows our shadow dynamic configuration to continue to
+            # interact with all the other changes, each of which may
+            # have their own version of reality.  We do not support
+            # creating, updating, or deleting pipelines in dynamic
+            # layout changes.
+            layout.pipelines = tenant.layout.pipelines
 
-        # NOTE: the semaphore definitions are copied from the static layout
-        # here. For semaphores there should be no per patch max value but
-        # exactly one value at any time. So we do not support dynamic semaphore
-        # configuration changes.
-        layout.semaphores = tenant.layout.semaphores
+            # NOTE: the semaphore definitions are copied from the
+            # static layout here. For semaphores there should be no
+            # per patch max value but exactly one value at any
+            # time. So we do not support dynamic semaphore
+            # configuration changes.
+            layout.semaphores = tenant.layout.semaphores
+            skip_pipelines = skip_semaphores = True
+        else:
+            skip_pipelines = skip_semaphores = False
 
-        TenantParser._parseLayoutItems(layout, tenant, config, None, None,
-                                       skip_pipelines=True,
-                                       skip_semaphores=True)
+        TenantParser._parseLayoutItems(layout, tenant, config,
+                                       scheduler, connections,
+                                       skip_pipelines=skip_pipelines,
+                                       skip_semaphores=skip_semaphores)
 
         return layout
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 8282f86..efd86eb 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -444,7 +444,9 @@
             loader.createDynamicLayout(
                 item.pipeline.layout.tenant,
                 build_set.files,
-                include_config_projects=True)
+                include_config_projects=True,
+                scheduler=self.sched,
+                connections=self.sched.connections)
 
             # Then create the config a second time but without changes
             # to config repos so that we actually use this config.
@@ -540,6 +542,7 @@
     def _processOneItem(self, item, nnfi):
         changed = False
         ready = False
+        dequeued = False
         failing_reasons = []  # Reasons this item is failing
 
         item_ahead = item.item_ahead
@@ -594,8 +597,14 @@
                     item.reported_start = True
                 if item.current_build_set.unable_to_merge:
                     failing_reasons.append("it has a merge conflict")
+                    if (not item.live) and (not dequeued):
+                        self.dequeueItem(item)
+                        changed = dequeued = True
                 if item.current_build_set.config_error:
                     failing_reasons.append("it has an invalid configuration")
+                    if (not item.live) and (not dequeued):
+                        self.dequeueItem(item)
+                        changed = dequeued = True
                 if ready and self.provisionNodes(item):
                     changed = True
         if ready and self.executeJobs(item):
@@ -603,10 +612,10 @@
 
         if item.didAnyJobFail():
             failing_reasons.append("at least one job failed")
-        if (not item.live) and (not item.items_behind):
+        if (not item.live) and (not item.items_behind) and (not dequeued):
             failing_reasons.append("is a non-live item with no items behind")
             self.dequeueItem(item)
-            changed = True
+            changed = dequeued = True
         if ((not item_ahead) and item.areAllJobsComplete() and item.live):
             try:
                 self.reportItem(item)
@@ -618,7 +627,7 @@
                                   (item_behind.change, item))
                     self.cancelJobs(item_behind)
             self.dequeueItem(item)
-            changed = True
+            changed = dequeued = True
         elif not failing_reasons and item.live:
             nnfi = item
         item.current_build_set.failing_reasons = failing_reasons