Merge "Add pipeline source"
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index cdab4b7..03b00b3 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -295,6 +295,7 @@
 
   - name: check
     manager: IndependentPipelineManager
+    source: gerrit
     trigger:
       gerrit:
         - event: patchset-created
@@ -311,6 +312,11 @@
   This is an optional field that may be used to provide a textual
   description of the pipeline.
 
+**source**
+  Specifies the trigger that provides access to the change objects
+  that this pipeline operates on.  It should always be set, but for now
+  defaults to ``gerrit``, which is currently the only supported value.
+
 **success-message**
   An optional field that supplies the introductory text in message
   reported back to Gerrit when all the voting builds are successful.
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index c6c608f..247bde1 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -651,19 +651,19 @@
         "Test whether a change is ready to merge"
         # TODO: move to test_gerrit (this is a unit test!)
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        trigger = self.sched.layout.pipelines['gate'].trigger
-        a = self.sched.triggers['gerrit'].getChange(1, 2)
+        source = self.sched.layout.pipelines['gate'].source
+        a = source._getChange(1, 2)
         mgr = self.sched.layout.pipelines['gate'].manager
-        self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
+        self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
 
         A.addApproval('CRVW', 2)
-        a = trigger.getChange(1, 2, refresh=True)
-        self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
+        a = source._getChange(1, 2, refresh=True)
+        self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
 
         A.addApproval('APRV', 1)
