Add pipeline source

A TriggerEvent may originate from a trigger that does not represent
the canonical location of the project source.  For instance, the
timer trigger strangely depends on the gerrit trigger to actually
handle Git operations behind the scenes.  Instead, make an explicit
association between pipelines and their source triggers so that
their event trigger does not need to have that implicit association.

This is a step toward having pipelines support multiple triggers
(they already support multiple reporters).

Change-Id: Ie80ffde411fe40fddfc4496b7adb0004f660c48c
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index cb49c82..1b9ce64 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -278,6 +278,7 @@
 
   - name: check
     manager: IndependentPipelineManager
+    source: gerrit
     trigger:
       gerrit:
         - event: patchset-created
@@ -294,6 +295,11 @@
   This is an optional field that may be used to provide a textual
   description of the pipeline.
 
+**source**
+  A required field that specifies a trigger that provides access to
+  the change objects that this pipeline operates on.  Currently only
+  the value ``gerrit`` is supported.
+
 **success-message**
   An optional field that supplies the introductory text in message
   reported back to Gerrit when all the voting builds are successful.
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 52c6c81..fe6a584 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -651,19 +651,19 @@
         "Test whether a change is ready to merge"
         # TODO: move to test_gerrit (this is a unit test!)
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        trigger = self.sched.layout.pipelines['gate'].trigger
-        a = self.sched.triggers['gerrit'].getChange(1, 2)
+        source = self.sched.layout.pipelines['gate'].source
+        a = source._getChange(1, 2)
         mgr = self.sched.layout.pipelines['gate'].manager
-        self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
+        self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
 
         A.addApproval('CRVW', 2)
-        a = trigger.getChange(1, 2, refresh=True)
-        self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
+        a = source._getChange(1, 2, refresh=True)
+        self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
 
         A.addApproval('APRV', 1)
