Refactor sources out of triggers

This is to further differentiate between sources and triggers.
Eventually allowing for multiple triggers per pipeline.

Still to come is separating connections from everything.

Change-Id: I1d680dbed5f650165643842af450f16b32ec5ed9
diff --git a/tests/base.py b/tests/base.py
index 57b30b5..9316b43 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -52,6 +52,7 @@
 import zuul.merger.server
 import zuul.reporter.gerrit
 import zuul.reporter.smtp
+import zuul.source.gerrit
 import zuul.trigger.gerrit
 import zuul.trigger.timer
 import zuul.trigger.zuultrigger
@@ -382,12 +383,12 @@
     log = logging.getLogger("zuul.test.FakeGerrit")
 
     def __init__(self, hostname, username, port=29418, keyfile=None,
-                 changes_dbs={}):
+                 changes_dbs={}, queues_dbs={}):
         self.hostname = hostname
         self.username = username
         self.port = port
         self.keyfile = keyfile
-        self.event_queue = Queue.Queue()
+        self.event_queue = queues_dbs.get(hostname, {})
         self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
         self.change_number = 0
         self.changes = changes_dbs.get(hostname, {})
@@ -499,13 +500,14 @@
         return ret
 
 
-class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
+class FakeGerritSource(zuul.source.gerrit.Gerrit):
     name = 'gerrit'
 
     def __init__(self, upstream_root, *args):
-        super(FakeGerritTrigger, self).__init__(*args)
+        super(FakeGerritSource, self).__init__(*args)
         self.upstream_root = upstream_root
-        self.gerrit_connector.delay = 0.0
+        self.replication_timeout = 1.5
+        self.replication_retry_interval = 0.5
 
     def getGitUrl(self, project):
         return os.path.join(self.upstream_root, project.name)
@@ -958,11 +960,6 @@
         old_urlopen = urllib2.urlopen
         urllib2.urlopen = URLOpenerFactory
 
-        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
-                                                      self.swift)
-        self.merge_client = zuul.merger.client.MergeClient(
-            self.config, self.sched)
-
         self.smtp_messages = []
 
         def FakeSMTPFactory(*args, **kw):
@@ -971,12 +968,16 @@
 
         # Set a changes database so multiple FakeGerrit's can report back to
         # a virtual canonical database given by the configured hostname
+        self.gerrit_queues_dbs = {
+            self.config.get('gerrit', 'server'): Queue.Queue()
+        }
         self.gerrit_changes_dbs = {
             self.config.get('gerrit', 'server'): {}
         }
 
         def FakeGerritFactory(*args, **kw):
             kw['changes_dbs'] = self.gerrit_changes_dbs
+            kw['queues_dbs'] = self.gerrit_queues_dbs
             return FakeGerrit(*args, **kw)
 
         self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
@@ -984,32 +985,23 @@
 
         self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
 
-        self.gerrit = FakeGerritTrigger(
-            self.upstream_root, self.config, self.sched)
-        self.gerrit.replication_timeout = 1.5
-        self.gerrit.replication_retry_interval = 0.5
-        self.fake_gerrit = self.gerrit.gerrit
-        self.fake_gerrit.upstream_root = self.upstream_root
-
-        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
-        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
+        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
+                                                      self.swift)
+        self.merge_client = zuul.merger.client.MergeClient(
+            self.config, self.sched)
 
         self.sched.setLauncher(self.launcher)
         self.sched.setMerger(self.merge_client)
-        self.sched.registerTrigger(self.gerrit)
-        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
-        self.sched.registerTrigger(self.timer)
-        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
-                                                                self.sched)
-        self.sched.registerTrigger(self.zuultrigger)
 
-        self.sched.registerReporter(
-            zuul.reporter.gerrit.Reporter(self.gerrit))
-        self.smtp_reporter = zuul.reporter.smtp.Reporter(
-            self.config.get('smtp', 'default_from'),
-            self.config.get('smtp', 'default_to'),
-            self.config.get('smtp', 'server'))
-        self.sched.registerReporter(self.smtp_reporter)
+        self.register_sources()
+        self.fake_gerrit = self.gerrit_source.gerrit
+        self.fake_gerrit.upstream_root = self.upstream_root
+
+        self.register_triggers()
+        self.register_reporters()
+
+        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
 
         self.sched.start()
         self.sched.reconfigure(self.config)
@@ -1024,6 +1016,38 @@
         self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)
 
+    def register_sources(self):
+        # Register the available sources
+        self.gerrit_source = FakeGerritSource(
+            self.upstream_root, self.config, self.sched)
+        self.gerrit_source.replication_timeout = 1.5
+        self.gerrit_source.replication_retry_interval = 0.5
+
+        self.sched.registerSource(self.gerrit_source)
+
+    def register_triggers(self):
+        # Register the available triggers
+        self.gerrit_trigger = zuul.trigger.gerrit.Gerrit(
+            self.fake_gerrit, self.config, self.sched, self.gerrit_source)
+        self.gerrit_trigger.gerrit_connector.delay = 0.0
+
+        self.sched.registerTrigger(self.gerrit_trigger)
+        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
+        self.sched.registerTrigger(self.timer)
+        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
+                                                                self.sched)
+        self.sched.registerTrigger(self.zuultrigger)
+
+    def register_reporters(self):
+        # Register the available reporters
+        self.sched.registerReporter(
+            zuul.reporter.gerrit.Reporter(self.fake_gerrit))
+        self.smtp_reporter = zuul.reporter.smtp.Reporter(
+            self.config.get('smtp', 'default_from'),
+            self.config.get('smtp', 'default_to'),
+            self.config.get('smtp', 'server'))
+        self.sched.registerReporter(self.smtp_reporter)
+
     def setup_config(self):
         """Per test config object. Override to set different config."""
         self.config = ConfigParser.ConfigParser()
@@ -1050,7 +1074,7 @@
         self.merge_server.join()
         self.merge_client.stop()
         self.worker.shutdown()
