Refactor sources out of triggers

This is to further differentiate between sources and triggers.
Eventually allowing for multiple triggers per pipeline.

Still to come is separating connections from everything.

Change-Id: I1d680dbed5f650165643842af450f16b32ec5ed9
diff --git a/tests/base.py b/tests/base.py
index 57b30b5..9316b43 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -52,6 +52,7 @@
 import zuul.merger.server
 import zuul.reporter.gerrit
 import zuul.reporter.smtp
+import zuul.source.gerrit
 import zuul.trigger.gerrit
 import zuul.trigger.timer
 import zuul.trigger.zuultrigger
@@ -382,12 +383,12 @@
     log = logging.getLogger("zuul.test.FakeGerrit")
 
     def __init__(self, hostname, username, port=29418, keyfile=None,
-                 changes_dbs={}):
+                 changes_dbs={}, queues_dbs={}):
         self.hostname = hostname
         self.username = username
         self.port = port
         self.keyfile = keyfile
-        self.event_queue = Queue.Queue()
+        self.event_queue = queues_dbs.get(hostname, {})
         self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
         self.change_number = 0
         self.changes = changes_dbs.get(hostname, {})
@@ -499,13 +500,14 @@
         return ret
 
 
-class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
+class FakeGerritSource(zuul.source.gerrit.Gerrit):
     name = 'gerrit'
 
     def __init__(self, upstream_root, *args):
-        super(FakeGerritTrigger, self).__init__(*args)
+        super(FakeGerritSource, self).__init__(*args)
         self.upstream_root = upstream_root
-        self.gerrit_connector.delay = 0.0
+        self.replication_timeout = 1.5
+        self.replication_retry_interval = 0.5
 
     def getGitUrl(self, project):
         return os.path.join(self.upstream_root, project.name)
@@ -958,11 +960,6 @@
         old_urlopen = urllib2.urlopen
         urllib2.urlopen = URLOpenerFactory
 
-        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
-                                                      self.swift)
-        self.merge_client = zuul.merger.client.MergeClient(
-            self.config, self.sched)
-
         self.smtp_messages = []
 
         def FakeSMTPFactory(*args, **kw):
@@ -971,12 +968,16 @@
 
         # Set a changes database so multiple FakeGerrit's can report back to
         # a virtual canonical database given by the configured hostname
+        self.gerrit_queues_dbs = {
+            self.config.get('gerrit', 'server'): Queue.Queue()
+        }
         self.gerrit_changes_dbs = {
             self.config.get('gerrit', 'server'): {}
         }
 
         def FakeGerritFactory(*args, **kw):
             kw['changes_dbs'] = self.gerrit_changes_dbs
+            kw['queues_dbs'] = self.gerrit_queues_dbs
             return FakeGerrit(*args, **kw)
 
         self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
@@ -984,32 +985,23 @@
 
         self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
 
-        self.gerrit = FakeGerritTrigger(
-            self.upstream_root, self.config, self.sched)
-        self.gerrit.replication_timeout = 1.5
-        self.gerrit.replication_retry_interval = 0.5
-        self.fake_gerrit = self.gerrit.gerrit
-        self.fake_gerrit.upstream_root = self.upstream_root
-
-        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
-        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
+        self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
+                                                      self.swift)
+        self.merge_client = zuul.merger.client.MergeClient(
+            self.config, self.sched)
 
         self.sched.setLauncher(self.launcher)
         self.sched.setMerger(self.merge_client)
-        self.sched.registerTrigger(self.gerrit)
-        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
-        self.sched.registerTrigger(self.timer)
-        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
-                                                                self.sched)
-        self.sched.registerTrigger(self.zuultrigger)
 
