Perform pre-launch merge checks

Move merge scheduling to its own function in the pipeline manager,
and call it in the main _processOneItem loop once the item has
entered the active window, in addition to the previous location in
the executor when getting the layout information.

Since we are now scheduling merges for all items in any pipeline,
make sure we properly handle both Ref and Change objects.

Also, if the executor encounters a merger failure, immediately report
that result.

Change-Id: I1c9db6993994bf8e841ecd8554c37a3ec0afc798
Co-Authored-By: Adam Gandelman <adamg@ubuntu.com>
Story: 2000773
Task: 3468
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 32f0cbb..cb6341a 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -440,11 +440,15 @@
         # the merger.
         number = None
         patchset = None
+        refspec = None
+        branch = None
         oldrev = None
         newrev = None
         if hasattr(item.change, 'number'):
             number = item.change.number
             patchset = item.change.patchset
+            refspec = item.change.refspec
+            branch = item.change.branch
         elif hasattr(item.change, 'newrev'):
             oldrev = item.change.oldrev
             newrev = item.change.newrev
@@ -456,8 +460,8 @@
                         item.change.project),
                     connection_name=connection_name,
                     merge_mode=item.current_build_set.getMergeMode(project),
-                    refspec=item.change.refspec,
-                    branch=item.change.branch,
+                    refspec=refspec,
+                    branch=branch,
                     ref=item.current_build_set.ref,
                     number=number,
                     patchset=patchset,
@@ -515,30 +519,54 @@
         if build_set.merge_state == build_set.COMPLETE:
             if build_set.unable_to_merge:
                 return None
+            self.log.debug("Preparing dynamic layout for: %s" % item.change)
             return self._loadDynamicLayout(item)
-        build_set.merge_state = build_set.PENDING
-        self.log.debug("Preparing dynamic layout for: %s" % item.change)
+
+    def scheduleMerge(self, item, files=None):
+        build_set = item.current_build_set
+
+        if not hasattr(item.change, 'branch'):
+            self.log.debug("Change %s does not have an associated branch, "
+                           "not scheduling a merge job for item %s" %
+                           (item.change, item))
+            build_set.merge_state = build_set.COMPLETE
+            return True
+
+        self.log.debug("Scheduling merge for item %s (files: %s)" %
+                       (item, files))
         dependent_items = self.getDependentItems(item)
         dependent_items.reverse()
         all_items = dependent_items + [item]
         merger_items = map(self._makeMergerItem, all_items)
+        build_set = item.current_build_set
+        build_set.merge_state = build_set.PENDING
         self.sched.merger.mergeChanges(merger_items,
                                        item.current_build_set,
-                                       ['zuul.yaml', '.zuul.yaml'],
+                                       files,
                                        self.pipeline.precedence)
+        return False
 
-    def prepareLayout(self, item):
-        # Get a copy of the layout in the context of the current
-        # queue.
-        # Returns True if the ref is ready, false otherwise
-        if not item.current_build_set.ref:
-            item.current_build_set.setConfiguration()
-        if not item.current_build_set.layout:
-            item.current_build_set.layout = self.getLayout(item)
-        if not item.current_build_set.layout:
+    def prepareItem(self, item):
+        # This runs on every iteration of _processOneItem
+        # Returns True if the item is ready, false otherwise
+        build_set = item.current_build_set
+        if not build_set.ref:
+            build_set.setConfiguration()
+        if build_set.merge_state == build_set.NEW:
+            return self.scheduleMerge(item, ['zuul.yaml', '.zuul.yaml'])
+        if build_set.config_error:
             return False
-        if item.current_build_set.config_error:
+        return True
+
+    def prepareJobs(self, item):
+        # This only runs once the item is in the pipeline's action window
+        # Returns True if the item is ready, false otherwise
+        build_set = item.current_build_set
+        if not build_set.layout:
+            build_set.layout = self.getLayout(item)
+        if not build_set.layout:
             return False
+
         if not item.job_graph:
             try:
                 item.freezeJobGraph()
@@ -553,11 +581,13 @@
 
     def _processOneItem(self, item, nnfi):
         changed = False
+        ready = False
+        failing_reasons = []  # Reasons this item is failing
+
         item_ahead = item.item_ahead
         if item_ahead and (not item_ahead.live):
             item_ahead = None
         change_queue = item.queue
-        failing_reasons = []  # Reasons this item is failing
 
         if self.checkForChangesNeededBy(item.change, change_queue) is not True:
             # It's not okay to enqueue this change, we should remove it.
@@ -572,10 +602,11 @@
                 except exceptions.MergeFailure:
                     pass
             return (True, nnfi)
-        dep_items = self.getFailingDependentItems(item)
+
         actionable = change_queue.isActionable(item)
         item.active = actionable
-        ready = False
+
+        dep_items = self.getFailingDependentItems(item)
         if dep_items:
             failing_reasons.append('a needed change is failing')
             self.cancelJobs(item, prime=False)
@@ -594,15 +625,16 @@
                 changed = True
                 self.cancelJobs(item)
             if actionable:
-                ready = self.prepareLayout(item)
+                ready = self.prepareItem(item) and self.prepareJobs(item)
                 if item.current_build_set.unable_to_merge:
                     failing_reasons.append("it has a merge conflict")
                 if item.current_build_set.config_error:
                     failing_reasons.append("it has an invalid configuration")
                 if ready and self.provisionNodes(item):
                     changed = True
-        if actionable and ready and self.executeJobs(item):
+        if ready and self.executeJobs(item):
             changed = True
+
         if item.didAnyJobFail():
             failing_reasons.append("at least one job failed")
         if (not item.live) and (not item.items_behind):
@@ -740,10 +772,11 @@
             # TODOv3(jeblair): consider a new reporter action for this
             actions = self.pipeline.merge_failure_actions
             item.setReportedResult('CONFIG_ERROR')
+        elif item.didMergerFail():
+            actions = self.pipeline.merge_failure_actions
+            item.setReportedResult('MERGER_FAILURE')
         elif not item.getJobs():
-            # We don't send empty reports with +1,
-            # and the same for -1's (merge failures or transient errors)
-            # as they cannot be followed by +1's
+            # We don't send empty reports with +1
             self.log.debug("No jobs for change %s" % item.change)
             actions = []
         elif item.didAllJobsSucceed():
@@ -751,9 +784,6 @@
             actions = self.pipeline.success_actions
             item.setReportedResult('SUCCESS')
             self.pipeline._consecutive_failures = 0
-        elif item.didMergerFail():
-            actions = self.pipeline.merge_failure_actions
-            item.setReportedResult('MERGER_FAILURE')
         else:
             actions = self.pipeline.failure_actions
             item.setReportedResult('FAILURE')