-        self.gerrit.stop()
+        self.gerrit_trigger.stop()
         self.timer.stop()
         self.sched.stop()
         self.sched.join()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a257440..d99a65c 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -729,8 +729,8 @@
         self.assertEqual(self.history[6].changes,
                          '1,1 2,1 3,1 4,1 5,1 6,1 7,1')
 
-    def test_trigger_cache(self):
-        "Test that the trigger cache operates correctly"
+    def test_source_cache(self):
+        "Test that the source cache operates correctly"
         self.worker.hold_jobs_in_build = True
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -762,9 +762,9 @@
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        self.log.debug("len %s" % self.gerrit._change_cache.keys())
+        self.log.debug("len %s" % self.gerrit_source._change_cache.keys())
         # there should still be changes in the cache
-        self.assertNotEqual(len(self.gerrit._change_cache.keys()), 0)
+        self.assertNotEqual(len(self.gerrit_source._change_cache.keys()), 0)
 
         self.worker.hold_jobs_in_build = False
         self.worker.release()
@@ -1469,7 +1469,7 @@
         "Test that the merger works with large changes after a repack"
         # https://bugs.launchpad.net/zuul/+bug/1078946
         # This test assumes the repo is already cloned; make sure it is
-        url = self.sched.triggers['gerrit'].getGitUrl(
+        url = self.sched.sources['gerrit'].getGitUrl(
             self.sched.layout.projects['org/project1'])
         self.merge_server.merger.addProject('org/project1', url)
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@@ -2165,6 +2165,7 @@
     def test_test_config(self):
         "Test that we can test the config"
         sched = zuul.scheduler.Scheduler()
+        sched.registerSource(None, 'gerrit')
         sched.registerTrigger(None, 'gerrit')
         sched.registerTrigger(None, 'timer')
         sched.registerTrigger(None, 'zuul')
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 2d99a1f..75226dc 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -144,17 +144,58 @@
         if self.gear_server_pid:
             os.kill(self.gear_server_pid, signal.SIGKILL)
 
+    def register_sources(self):
+        # Register the available sources
+        # See comment at top of file about zuul imports
+        import zuul.source.gerrit
+        self.gerrit_source = zuul.source.gerrit.Gerrit(self.config, self.sched)
+
+        self.sched.registerSource(self.gerrit_source)
+
+    def register_triggers(self):
+        # Register the available triggers
+        # See comment at top of file about zuul imports
+        import zuul.trigger.gerrit
+        import zuul.trigger.timer
+        import zuul.trigger.zuultrigger
+        self.gerrit_trigger = zuul.trigger.gerrit.Gerrit(self.gerrit,
+                                                         self.config,
+                                                         self.sched,
+                                                         self.gerrit_source)
+        timer = zuul.trigger.timer.Timer(self.config, self.sched)
+        zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
+                                                           self.sched)
+
+        self.sched.registerTrigger(self.gerrit_trigger)
+        self.sched.registerTrigger(timer)
+        self.sched.registerTrigger(zuultrigger)
+
+    def register_reporters(self):
+        # Register the available reporters
+        # See comment at top of file about zuul imports
+        import zuul.reporter.gerrit
+        import zuul.reporter.smtp
+        gerrit_reporter = zuul.reporter.gerrit.Reporter(self.gerrit)
+        smtp_reporter = zuul.reporter.smtp.Reporter(
+            self.config.get('smtp', 'default_from')
+            if self.config.has_option('smtp', 'default_from') else 'zuul',
+            self.config.get('smtp', 'default_to')
+            if self.config.has_option('smtp', 'default_to') else 'zuul',
+            self.config.get('smtp', 'server')
+            if self.config.has_option('smtp', 'server') else 'localhost',
+            self.config.get('smtp', 'port')
+            if self.config.has_option('smtp', 'port') else 25
+        )
+
+        self.sched.registerReporter(gerrit_reporter)
+        self.sched.registerReporter(smtp_reporter)
+
     def main(self):
         # See comment at top of file about zuul imports
         import zuul.scheduler
         import zuul.launcher.gearman
         import zuul.merger.client
         import zuul.lib.swift
-        import zuul.reporter.gerrit
-        import zuul.reporter.smtp
-        import zuul.trigger.gerrit
-        import zuul.trigger.timer
-        import zuul.trigger.zuultrigger
         import zuul.webapp
         import zuul.rpclistener
 
@@ -172,35 +213,22 @@
         gearman = zuul.launcher.gearman.Gearman(self.config, self.sched,
                                                 self.swift)
         merger = zuul.merger.client.MergeClient(self.config, self.sched)
-        gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
-        timer = zuul.trigger.timer.Timer(self.config, self.sched)
-        zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
-                                                           self.sched)
+
         if self.config.has_option('zuul', 'status_expiry'):
             cache_expiry = self.config.getint('zuul', 'status_expiry')
         else:
             cache_expiry = 1
         webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
         rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
-        gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
-        smtp_reporter = zuul.reporter.smtp.Reporter(
-            self.config.get('smtp', 'default_from')
-            if self.config.has_option('smtp', 'default_from') else 'zuul',
-            self.config.get('smtp', 'default_to')
-            if self.config.has_option('smtp', 'default_to') else 'zuul',
-            self.config.get('smtp', 'server')
-            if self.config.has_option('smtp', 'server') else 'localhost',
-            self.config.get('smtp', 'port')
-            if self.config.has_option('smtp', 'port') else 25
-        )
 
         self.sched.setLauncher(gearman)
         self.sched.setMerger(merger)
-        self.sched.registerTrigger(gerrit)
-        self.sched.registerTrigger(timer)
-        self.sched.registerTrigger(zuultrigger)
-        self.sched.registerReporter(gerrit_reporter)
-        self.sched.registerReporter(smtp_reporter)
+        self.register_sources()
+        # TODO(jhesketh): Use connections instead of grabbing the gerrit lib
+        #                 from the source
+        self.gerrit = self.gerrit_source.gerrit
+        self.register_triggers()
+        self.register_reporters()
 
         self.log.info('Starting scheduler')
         self.sched.start()