-        a = trigger.getChange(1, 2, refresh=True)
-        self.assertTrue(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
-        trigger.maintainCache([])
+        a = source._getChange(1, 2, refresh=True)
+        self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
+        source.maintainCache([])
 
     def test_build_configuration(self):
         "Test that zuul merges the right commits for testing"
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 1984aaa..9a173ff 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -86,6 +86,7 @@
 
     pipeline = {v.Required('name'): str,
                 v.Required('manager'): manager,
+                'source': v.Any('gerrit'),
                 'precedence': precedence,
                 'description': str,
                 'require': require,
diff --git a/zuul/model.py b/zuul/model.py
index b85f3eb..8b97241 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -77,7 +77,7 @@
         self.manager = None
         self.queues = []
         self.precedence = PRECEDENCE_NORMAL
-        self.trigger = None
+        self.source = None
         self.start_actions = None
         self.success_actions = None
         self.failure_actions = None
@@ -965,20 +965,6 @@
 
         return ret
 
-    def getChange(self, project, trigger):
-        if self.change_number:
-            change = trigger.getChange(self.change_number, self.patch_number)
-        elif self.ref:
-            change = Ref(project)
-            change.ref = self.ref
-            change.oldrev = self.oldrev
-            change.newrev = self.newrev
-            change.url = trigger.getGitwebUrl(project, sha=self.newrev)
-        else:
-            change = NullChange(project)
-
-        return change
-
 
 class BaseFilter(object):
     def __init__(self, required_approvals=[]):
@@ -1038,11 +1024,12 @@
 
 
 class EventFilter(BaseFilter):
-    def __init__(self, types=[], branches=[], refs=[], event_approvals={},
-                 comments=[], emails=[], usernames=[], timespecs=[],
-                 required_approvals=[]):
+    def __init__(self, trigger, types=[], branches=[], refs=[],
+                 event_approvals={}, comments=[], emails=[], usernames=[],
+                 timespecs=[], required_approvals=[]):
         super(EventFilter, self).__init__(
             required_approvals=required_approvals)
+        self.trigger = trigger
         self._types = types
         self._branches = branches
         self._refs = refs
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index fcf1161..05b8d03 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -109,7 +109,7 @@
         if not errors:
             event.change_number, event.patch_number = args['change'].split(',')
             try:
-                event.getChange(project, trigger)
+                pipeline.source.getChange(event, project)
             except Exception:
                 errors += 'Invalid change: %s\n' % (args['change'],)
 
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 0b228d9..eb57f93 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -235,6 +235,8 @@
         for conf_pipeline in data.get('pipelines', []):
             pipeline = Pipeline(conf_pipeline['name'])
             pipeline.description = conf_pipeline.get('description')
+            # TODO(jeblair): remove backwards compatibility:
+            pipeline.source = self.triggers[conf_pipeline.get('source', 'gerrit')]
             precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
             pipeline.precedence = precedence
             pipeline.failure_message = conf_pipeline.get('failure-message',
@@ -298,7 +300,6 @@
             # TODO: move this into triggers (may require pluggable
             # configuration)
             if 'gerrit' in conf_pipeline['trigger']:
-                pipeline.trigger = self.triggers['gerrit']
                 for trigger in toList(conf_pipeline['trigger']['gerrit']):
                     approvals = {}
                     for approval_dict in toList(trigger.get('approval')):
@@ -314,7 +315,8 @@
                     usernames = toList(trigger.get('username'))
                     if not usernames:
                         usernames = toList(trigger.get('username_filter'))
-                    f = EventFilter(types=toList(trigger['event']),
+                    f = EventFilter(trigger=self.triggers['gerrit'],
+                                    types=toList(trigger['event']),
                                     branches=toList(trigger.get('branch')),
                                     refs=toList(trigger.get('ref')),
                                     event_approvals=approvals,
@@ -325,9 +327,9 @@
                                     toList(trigger.get('require-approval')))
                     manager.event_filters.append(f)
             elif 'timer' in conf_pipeline['trigger']:
-                pipeline.trigger = self.triggers['timer']
                 for trigger in toList(conf_pipeline['trigger']['timer']):
-                    f = EventFilter(types=['timer'],
+                    f = EventFilter(trigger=self.triggers['timer'],
+                                    types=['timer'],
                                     timespecs=toList(trigger['time']))
                     manager.event_filters.append(f)
 
@@ -714,8 +716,7 @@
     def _doEnqueueEvent(self, event):
         project = self.layout.projects.get(event.project_name)
         pipeline = self.layout.pipelines[event.forced_pipeline]
-        trigger = self.triggers.get(event.trigger_name)
-        change = event.getChange(project, trigger)
+        change = pipeline.source.getChange(event, project)
         self.log.debug("Event %s for change %s was directly assigned "
                        "to pipeline %s" % (event, change, self))
         self.log.info("Adding %s, %s to %s" %
@@ -809,8 +810,7 @@
                 return
 
             for pipeline in self.layout.pipelines.values():
-                change = event.getChange(project,
-                                         self.triggers.get(event.trigger_name))
+                change = pipeline.source.getChange(event, project)
                 if event.type == 'patchset-created':
                     pipeline.manager.removeOldVersionsOfChange(change)
                 elif event.type == 'change-abandoned':
@@ -944,6 +944,7 @@
 
     def _postConfig(self, layout):
         self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
+        self.log.info("  Source: %s" % self.pipeline.source)
         self.log.info("  Requirements:")
         for f in self.changeish_filters:
             self.log.info("    %s" % f)
@@ -1188,7 +1189,7 @@
             oldrev = item.change.oldrev
             newrev = item.change.newrev
         return dict(project=item.change.project.name,
-                    url=self.pipeline.trigger.getGitUrl(
+                    url=self.pipeline.source.getGitUrl(
                         item.change.project),
                     merge_mode=item.change.project.merge_mode,
                     refspec=item.change.refspec,
@@ -1220,7 +1221,7 @@
                                            item.current_build_set)
         else:
             self.log.debug("Preparing update repo for: %s" % item.change)
-            url = self.pipeline.trigger.getGitUrl(item.change.project)
+            url = self.pipeline.source.getGitUrl(item.change.project)
             self.sched.merger.updateRepo(item.change.project.name,
                                          url, build_set)
         return False
@@ -1410,8 +1411,8 @@
             succeeded = self.pipeline.didAllJobsSucceed(item)
             merged = item.reported
             if merged:
-                merged = self.pipeline.trigger.isMerged(item.change,
-                                                        item.change.branch)
+                merged = self.pipeline.source.isMerged(item.change,
+                                                       item.change.branch)
             self.log.info("Reported change %s status: all-succeeded: %s, "
                           "merged: %s" % (item.change, succeeded, merged))
             change_queue = self.pipeline.getQueue(item.change.project)
@@ -1738,8 +1739,8 @@
         return new_change_queues
 
     def isChangeReadyToBeEnqueued(self, change):
-        if not self.pipeline.trigger.canMerge(change,
-                                              self.getSubmitAllowNeeds()):
+        if not self.pipeline.source.canMerge(change,
+                                             self.getSubmitAllowNeeds()):
             self.log.debug("Change %s can not merge, ignoring" % change)
             return False
         return True
@@ -1751,8 +1752,8 @@
             self.log.debug("  Changeish does not support dependencies")
             return
         for needs in change.needed_by_changes:
-            if self.pipeline.trigger.canMerge(needs,
-                                              self.getSubmitAllowNeeds()):
+            if self.pipeline.source.canMerge(needs,
+                                             self.getSubmitAllowNeeds()):
                 self.log.debug("  Change %s needs %s and is ready to merge" %
                                (needs, change))
                 to_enqueue.append(needs)
@@ -1791,8 +1792,8 @@
         if self.isChangeAlreadyInQueue(change.needs_change):
             self.log.debug("  Needed change is already ahead in the queue")
             return True
-        if self.pipeline.trigger.canMerge(change.needs_change,
-                                          self.getSubmitAllowNeeds()):
+        if self.pipeline.source.canMerge(change.needs_change,
+                                         self.getSubmitAllowNeeds()):
             self.log.debug("  Change %s is needed" %
                            change.needs_change)
             return change.needs_change
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index a6eedb1..d2cd7fc 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -17,7 +17,7 @@
 import time
 import urllib2
 from zuul.lib import gerrit
-from zuul.model import TriggerEvent, Change
+from zuul.model import TriggerEvent, Change, Ref, NullChange
 
 
 class GerritEventConnector(threading.Thread):
@@ -84,12 +84,12 @@
             event.account = None
 
         if event.change_number:
-            # Call getChange for the side effect of updating the
+            # Call _getChange for the side effect of updating the
             # cache.  Note that this modifies Change objects outside
             # the main thread.
-            self.trigger.getChange(event.change_number,
-                                   event.patch_number,
-                                   refresh=True)
+            self.trigger._getChange(event.change_number,
+                                    event.patch_number,
+                                    refresh=True)
 
         self.sched.addEvent(event)
         self.gerrit.eventDone()
@@ -290,7 +290,20 @@
     def postConfig(self):
         pass
 
-    def getChange(self, number, patchset, refresh=False):
+    def getChange(self, event, project):
+        if event.change_number:
+            change = self._getChange(event.change_number, event.patch_number)
+        elif event.ref:
+            change = Ref(project)
+            change.ref = event.ref
+            change.oldrev = event.oldrev
+            change.newrev = event.newrev
+            change.url = self.getGitwebUrl(project, sha=event.newrev)
+        else:
+            change = NullChange(project)
+        return change
+
+    def _getChange(self, number, patchset, refresh=False):
         key = '%s,%s' % (number, patchset)
         change = None
         if key in self._change_cache:
@@ -349,7 +362,7 @@
         if 'dependsOn' in data:
             parts = data['dependsOn'][0]['ref'].split('/')
             dep_num, dep_ps = parts[3], parts[4]
-            dep = self.getChange(dep_num, dep_ps)
+            dep = self._getChange(dep_num, dep_ps)
             if not dep.is_merged:
                 change.needs_change = dep
 
@@ -358,7 +371,7 @@
             for needed in data['neededBy']:
                 parts = needed['ref'].split('/')
                 dep_num, dep_ps = parts[3], parts[4]
-                dep = self.getChange(dep_num, dep_ps)
+                dep = self._getChange(dep_num, dep_ps)
                 if not dep.is_merged and dep.is_current_patchset:
                     change.needed_by_changes.append(dep)
 
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index 904fa7a..3d5cd9b 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -56,9 +56,9 @@
         for job in self.apsched.get_jobs():
             self.apsched.unschedule_job(job)
         for pipeline in self.sched.layout.pipelines.values():
-            if pipeline.trigger != self:
-                continue
             for ef in pipeline.manager.event_filters:
+                if ef.trigger != self:
+                    continue
                 for timespec in ef.timespecs:
                     parts = timespec.split()
                     if len(parts) < 5 or len(parts) > 6:
@@ -82,15 +82,11 @@
                                               args=(pipeline.name,
                                                     timespec,))
 
-    def getChange(self, number, patchset, refresh=False):
+    def getChange(self, event, project):
         raise Exception("Timer trigger does not support changes.")
 
     def getGitUrl(self, project):
-        # For the moment, the timer trigger requires gerrit.
-        return self.sched.triggers['gerrit'].getGitUrl(project)
+        raise Exception("Timer trigger does not support changes.")
 
     def getGitwebUrl(self, project, sha=None):
-        url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
-        if sha:
-            url += ';a=commitdiff;h=' + sha
-        return url
+        raise Exception("Timer trigger does not support changes.")