-        self.sched.registerReporter(
-            zuul.reporter.gerrit.Reporter(self.gerrit))
-        self.smtp_reporter = zuul.reporter.smtp.Reporter(
-            self.config.get('smtp', 'default_from'),
-            self.config.get('smtp', 'default_to'),
-            self.config.get('smtp', 'server'))
-        self.sched.registerReporter(self.smtp_reporter)
+        self.register_sources()
+        self.fake_gerrit = self.gerrit_source.gerrit
+        self.fake_gerrit.upstream_root = self.upstream_root
+
+        self.register_triggers()
+        self.register_reporters()
+
+        self.webapp = zuul.webapp.WebApp(self.sched, port=0)
+        self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
 
         self.sched.start()
         self.sched.reconfigure(self.config)
@@ -1024,6 +1016,38 @@
         self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)
 
+    def register_sources(self):
+        # Register the available sources
+        self.gerrit_source = FakeGerritSource(
+            self.upstream_root, self.config, self.sched)
+        self.gerrit_source.replication_timeout = 1.5
+        self.gerrit_source.replication_retry_interval = 0.5
+
+        self.sched.registerSource(self.gerrit_source)
+
+    def register_triggers(self):
+        # Register the available triggers
+        self.gerrit_trigger = zuul.trigger.gerrit.Gerrit(
+            self.fake_gerrit, self.config, self.sched, self.gerrit_source)
+        self.gerrit_trigger.gerrit_connector.delay = 0.0
+
+        self.sched.registerTrigger(self.gerrit_trigger)
+        self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
+        self.sched.registerTrigger(self.timer)
+        self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
+                                                                self.sched)
+        self.sched.registerTrigger(self.zuultrigger)
+
+    def register_reporters(self):
+        # Register the available reporters
+        self.sched.registerReporter(
+            zuul.reporter.gerrit.Reporter(self.fake_gerrit))
+        self.smtp_reporter = zuul.reporter.smtp.Reporter(
+            self.config.get('smtp', 'default_from'),
+            self.config.get('smtp', 'default_to'),
+            self.config.get('smtp', 'server'))
+        self.sched.registerReporter(self.smtp_reporter)
+
     def setup_config(self):
         """Per test config object. Override to set different config."""
         self.config = ConfigParser.ConfigParser()
@@ -1050,7 +1074,7 @@
         self.merge_server.join()
         self.merge_client.stop()
         self.worker.shutdown()
-        self.gerrit.stop()
+        self.gerrit_trigger.stop()
         self.timer.stop()
         self.sched.stop()
         self.sched.join()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index a257440..d99a65c 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -729,8 +729,8 @@
         self.assertEqual(self.history[6].changes,
                          '1,1 2,1 3,1 4,1 5,1 6,1 7,1')
 
-    def test_trigger_cache(self):
-        "Test that the trigger cache operates correctly"
+    def test_source_cache(self):
+        "Test that the source cache operates correctly"
         self.worker.hold_jobs_in_build = True
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -762,9 +762,9 @@
         self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
         self.waitUntilSettled()
 
-        self.log.debug("len %s" % self.gerrit._change_cache.keys())
+        self.log.debug("len %s" % self.gerrit_source._change_cache.keys())
         # there should still be changes in the cache
-        self.assertNotEqual(len(self.gerrit._change_cache.keys()), 0)
+        self.assertNotEqual(len(self.gerrit_source._change_cache.keys()), 0)
 
         self.worker.hold_jobs_in_build = False
         self.worker.release()
@@ -1469,7 +1469,7 @@
         "Test that the merger works with large changes after a repack"
         # https://bugs.launchpad.net/zuul/+bug/1078946
         # This test assumes the repo is already cloned; make sure it is
-        url = self.sched.triggers['gerrit'].getGitUrl(
+        url = self.sched.sources['gerrit'].getGitUrl(
             self.sched.layout.projects['org/project1'])
         self.merge_server.merger.addProject('org/project1', url)
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@@ -2165,6 +2165,7 @@
     def test_test_config(self):
         "Test that we can test the config"
         sched = zuul.scheduler.Scheduler()
+        sched.registerSource(None, 'gerrit')
         sched.registerTrigger(None, 'gerrit')
         sched.registerTrigger(None, 'timer')
         sched.registerTrigger(None, 'zuul')