diff --git a/zuul/model.py b/zuul/model.py
index 9106887..a646413 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -301,11 +301,11 @@
         self.reporter = reporter
         self.params = params
 
-    def report(self, change, message):
+    def report(self, source, change, message):
         """Sends the built message off to the configured reporter.
         Takes the change and message and adds the configured parameters.
         """
-        return self.reporter.report(change, message, self.params)
+        return self.reporter.report(source, change, message, self.params)
 
     def getSubmitAllowNeeds(self):
         """Gets the submit allow needs from the reporter based off the
diff --git a/zuul/reporter/gerrit.py b/zuul/reporter/gerrit.py
index 7c4774b..a2debfd 100644
--- a/zuul/reporter/gerrit.py
+++ b/zuul/reporter/gerrit.py
@@ -21,20 +21,20 @@
     name = 'gerrit'
     log = logging.getLogger("zuul.reporter.gerrit.Reporter")
 
-    def __init__(self, trigger):
+    def __init__(self, gerrit):
         """Set up the reporter."""
-        self.gerrit = trigger.gerrit
-        self.trigger = trigger
+        # TODO: make default_gerrit come from a connection
+        self.default_gerrit = gerrit
 
-    def report(self, change, message, params):
+    def report(self, source, change, message, params):
         """Send a message to gerrit."""
         self.log.debug("Report change %s, params %s, message: %s" %
                        (change, params, message))
         changeid = '%s,%s' % (change.number, change.patchset)
-        change._ref_sha = self.trigger.getRefSha(change.project.name,
-                                                 'refs/heads/' + change.branch)
-        return self.gerrit.review(change.project.name, changeid, message,
-                                  params)
+        change._ref_sha = source.getRefSha(change.project.name,
+                                           'refs/heads/' + change.branch)
+        return self.default_gerrit.review(
+            change.project.name, changeid, message, params)
 
     def getSubmitAllowNeeds(self, params):
         """Get a list of code review labels that are allowed to be
diff --git a/zuul/reporter/smtp.py b/zuul/reporter/smtp.py
index b214019..0992d66 100644
--- a/zuul/reporter/smtp.py
+++ b/zuul/reporter/smtp.py
@@ -35,7 +35,7 @@
         self.smtp_default_from = smtp_default_from
         self.smtp_default_to = smtp_default_to
 
-    def report(self, change, message, params):
+    def report(self, source, change, message, params):
         """Send the compiled report message via smtp."""
         self.log.debug("Report change %s, params %s, message: %s" %
                        (change, params, message))
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2355801..5567f40 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -189,6 +189,7 @@
         self._stopped = False
         self.launcher = None
         self.merger = None
+        self.sources = dict()
         self.triggers = dict()
         self.reporters = dict()
         self.config = None
@@ -265,8 +266,8 @@
             pipeline = Pipeline(conf_pipeline['name'])
             pipeline.description = conf_pipeline.get('description')
             # TODO(jeblair): remove backwards compatibility:
-            pipeline.source = self.triggers[conf_pipeline.get('source',
-                                                              'gerrit')]
+            pipeline.source = self.sources[conf_pipeline.get('source',
+                                                             'gerrit')]
             precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
             pipeline.precedence = precedence
             pipeline.failure_message = conf_pipeline.get('failure-message',
@@ -513,6 +514,11 @@
     def setMerger(self, merger):
         self.merger = merger
 
+    def registerSource(self, source, name=None):
+        if name is None:
+            name = source.name
+        self.sources[name] = source
+
     def registerTrigger(self, trigger, name=None):
         if name is None:
             name = trigger.name
@@ -756,6 +762,8 @@
             self.maintainTriggerCache()
             for trigger in self.triggers.values():
                 trigger.postConfig()
+            for source in self.sources.values():
+                source.postConfig()
             if statsd:
                 try:
                     for pipeline in self.layout.pipelines.values():
@@ -887,8 +895,8 @@
                 relevant.update(item.change.getRelatedChanges())
             self.log.debug("End maintain trigger cache for: %s" % pipeline)
         self.log.debug("Trigger cache size: %s" % len(relevant))
-        for trigger in self.triggers.values():
-            trigger.maintainCache(relevant)
+        for source in self.sources.values():
+            source.maintainCache(relevant)
 
     def process_event_queue(self):
         self.log.debug("Fetching trigger event")
@@ -1131,14 +1139,14 @@
                 if self.sched.config.has_option('zuul', 'status_url'):
                     msg += "\n" + self.sched.config.get('zuul', 'status_url')
                 ret = self.sendReport(self.pipeline.start_actions,
-                                      change, msg)
+                                      self.pipeline.source, change, msg)
                 if ret:
                     self.log.error("Reporting change start %s received: %s" %
                                    (change, ret))
             except:
                 self.log.exception("Exception while reporting start:")
 