-        a = trigger.getChange(1, 2, refresh=True)
-        self.assertTrue(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
-        trigger.maintainCache([])
+        a = source._getChange(1, 2, refresh=True)
+        self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
+        source.maintainCache([])
 
     def test_build_configuration(self):
         "Test that zuul merges the right commits for testing"
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index 7256bc4..0adc2c2 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -87,6 +87,7 @@
 
     pipeline = {v.Required('name'): str,
                 v.Required('manager'): manager,
+                'source': v.Any('gerrit'),
                 'precedence': precedence,
                 'description': str,
                 'require': require,
diff --git a/zuul/model.py b/zuul/model.py
index b85f3eb..8b97241 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -77,7 +77,7 @@
         self.manager = None
         self.queues = []
         self.precedence = PRECEDENCE_NORMAL
-        self.trigger = None
+        self.source = None
         self.start_actions = None
         self.success_actions = None
         self.failure_actions = None
@@ -965,20 +965,6 @@
 
         return ret
 
-    def getChange(self, project, trigger):
-        if self.change_number:
-            change = trigger.getChange(self.change_number, self.patch_number)
-        elif self.ref:
-            change = Ref(project)
-            change.ref = self.ref
-            change.oldrev = self.oldrev
-            change.newrev = self.newrev
-            change.url = trigger.getGitwebUrl(project, sha=self.newrev)
-        else:
-            change = NullChange(project)
-
-        return change
-
 
 class BaseFilter(object):
     def __init__(self, required_approvals=[]):
@@ -1038,11 +1024,12 @@
 
 
 class EventFilter(BaseFilter):
-    def __init__(self, types=[], branches=[], refs=[], event_approvals={},
-                 comments=[], emails=[], usernames=[], timespecs=[],
-                 required_approvals=[]):
+    def __init__(self, trigger, types=[], branches=[], refs=[],
+                 event_approvals={}, comments=[], emails=[], usernames=[],
+                 timespecs=[], required_approvals=[]):
         super(EventFilter, self).__init__(
             required_approvals=required_approvals)
+        self.trigger = trigger
         self._types = types
         self._branches = branches
         self._refs = refs
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index fcf1161..05b8d03 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -109,7 +109,7 @@
         if not errors:
             event.change_number, event.patch_number = args['change'].split(',')
             try:
-                event.getChange(project, trigger)
+                pipeline.source.getChange(event, project)
             except Exception:
                 errors += 'Invalid change: %s\n' % (args['change'],)
 
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index b3163b3..60a22ce 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -235,6 +235,8 @@
         for conf_pipeline in data.get('pipelines', []):
             pipeline = Pipeline(conf_pipeline['name'])
             pipeline.description = conf_pipeline.get('description')
+            # TODO(jeblair): remove backwards compatibility:
+            pipeline.source = self.triggers[conf_pipeline.get('source', 'gerrit')]
             precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
             pipeline.precedence = precedence
             pipeline.failure_message = conf_pipeline.get('failure-message',
@@ -298,7 +300,6 @@
             # TODO: move this into triggers (may require pluggable
             # configuration)
             if 'gerrit' in conf_pipeline['trigger']:
-                pipeline.trigger = self.triggers['gerrit']
                 for trigger in toList(conf_pipeline['trigger']['gerrit']):
                     approvals = {}
                     for approval_dict in toList(trigger.get('approval')):
@@ -314,7 +315,8 @@
                     usernames = toList(trigger.get('username'))
                     if not usernames:
                         usernames = toList(trigger.get('username_filter'))
-                    f = EventFilter(types=toList(trigger['event']),
+                    f = EventFilter(trigger=self.triggers['gerrit'],
+                                    types=toList(trigger['event']),
                                     branches=toList(trigger.get('branch')),
                                     refs=toList(trigger.get('ref')),
                                     event_approvals=approvals,
@@ -325,9 +327,9 @@
                                     toList(trigger.get('require-approval')))
                     manager.event_filters.append(f)
             elif 'timer' in conf_pipeline['trigger']:
-                pipeline.trigger = self.triggers['timer']
                 for trigger in toList(conf_pipeline['trigger']['timer']):
-                    f = EventFilter(types=['timer'],
+                    f = EventFilter(trigger=self.triggers['timer'],
+                                    types=['timer'],
                                     timespecs=toList(trigger['time']))
                     manager.event_filters.append(f)
 
@@ -714,8 +716,7 @@
     def _doEnqueueEvent(self, event):
         project = self.layout.projects.get(event.project_name)
         pipeline = self.layout.pipelines[event.forced_pipeline]
-        trigger = self.triggers.get(event.trigger_name)
-        change = event.getChange(project, trigger)
+        change = pipeline.source.getChange(event, project)
         self.log.debug("Event %s for change %s was directly assigned "
                        "to pipeline %s" % (event, change, self))
         self.log.info("Adding %s, %s to %s" %
@@ -809,8 +810,7 @@
                 return
 
             for pipeline in self.layout.pipelines.values():
-                change = event.getChange(project,
-                                         self.triggers.get(event.trigger_name))
+                change = pipeline.source.getChange(event, project)
                 if event.type == 'patchset-created':
                     pipeline.manager.removeOldVersionsOfChange(change)
                 elif event.type == 'change-abandoned':
@@ -944,6 +944,7 @@
 
     def _postConfig(self, layout):
         self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
+        self.log.info("  Source: %s" % self.pipeline.source)
         self.log.info("  Requirements:")
         for f in self.changeish_filters:
             self.log.info("    %s" % f)
@@ -1188,7 +1189,7 @@
             oldrev = item.change.oldrev
             newrev = item.change.newrev
         return dict(project=item.change.project.name,
-                    url=self.pipeline.trigger.getGitUrl(
+                    url=self.pipeline.source.getGitUrl(
                         item.change.project),
                     merge_mode=item.change.project.merge_mode,
                     refspec=item.change.refspec,
@@ -1220,7 +1221,7 @@
                                            item.current_build_set)
         else:
             self.log.debug("Preparing update repo for: %s" % item.change)
-            url = self.pipeline.trigger.getGitUrl(item.change.project)
+            url = self.pipeline.source.getGitUrl(item.change.project)
             self.sched.merger.updateRepo(item.change.project.name,
                                          url, build_set)
         return False
@@ -1410,8 +1411,8 @@
             succeeded = self.pipeline.didAllJobsSucceed(item)
             merged = item.reported
             if merged:
-                merged = self.pipeline.trigger.isMerged(item.change,
-                                                        item.change.branch)
+                merged = self.pipeline.source.isMerged(item.change,
+                                                       item.change.branch)
             self.log.info("Reported change %s status: all-succeeded: %s, "
                           "merged: %s" % (item.change, succeeded, merged))
             change_queue = self.pipeline.getQueue(item.change.project)
@@ -1737,8 +1738,8 @@
         return new_change_queues
 
     def isChangeReadyToBeEnqueued(self, change):
-        if not self.pipeline.trigger.canMerge(change,
-                                              self.getSubmitAllowNeeds()):
+        if not self.pipeline.source.canMerge(change,
+                                             self.getSubmitAllowNeeds()):
             self.log.debug("Change %s can not merge, ignoring" % change)
             return False
         return True
@@ -1750,8 +1751,8 @@
             self.log.debug("  Changeish does not support dependencies")
             return
         for needs in change.needed_by_changes:
-            if self.pipeline.trigger.canMerge(needs,
-                                              self.getSubmitAllowNeeds()):
+            if self.pipeline.source.canMerge(needs,
+                                             self.getSubmitAllowNeeds()):
                 self.log.debug("  Change %s needs %s and is ready to merge" %
                                (needs, change))
                 to_enqueue.append(needs)
@@ -1790,8 +1791,8 @@
         if self.isChangeAlreadyInQueue(change.needs_change):
             self.log.debug("  Needed change is already ahead in the queue")
             return True
-        if self.pipeline.trigger.canMerge(change.needs_change,
-                                          self.getSubmitAllowNeeds()):
+        if self.pipeline.source.canMerge(change.needs_change,
+                                         self.getSubmitAllowNeeds()):
             self.log.debug("  Change %s is needed" %
                            change.needs_change)
             return change.needs_change
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index a6eedb1..d2cd7fc 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -17,7 +17,7 @@
 import time
 import urllib2
 from zuul.lib import gerrit
-from zuul.model import TriggerEvent, Change
+from zuul.model import TriggerEvent, Change, Ref, NullChange
 
 
 class GerritEventConnector(threading.Thread):
@@ -84,12 +84,12 @@
             event.account = None
 
         if event.change_number:
-            # Call getChange for the side effect of updating the
+            # Call _getChange for the side effect of updating the
             # cache.  Note that this modifies Change objects outside
             # the main thread.
-            self.trigger.getChange(event.change_number,
-                                   event.patch_number,
-                                   refresh=True)
+            self.trigger._getChange(event.change_number,
+                                    event.patch_number,
+                                    refresh=True)
 
         self.sched.addEvent(event)
         self.gerrit.eventDone()
@@ -290,7 +290,20 @@
     def postConfig(self):
         pass
 
-    def getChange(self, number, patchset, refresh=False):
+    def getChange(self, event, project):
+        if event.change_number:
+            change = self._getChange(event.change_number, event.patch_number)
+        elif event.ref:
+            change = Ref(project)
+            change.ref = event.ref
+            change.oldrev = event.oldrev
+            change.newrev = event.newrev
+            change.url = self.getGitwebUrl(project, sha=event.newrev)
+        else:
+            change = NullChange(project)
+        return change
+
+    def _getChange(self, number, patchset, refresh=False):
         key = '%s,%s' % (number, patchset)
         change = None
         if key in self._change_cache:
@@ -349,7 +362,7 @@
         if 'dependsOn' in data:
             parts = data['dependsOn'][0]['ref'].split('/')
             dep_num, dep_ps = parts[3], parts[4]
-            dep = self.getChange(dep_num, dep_ps)
+            dep = self._getChange(dep_num, dep_ps)
             if not dep.is_merged:
                 change.needs_change = dep
 
@@ -358,7 +371,7 @@
             for needed in data['neededBy']:
                 parts = needed['ref'].split('/')
                 dep_num, dep_ps = parts[3], parts[4]
-                dep = self.getChange(dep_num, dep_ps)
+                dep = self._getChange(dep_num, dep_ps)
                 if not dep.is_merged and dep.is_current_patchset:
                     change.needed_by_changes.append(dep)
 
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index 904fa7a..3d5cd9b 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -56,9 +56,9 @@
         for job in self.apsched.get_jobs():
             self.apsched.unschedule_job(job)
         for pipeline in self.sched.layout.pipelines.values():
-            if pipeline.trigger != self:
-                continue
             for ef in pipeline.manager.event_filters:
+                if ef.trigger != self:
+                    continue
                 for timespec in ef.timespecs:
                     parts = timespec.split()
                     if len(parts) < 5 or len(parts) > 6:
@@ -82,15 +82,11 @@
                                               args=(pipeline.name,
                                                     timespec,))
 
-    def getChange(self, number, patchset, refresh=False):
+    def getChange(self, event, project):
         raise Exception("Timer trigger does not support changes.")
 
     def getGitUrl(self, project):
-        # For the moment, the timer trigger requires gerrit.
-        return self.sched.triggers['gerrit'].getGitUrl(project)
+        raise Exception("Timer trigger does not support changes.")
 
     def getGitwebUrl(self, project, sha=None):
-        url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
-        if sha:
-            url += ';a=commitdiff;h=' + sha
-        return url
+        raise Exception("Timer trigger does not support changes.")