-    def sendReport(self, action_reporters, change, message):
+    def sendReport(self, action_reporters, source, change, message):
         """Sends the built message off to configured reporters.
 
         Takes the action_reporters, change, message and extra options and
@@ -1147,7 +1155,7 @@
         report_errors = []
         if len(action_reporters) > 0:
             for action_reporter in action_reporters:
-                ret = action_reporter.report(change, message)
+                ret = action_reporter.report(source, change, message)
                 if ret:
                     report_errors.append(ret)
             if len(report_errors) == 0:
@@ -1604,7 +1612,8 @@
             try:
                 self.log.info("Reporting change %s, actions: %s" %
                               (item.change, actions))
-                ret = self.sendReport(actions, item.change, report)
+                ret = self.sendReport(actions, self.pipeline.source,
+                                      item.change, report)
                 if ret:
                     self.log.error("Reporting change %s received: %s" %
                                    (item.change, ret))
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/source/__init__.py
diff --git a/zuul/source/gerrit.py b/zuul/source/gerrit.py
new file mode 100644
index 0000000..44b8609
--- /dev/null
+++ b/zuul/source/gerrit.py
@@ -0,0 +1,408 @@
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import re
+import time
+import urllib2
+from zuul.lib import gerrit
+from zuul.model import Change, Ref, NullChange
+
+
+class Gerrit(object):
+    name = 'gerrit'
+    log = logging.getLogger("zuul.source.Gerrit")
+    replication_timeout = 300
+    replication_retry_interval = 5
+
+    depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
+                               re.MULTILINE | re.IGNORECASE)
+
+    def __init__(self, config, sched):
+        self._change_cache = {}
+        self.sched = sched
+        self.config = config
+        self.server = config.get('gerrit', 'server')
+        if config.has_option('gerrit', 'baseurl'):
+            self.baseurl = config.get('gerrit', 'baseurl')
+        else:
+            self.baseurl = 'https://%s' % self.server
+        user = config.get('gerrit', 'user')
+        if config.has_option('gerrit', 'sshkey'):
+            sshkey = config.get('gerrit', 'sshkey')
+        else:
+            sshkey = None
+        if config.has_option('gerrit', 'port'):
+            port = int(config.get('gerrit', 'port'))
+        else:
+            port = 29418
+        self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
+        self.gerrit.startWatching()
+
+    def _getInfoRefs(self, project):
+        url = "%s/p/%s/info/refs?service=git-upload-pack" % (
+            self.baseurl, project)
+        try:
+            data = urllib2.urlopen(url).read()
+        except:
+            self.log.error("Cannot get references from %s" % url)
+            raise  # keeps urllib2 error informations
+        ret = {}
+        read_headers = False
+        read_advertisement = False
+        if data[4] != '#':
+            raise Exception("Gerrit repository does not support "
+                            "git-upload-pack")
+        i = 0
+        while i < len(data):
+            if len(data) - i < 4:
+                raise Exception("Invalid length in info/refs")
+            plen = int(data[i:i + 4], 16)
+            i += 4
+            # It's the length of the packet, including the 4 bytes of the
+            # length itself, unless it's null, in which case the length is
+            # not included.
+            if plen > 0:
+                plen -= 4
+            if len(data) - i < plen:
+                raise Exception("Invalid data in info/refs")
+            line = data[i:i + plen]
+            i += plen
+            if not read_headers:
+                if plen == 0:
+                    read_headers = True
+                continue
+            if not read_advertisement:
+                read_advertisement = True
+                continue
+            if plen == 0:
+                # The terminating null
+                continue
+            line = line.strip()
+            revision, ref = line.split()
+            ret[ref] = revision
+        return ret
+
+    def getRefSha(self, project, ref):
+        refs = {}
+        try:
+            refs = self._getInfoRefs(project)
+        except:
+            self.log.exception("Exception looking for ref %s" %
+                               ref)
+        sha = refs.get(ref, '')
+        return sha
+
+    def waitForRefSha(self, project, ref, old_sha=''):
+        # Wait for the ref to show up in the repo
+        start = time.time()
+        while time.time() - start < self.replication_timeout:
+            sha = self.getRefSha(project.name, ref)
+            if old_sha != sha:
+                return True
+            time.sleep(self.replication_retry_interval)
+        return False
+
+    def isMerged(self, change, head=None):
+        self.log.debug("Checking if change %s is merged" % change)
+        if not change.number:
+            self.log.debug("Change has no number; considering it merged")
+            # Good question.  It's probably ref-updated, which, ah,
+            # means it's merged.
+            return True
+
+        data = self.gerrit.query(change.number)
+        change._data = data
+        change.is_merged = self._isMerged(change)
+        if not head:
+            return change.is_merged
+        if not change.is_merged:
+            return False
+
+        ref = 'refs/heads/' + change.branch
+        self.log.debug("Waiting for %s to appear in git repo" % (change))
+        if self.waitForRefSha(change.project, ref, change._ref_sha):
+            self.log.debug("Change %s is in the git repo" %
+                           (change))
+            return True
+        self.log.debug("Change %s did not appear in the git repo" %
+                       (change))
+        return False
+
+    def _isMerged(self, change):
+        data = change._data
+        if not data:
+            return False
+        status = data.get('status')
+        if not status:
+            return False
+        self.log.debug("Change %s status: %s" % (change, status))
+        if status == 'MERGED':
+            return True
+        return False
+
+    def canMerge(self, change, allow_needs):
+        if not change.number:
+            self.log.debug("Change has no number; considering it merged")
+            # Good question.  It's probably ref-updated, which, ah,
+            # means it's merged.
+            return True
+        data = change._data
+        if not data:
+            return False
+        if 'submitRecords' not in data:
+            return False
+        try:
+            for sr in data['submitRecords']:
+                if sr['status'] == 'OK':
+                    return True
+                elif sr['status'] == 'NOT_READY':
+                    for label in sr['labels']:
+                        if label['status'] in ['OK', 'MAY']:
+                            continue
+                        elif label['status'] in ['NEED', 'REJECT']:
+                            # It may be our own rejection, so we ignore
+                            if label['label'].lower() not in allow_needs:
+                                return False
+                            continue
+                        else:
+                            # IMPOSSIBLE
+                            return False
+                else:
+                    # CLOSED, RULE_ERROR
+                    return False
+        except:
+            self.log.exception("Exception determining whether change"
+                               "%s can merge:" % change)
+            return False
+        return True
+
+    def maintainCache(self, relevant):
+        # This lets the user supply a list of change objects that are
+        # still in use.  Anything in our cache that isn't in the supplied
+        # list should be safe to remove from the cache.
+        remove = []
+        for key, change in self._change_cache.items():
+            if change not in relevant:
+                remove.append(key)
+        for key in remove:
+            del self._change_cache[key]
+
+    def postConfig(self):
+        pass
+
+    def getChange(self, event, project):
+        if event.change_number:
+            change = self._getChange(event.change_number, event.patch_number)
+        elif event.ref:
+            change = Ref(project)
+            change.ref = event.ref
+            change.oldrev = event.oldrev
+            change.newrev = event.newrev
+            change.url = self.getGitwebUrl(project, sha=event.newrev)
+        else:
+            change = NullChange(project)
+        return change
+
+    def _getChange(self, number, patchset, refresh=False, history=None):
+        key = '%s,%s' % (number, patchset)
+        change = None
+        if key in self._change_cache:
+            change = self._change_cache.get(key)
+            if not refresh:
+                return change
+        if not change:
+            change = Change(None)
+            change.number = number
+            change.patchset = patchset
+        key = '%s,%s' % (change.number, change.patchset)
+        self._change_cache[key] = change
+        try:
+            self.updateChange(change, history)
+        except Exception:
+            del self._change_cache[key]
+            raise
+        return change
+
+    def getProjectOpenChanges(self, project):
+        # This is a best-effort function in case Gerrit is unable to return
+        # a particular change.  It happens.
+        query = "project:%s status:open" % (project.name,)
+        self.log.debug("Running query %s to get project open changes" %
+                       (query,))
+        data = self.gerrit.simpleQuery(query)
+        changes = []
+        for record in data:
+            try:
+                changes.append(
+                    self._getChange(record['number'],
+                                    record['currentPatchSet']['number']))
+            except Exception:
+                self.log.exception("Unable to query change %s" %
+                                   (record.get('number'),))
+        return changes
+
+    def _getDependsOnFromCommit(self, message):
+        records = []
+        seen = set()
+        for match in self.depends_on_re.findall(message):
+            if match in seen:
+                self.log.debug("Ignoring duplicate Depends-On: %s" %
+                               (match,))
+                continue
+            seen.add(match)
+            query = "change:%s" % (match,)
+            self.log.debug("Running query %s to find needed changes" %
+                           (query,))
+            records.extend(self.gerrit.simpleQuery(query))
+        return records
+
+    def _getNeededByFromCommit(self, change_id):
+        records = []
+        seen = set()
+        query = 'message:%s' % change_id
+        self.log.debug("Running query %s to find changes needed-by" %
+                       (query,))
+        results = self.gerrit.simpleQuery(query)
+        for result in results:
+            for match in self.depends_on_re.findall(
+                result['commitMessage']):
+                if match != change_id:
+                    continue
+                key = (result['number'], result['currentPatchSet']['number'])
+                if key in seen:
+                    continue
+                self.log.debug("Found change %s,%s needs %s from commit" %
+                               (key[0], key[1], change_id))
+                seen.add(key)
+                records.append(result)
+        return records
+
+    def updateChange(self, change, history=None):
+        self.log.info("Updating information for %s,%s" %
+                      (change.number, change.patchset))
+        data = self.gerrit.query(change.number)
+        change._data = data
+
+        if change.patchset is None:
+            change.patchset = data['currentPatchSet']['number']
+
+        if 'project' not in data:
+            raise Exception("Change %s,%s not found" % (change.number,
+                                                        change.patchset))
+        # If updated changed came as a dependent on
+        # and its project is not defined,
+        # then create a 'foreign' project for it in layout
+        change.project = self.sched.getProject(data['project'],
+                                               create_foreign=bool(history))
+        change.branch = data['branch']
+        change.url = data['url']
+        max_ps = 0
+        files = []
+        for ps in data['patchSets']:
+            if ps['number'] == change.patchset:
+                change.refspec = ps['ref']
+                for f in ps.get('files', []):
+                    files.append(f['file'])
+            if int(ps['number']) > int(max_ps):
+                max_ps = ps['number']
+        if max_ps == change.patchset:
+            change.is_current_patchset = True
+        else:
+            change.is_current_patchset = False
+        change.files = files
+
+        change.is_merged = self._isMerged(change)
+        change.approvals = data['currentPatchSet'].get('approvals', [])
+        change.open = data['open']
+        change.status = data['status']
+        change.owner = data['owner']
+
+        if change.is_merged:
+            # This change is merged, so we don't need to look any further
+            # for dependencies.
+            return change
+
+        if history is None:
+            history = []
+        else:
+            history = history[:]
+        history.append(change.number)
+
+        needs_changes = []
+        if 'dependsOn' in data:
+            parts = data['dependsOn'][0]['ref'].split('/')
+            dep_num, dep_ps = parts[3], parts[4]
+            if dep_num in history:
+                raise Exception("Dependency cycle detected: %s in %s" % (
+                    dep_num, history))
+            self.log.debug("Getting git-dependent change %s,%s" %
+                           (dep_num, dep_ps))
+            dep = self._getChange(dep_num, dep_ps, history=history)
+            if (not dep.is_merged) and dep not in needs_changes:
+                needs_changes.append(dep)
+
+        for record in self._getDependsOnFromCommit(data['commitMessage']):
+            dep_num = record['number']
+            dep_ps = record['currentPatchSet']['number']
+            if dep_num in history:
+                raise Exception("Dependency cycle detected: %s in %s" % (
+                    dep_num, history))
+            self.log.debug("Getting commit-dependent change %s,%s" %
+                           (dep_num, dep_ps))
+            dep = self._getChange(dep_num, dep_ps, history=history)
+            if (not dep.is_merged) and dep not in needs_changes:
+                needs_changes.append(dep)
+        change.needs_changes = needs_changes
+
+        needed_by_changes = []
+        if 'neededBy' in data:
+            for needed in data['neededBy']:
+                parts = needed['ref'].split('/')
+                dep_num, dep_ps = parts[3], parts[4]
+                dep = self._getChange(dep_num, dep_ps)
+                if (not dep.is_merged) and dep.is_current_patchset:
+                    needed_by_changes.append(dep)
+
+        for record in self._getNeededByFromCommit(data['id']):
+            dep_num = record['number']
+            dep_ps = record['currentPatchSet']['number']
+            self.log.debug("Getting commit-needed change %s,%s" %
+                           (dep_num, dep_ps))
+            # Because a commit needed-by may be a cross-repo
+            # dependency, cause that change to refresh so that it will
+            # reference the latest patchset of its Depends-On (this
+            # change).
+            dep = self._getChange(dep_num, dep_ps, refresh=True)
+            if (not dep.is_merged) and dep.is_current_patchset:
+                needed_by_changes.append(dep)
+        change.needed_by_changes = needed_by_changes
+
+        return change
+
+    def getGitUrl(self, project):
+        server = self.config.get('gerrit', 'server')
+        user = self.config.get('gerrit', 'user')
+        if self.config.has_option('gerrit', 'port'):
+            port = int(self.config.get('gerrit', 'port'))
+        else:
+            port = 29418
+        url = 'ssh://%s@%s:%s/%s' % (user, server, port, project.name)
+        return url
+
+    def getGitwebUrl(self, project, sha=None):
+        url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
+        if sha:
+            url += ';a=commitdiff;h=' + sha
+        return url
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 05d7581..428f67a 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -13,13 +13,10 @@
 # under the License.
 
 import logging
-import re
 import threading
 import time
-import urllib2
 import voluptuous
-from zuul.lib import gerrit
-from zuul.model import TriggerEvent, Change, Ref, NullChange
+from zuul.model import TriggerEvent
 
 
 class GerritEventConnector(threading.Thread):
@@ -28,12 +25,13 @@
     log = logging.getLogger("zuul.GerritEventConnector")
     delay = 5.0
 
-    def __init__(self, gerrit, sched, trigger):
+    def __init__(self, gerrit, sched, trigger, source):
         super(GerritEventConnector, self).__init__()
         self.daemon = True
         self.gerrit = gerrit
         self.sched = sched
         self.trigger = trigger
+        self.source = source
         self._stopped = False
 
     def stop(self):
@@ -98,9 +96,9 @@
             # Call _getChange for the side effect of updating the
             # cache.  Note that this modifies Change objects outside
             # the main thread.
-            self.trigger._getChange(event.change_number,
-                                    event.patch_number,
-                                    refresh=True)
+            self.source._getChange(event.change_number,
+                                   event.patch_number,
+                                   refresh=True)
 
         self.sched.addEvent(event)
 
@@ -118,398 +116,28 @@
 
 class Gerrit(object):
     name = 'gerrit'
-    log = logging.getLogger("zuul.Gerrit")
-    replication_timeout = 300
-    replication_retry_interval = 5
+    log = logging.getLogger("zuul.trigger.Gerrit")
 
-    depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
-                               re.MULTILINE | re.IGNORECASE)
-
-    def __init__(self, config, sched):
-        self._change_cache = {}
+    def __init__(self, gerrit, config, sched, source):
         self.sched = sched
+        # TODO(jhesketh): Make 'gerrit' come from a connection (rather than the
+        #                 source)
+        # TODO(jhesketh): Remove the requirement for a gerrit source (currently
+        #                 it is needed so on a trigger event the cache is
+        #                 updated. However if we share a connection object the
+        #                 cache could be stored there)
         self.config = config
-        self.server = config.get('gerrit', 'server')
-        if config.has_option('gerrit', 'baseurl'):
-            self.baseurl = config.get('gerrit', 'baseurl')
-        else:
-            self.baseurl = 'https://%s' % self.server
-        user = config.get('gerrit', 'user')
-        if config.has_option('gerrit', 'sshkey'):
-            sshkey = config.get('gerrit', 'sshkey')
-        else:
-            sshkey = None
-        if config.has_option('gerrit', 'port'):
-            port = int(config.get('gerrit', 'port'))
-        else:
-            port = 29418
-        self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
-        self.gerrit.startWatching()
-        self.gerrit_connector = GerritEventConnector(
-            self.gerrit, sched, self)
+        self.gerrit_connector = GerritEventConnector(gerrit, sched, self,
+                                                     source)
         self.gerrit_connector.start()
 
     def stop(self):
         self.gerrit_connector.stop()
         self.gerrit_connector.join()
 
-    def _getInfoRefs(self, project):
-        url = "%s/p/%s/info/refs?service=git-upload-pack" % (
-            self.baseurl, project)
-        try:
-            data = urllib2.urlopen(url).read()
-        except:
-            self.log.error("Cannot get references from %s" % url)
-            raise  # keeps urllib2 error informations
-        ret = {}
-        read_headers = False
-        read_advertisement = False
-        if data[4] != '#':
-            raise Exception("Gerrit repository does not support "
-                            "git-upload-pack")
-        i = 0
-        while i < len(data):
-            if len(data) - i < 4:
-                raise Exception("Invalid length in info/refs")
-            plen = int(data[i:i + 4], 16)
-            i += 4
-            # It's the length of the packet, including the 4 bytes of the
-            # length itself, unless it's null, in which case the length is
-            # not included.
-            if plen > 0:
-                plen -= 4
-            if len(data) - i < plen:
-                raise Exception("Invalid data in info/refs")
-            line = data[i:i + plen]
-            i += plen
-            if not read_headers:
-                if plen == 0:
-                    read_headers = True
-                continue
-            if not read_advertisement:
-                read_advertisement = True
-                continue
-            if plen == 0:
-                # The terminating null
-                continue
-            line = line.strip()
-            revision, ref = line.split()
-            ret[ref] = revision
-        return ret
-
-    def getRefSha(self, project, ref):
-        refs = {}
-        try:
-            refs = self._getInfoRefs(project)
-        except:
-            self.log.exception("Exception looking for ref %s" %
-                               ref)
-        sha = refs.get(ref, '')
-        return sha
-
-    def waitForRefSha(self, project, ref, old_sha=''):
-        # Wait for the ref to show up in the repo
-        start = time.time()
-        while time.time() - start < self.replication_timeout:
-            sha = self.getRefSha(project.name, ref)
-            if old_sha != sha:
-                return True
-            time.sleep(self.replication_retry_interval)
-        return False
-
-    def isMerged(self, change, head=None):
-        self.log.debug("Checking if change %s is merged" % change)
-        if not change.number:
-            self.log.debug("Change has no number; considering it merged")
-            # Good question.  It's probably ref-updated, which, ah,
-            # means it's merged.
-            return True
-
-        data = self.gerrit.query(change.number)
-        change._data = data
-        change.is_merged = self._isMerged(change)
-        if not head:
-            return change.is_merged
-        if not change.is_merged:
-            return False
-
-        ref = 'refs/heads/' + change.branch
-        self.log.debug("Waiting for %s to appear in git repo" % (change))
-        if self.waitForRefSha(change.project, ref, change._ref_sha):
-            self.log.debug("Change %s is in the git repo" %
-                           (change))
-            return True
-        self.log.debug("Change %s did not appear in the git repo" %
-                       (change))
-        return False
-
-    def _isMerged(self, change):
-        data = change._data
-        if not data:
-            return False
-        status = data.get('status')
-        if not status:
-            return False
-        self.log.debug("Change %s status: %s" % (change, status))
-        if status == 'MERGED':
-            return True
-        return False
-
-    def canMerge(self, change, allow_needs):
-        if not change.number:
-            self.log.debug("Change has no number; considering it merged")
-            # Good question.  It's probably ref-updated, which, ah,
-            # means it's merged.
-            return True
-        data = change._data
-        if not data:
-            return False
-        if 'submitRecords' not in data:
-            return False
-        try:
-            for sr in data['submitRecords']:
-                if sr['status'] == 'OK':
-                    return True
-                elif sr['status'] == 'NOT_READY':
-                    for label in sr['labels']:
-                        if label['status'] in ['OK', 'MAY']:
-                            continue
-                        elif label['status'] in ['NEED', 'REJECT']:
-                            # It may be our own rejection, so we ignore
-                            if label['label'].lower() not in allow_needs:
-                                return False
-                            continue
-                        else:
-                            # IMPOSSIBLE
-                            return False
-                else:
-                    # CLOSED, RULE_ERROR
-                    return False
-        except:
-            self.log.exception("Exception determining whether change"
-                               "%s can merge:" % change)
-            return False
-        return True
-
-    def maintainCache(self, relevant):
-        # This lets the user supply a list of change objects that are
-        # still in use.  Anything in our cache that isn't in the supplied
-        # list should be safe to remove from the cache.
-        remove = []
-        for key, change in self._change_cache.items():
-            if change not in relevant:
-                remove.append(key)
-        for key in remove:
-            del self._change_cache[key]
-
     def postConfig(self):
         pass
 
-    def getChange(self, event, project):
-        if event.change_number:
-            change = self._getChange(event.change_number, event.patch_number)
-        elif event.ref:
-            change = Ref(project)
-            change.ref = event.ref
-            change.oldrev = event.oldrev
-            change.newrev = event.newrev
-            change.url = self.getGitwebUrl(project, sha=event.newrev)
-        else:
-            change = NullChange(project)
-        return change
-
-    def _getChange(self, number, patchset, refresh=False, history=None):
-        key = '%s,%s' % (number, patchset)
-        change = None
-        if key in self._change_cache:
-            change = self._change_cache.get(key)
-            if not refresh:
-                return change
-        if not change:
-            change = Change(None)
-            change.number = number
-            change.patchset = patchset
-        key = '%s,%s' % (change.number, change.patchset)
-        self._change_cache[key] = change
-        try:
-            self.updateChange(change, history)
-        except Exception:
-            del self._change_cache[key]
-            raise
-        return change
-
-    def getProjectOpenChanges(self, project):
-        # This is a best-effort function in case Gerrit is unable to return
-        # a particular change.  It happens.
-        query = "project:%s status:open" % (project.name,)
-        self.log.debug("Running query %s to get project open changes" %
-                       (query,))
-        data = self.gerrit.simpleQuery(query)
-        changes = []
-        for record in data:
-            try:
-                changes.append(
-                    self._getChange(record['number'],
-                                    record['currentPatchSet']['number']))
-            except Exception:
-                self.log.exception("Unable to query change %s" %
-                                   (record.get('number'),))
-        return changes
-
-    def _getDependsOnFromCommit(self, message):
-        records = []
-        seen = set()
-        for match in self.depends_on_re.findall(message):
-            if match in seen:
-                self.log.debug("Ignoring duplicate Depends-On: %s" %
-                               (match,))
-                continue
-            seen.add(match)
-            query = "change:%s" % (match,)
-            self.log.debug("Running query %s to find needed changes" %
-                           (query,))
-            records.extend(self.gerrit.simpleQuery(query))
-        return records
-
-    def _getNeededByFromCommit(self, change_id):
-        records = []
-        seen = set()
-        query = 'message:%s' % change_id
-        self.log.debug("Running query %s to find changes needed-by" %
-                       (query,))
-        results = self.gerrit.simpleQuery(query)
-        for result in results:
-            for match in self.depends_on_re.findall(
-                result['commitMessage']):
-                if match != change_id:
-                    continue
-                key = (result['number'], result['currentPatchSet']['number'])
-                if key in seen:
-                    continue
-                self.log.debug("Found change %s,%s needs %s from commit" %
-                               (key[0], key[1], change_id))
-                seen.add(key)
-                records.append(result)
-        return records
-
-    def updateChange(self, change, history=None):
-        self.log.info("Updating information for %s,%s" %
-                      (change.number, change.patchset))
-        data = self.gerrit.query(change.number)
-        change._data = data
-
-        if change.patchset is None:
-            change.patchset = data['currentPatchSet']['number']
-
-        if 'project' not in data:
-            raise Exception("Change %s,%s not found" % (change.number,
-                                                        change.patchset))
-        # If updated changed came as a dependent on
-        # and its project is not defined,
-        # then create a 'foreign' project for it in layout
-        change.project = self.sched.getProject(data['project'],
-                                               create_foreign=bool(history))
-        change.branch = data['branch']
-        change.url = data['url']
-        max_ps = 0
-        files = []
-        for ps in data['patchSets']:
-            if ps['number'] == change.patchset:
-                change.refspec = ps['ref']
-                for f in ps.get('files', []):
-                    files.append(f['file'])
-            if int(ps['number']) > int(max_ps):
-                max_ps = ps['number']
-        if max_ps == change.patchset:
-            change.is_current_patchset = True
-        else:
-            change.is_current_patchset = False
-        change.files = files
-
-        change.is_merged = self._isMerged(change)
-        change.approvals = data['currentPatchSet'].get('approvals', [])
-        change.open = data['open']
-        change.status = data['status']
-        change.owner = data['owner']
-
-        if change.is_merged:
-            # This change is merged, so we don't need to look any further
-            # for dependencies.
-            return change
-
-        if history is None:
-            history = []
-        else:
-            history = history[:]
-        history.append(change.number)
-
-        needs_changes = []
-        if 'dependsOn' in data:
-            parts = data['dependsOn'][0]['ref'].split('/')
-            dep_num, dep_ps = parts[3], parts[4]
-            if dep_num in history:
-                raise Exception("Dependency cycle detected: %s in %s" % (
-                    dep_num, history))
-            self.log.debug("Getting git-dependent change %s,%s" %
-                           (dep_num, dep_ps))
-            dep = self._getChange(dep_num, dep_ps, history=history)
-            if (not dep.is_merged) and dep not in needs_changes:
-                needs_changes.append(dep)
-
-        for record in self._getDependsOnFromCommit(data['commitMessage']):
-            dep_num = record['number']
-            dep_ps = record['currentPatchSet']['number']
-            if dep_num in history:
-                raise Exception("Dependency cycle detected: %s in %s" % (
-                    dep_num, history))
-            self.log.debug("Getting commit-dependent change %s,%s" %
-                           (dep_num, dep_ps))
-            dep = self._getChange(dep_num, dep_ps, history=history)
-            if (not dep.is_merged) and dep not in needs_changes:
-                needs_changes.append(dep)
-        change.needs_changes = needs_changes
-
-        needed_by_changes = []
-        if 'neededBy' in data:
-            for needed in data['neededBy']:
-                parts = needed['ref'].split('/')
-                dep_num, dep_ps = parts[3], parts[4]
-                dep = self._getChange(dep_num, dep_ps)
-                if (not dep.is_merged) and dep.is_current_patchset:
-                    needed_by_changes.append(dep)
-
-        for record in self._getNeededByFromCommit(data['id']):
-            dep_num = record['number']
-            dep_ps = record['currentPatchSet']['number']
-            self.log.debug("Getting commit-needed change %s,%s" %
-                           (dep_num, dep_ps))
-            # Because a commit needed-by may be a cross-repo
-            # dependency, cause that change to refresh so that it will
-            # reference the latest patchset of its Depends-On (this
-            # change).
-            dep = self._getChange(dep_num, dep_ps, refresh=True)
-            if (not dep.is_merged) and dep.is_current_patchset:
-                needed_by_changes.append(dep)
-        change.needed_by_changes = needed_by_changes
-
-        return change
-
-    def getGitUrl(self, project):
-        server = self.config.get('gerrit', 'server')
-        user = self.config.get('gerrit', 'user')
-        if self.config.has_option('gerrit', 'port'):
-            port = int(self.config.get('gerrit', 'port'))
-        else:
-            port = 29418
-        url = 'ssh://%s@%s:%s/%s' % (user, server, port, project.name)
-        return url
-
-    def getGitwebUrl(self, project, sha=None):
-        url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
-        if sha:
-            url += ';a=commitdiff;h=' + sha
-        return url
-
 
 def validate_trigger(trigger_data):
     """Validates the layout's trigger data."""
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index 3d5cd9b..be22a70 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -41,17 +41,6 @@
     def stop(self):
         self.apsched.shutdown()
 
-    def isMerged(self, change, head=None):
-        raise Exception("Timer trigger does not support checking if "
-                        "a change is merged.")
-
-    def canMerge(self, change, allow_needs):
-        raise Exception("Timer trigger does not support checking if "
-                        "a change can merge.")
-
-    def maintainCache(self, relevant):
-        return
-
     def postConfig(self):
         for job in self.apsched.get_jobs():
             self.apsched.unschedule_job(job)
@@ -81,12 +70,3 @@
                                               second=second,
                                               args=(pipeline.name,
                                                     timespec,))
-
-    def getChange(self, event, project):
-        raise Exception("Timer trigger does not support changes.")
-
-    def getGitUrl(self, project):
-        raise Exception("Timer trigger does not support changes.")
-
-    def getGitwebUrl(self, project, sha=None):
-        raise Exception("Timer trigger does not support changes.")
diff --git a/zuul/trigger/zuultrigger.py b/zuul/trigger/zuultrigger.py
index 4418d6f..3cc4ff9 100644
--- a/zuul/trigger/zuultrigger.py
+++ b/zuul/trigger/zuultrigger.py
@@ -30,17 +30,6 @@
     def stop(self):
         pass
 
-    def isMerged(self, change, head=None):
-        raise Exception("Zuul trigger does not support checking if "
-                        "a change is merged.")
-
-    def canMerge(self, change, allow_needs):
-        raise Exception("Zuul trigger does not support checking if "
-                        "a change can merge.")
-
-    def maintainCache(self, relevant):
-        return
-
     def onChangeMerged(self, change):
         # Called each time zuul merges a change
         if self._handle_project_change_merged_events:
@@ -62,7 +51,7 @@
                     "%s in %s" % (change, pipeline))
 
     def _createProjectChangeMergedEvents(self, change):
-        changes = self.sched.triggers['gerrit'].getProjectOpenChanges(
+        changes = self.sched.sources['gerrit'].getProjectOpenChanges(
             change.project)
         for open_change in changes:
             self._createProjectChangeMergedEvent(open_change)
@@ -111,12 +100,3 @@
                     self._handle_parent_change_enqueued_events = True
                 elif 'project-change-merged' in ef._types:
                     self._handle_project_change_merged_events = True
-
-    def getChange(self, number, patchset, refresh=False):
-        raise Exception("Zuul trigger does not support changes.")
-
-    def getGitUrl(self, project):
-        raise Exception("Zuul trigger does not support changes.")
-
-    def getGitwebUrl(self, project, sha=None):
-        raise Exception("Zuul trigger does not support changes.")