Split the merger into a separate process

Connect it to Zuul via Gearman.  Any number of mergers may be
deployed.

Directly find the pipeline for a build when processing a result,
so that the procedure is roughly the same for build and merge
results.

The timer trigger currently requires the gerrit trigger also be
configured.  Make that explicit inside of the timer trigger so
that the scheduler API interaction with triggers is cleaner.

Change-Id: I69498813764753c97c426e42d17596c2ef1d87cf
diff --git a/NEWS.rst b/NEWS.rst
index ba6c986..bd09bfe 100644
--- a/NEWS.rst
+++ b/NEWS.rst
@@ -3,6 +3,15 @@
 * The push_change_refs option which specified that Zuul refs should be
   pushed to Gerrit has been removed.
 
+* Git merge operations are now performed in a separate process.  Run
+  at least one instance of the ``zuul-merger`` program which is now
+  included.  Any number of Zuul-Mergers may be run in order to
+  distribute the work of speculatively merging changes into git and
+  serving the results to test workers.  This system is designed to
+  scale out to many servers, but one instance may be co-located with
+  the Zuul server in smaller deployments.  Several configuration
+  options have moved from the ``zuul`` section to ``merger``.
+
 Since 1.3.0:
 
 * The Jenkins launcher is replaced with Gearman launcher.  An internal
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 4b7b4b0..c5beda0 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -20,6 +20,7 @@
 
    gating
    triggers
+   merger
    launchers
    reporters
    zuul
diff --git a/doc/source/merger.rst b/doc/source/merger.rst
new file mode 100644
index 0000000..4c445c6
--- /dev/null
+++ b/doc/source/merger.rst
@@ -0,0 +1,63 @@
+:title: Merger
+
+Merger
+======
+
+The Zuul Merger is a separate component which communicates with the
+main Zuul server via Gearman.  Its purpose is to speculatively merge
+the changes for Zuul in preparation for testing.  The resulting git
+commits also must be served to the test workers, and the server(s)
+running the Zuul Merger are expected to do this as well.  Because both
+of these tasks are resource intensive, any number of Zuul Mergers can
+be run in parallel on distinct hosts.
+
+Configuration
+~~~~~~~~~~~~~
+
+The Zuul Merger can read the same zuul.conf file as the main Zuul
+server and requires the ``gearman``, ``gerrit``, ``merger``, and
+``zuul`` sections (indicated fields only).  Be sure the zuul_url is
+set appropriately on each host that runs a zuul-merger.
+
+Zuul References
+~~~~~~~~~~~~~~~
+
+As the DependentPipelineManager may combine several changes together
+for testing when performing speculative execution, determining exactly
+how the workspace should be set up when running a Job can be complex.
+To alleviate this problem, Zuul performs merges itself, merging or
+cherry-picking changes as required and identifies the result with a
+Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
+Preparing the workspace is then a simple matter of fetching that ref
+and checking it out.  The parameters that provide this information are
+described in :ref:`launchers`.
+
+These references need to be made available via a Git repository that
+is available to Jenkins.  This is accomplished by serving Zuul's Git
+repositories directly.
+
+Serving Zuul Git Repos
+~~~~~~~~~~~~~~~~~~~~~~
+
+Zuul maintains its own copies of any needed Git repositories in the
+directory specified by ``git_dir`` in the ``merger`` section of
+zuul.conf (by default, /var/lib/zuul/git).  To directly serve Zuul's
+Git repositories in order to provide Zuul refs for Jenkins, you can
+configure Apache to do so using the following directives::
+
+  SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
+  SetEnv GIT_HTTP_EXPORT_ALL
+
+  AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
+  AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
+  ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
+
+And set ``push_change_refs`` to ``false`` (the default) in the
+``zuul`` section of zuul.conf.
+
+Note that Zuul's Git repositories are not bare, which means they have
+a working tree, and are not suitable for public consumption (for
+instance, a clone will produce a repository in an unpredictable state
+depending on what the state of Zuul's repository is when the clone
+happens).  They are, however, suitable for automated systems that
+respond to Zuul triggers.
diff --git a/doc/source/triggers.rst b/doc/source/triggers.rst
index 287246c..c4485bf 100644
--- a/doc/source/triggers.rst
+++ b/doc/source/triggers.rst
@@ -35,49 +35,6 @@
 be added to Gerrit.  Zuul is very flexible and can take advantage of
 those.
 
-Zuul References
-~~~~~~~~~~~~~~~
-
-As the DependentPipelineManager may combine several changes together
-for testing when performing speculative execution, determining exactly
-how the workspace should be set up when running a Job can be complex.
-To alleviate this problem, Zuul performs merges itself, merging or
-cherry-picking changes as required and identifies the result with a
-Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
-Preparing the workspace is then a simple matter of fetching that ref
-and checking it out.  The parameters that provide this information are
-described in :ref:`launchers`.
-
-These references need to be made available via a Git repository that
-is available to Jenkins.  This is accomplished by serving Zuul's Git
-repositories directly.
-
-Serving Zuul Git Repos
-""""""""""""""""""""""
-
-Zuul maintains its own copies of any needed Git repositories in the
-directory specified by ``git_dir`` in the ``zuul`` section of
-zuul.conf (by default, /var/lib/zuul/git).  To directly serve Zuul's
-Git repositories in order to provide Zuul refs for Jenkins, you can
-configure Apache to do so using the following directives::
-
-  SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
-  SetEnv GIT_HTTP_EXPORT_ALL
-
-  AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
-  AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
-  ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
-
-And set ``push_change_refs`` to ``false`` (the default) in the
-``zuul`` section of zuul.conf.
-
-Note that Zuul's Git repositories are not bare, which means they have
-a working tree, and are not suitable for public consumption (for
-instance, a clone will produce a repository in an unpredictable state
-depending on what the state of Zuul's repository is when the clone
-happens).  They are, however, suitable for automated systems that
-respond to Zuul triggers.
-
 Timer
 -----
 
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index d71912c..ee70523 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -83,21 +83,49 @@
 """"
 
 **layout_config**
-  Path to layout config file.
+  Path to layout config file.  Used by zuul-server only.
   ``layout_config=/etc/zuul/layout.yaml``
 
 **log_config**
-  Path to log config file.
+  Path to log config file.  Used by all Zuul commands.
   ``log_config=/etc/zuul/logging.yaml``
 
 **pidfile**
-  Path to PID lock file.
+  Path to PID lock file.  Used by all Zuul commands.
   ``pidfile=/var/run/zuul/zuul.pid``
 
 **state_dir**
-  Path to directory that Zuul should save state to.
+  Path to directory that Zuul should save state to.  Used by all Zuul
+  commands.
   ``state_dir=/var/lib/zuul``
 
+**report_times**
+  Boolean value (``true`` or ``false``) that determines if Zuul should
+  include elapsed times for each job in the textual report.  Used by
+  zuul-server only.
+  ``report_times=true``
+
+**status_url**
+  URL that will be posted in Zuul comments made to Gerrit changes when
+  starting jobs for a change.  Used by zuul-server only.
+  ``status_url=https://zuul.example.com/status``
+
+**url_pattern**
+  If you are storing build logs external to the system that originally
+  ran jobs and wish to link to those logs when Zuul makes comments on
+  Gerrit changes for completed jobs this setting configures what the
+  URLs for those links should be.  Used by zuul-server only.
+  ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
+
+**job_name_in_report**
+  Boolean value (``true`` or ``false``) that indicates whether the
+  job name should be included in the report (normally only the URL
+  is included).  Defaults to ``false``.  Used by zuul-server only.
+  ``job_name_in_report=true``
+
+merger
+""""""
+
 **git_dir**
   Directory that Zuul should clone local git repositories to.
   ``git_dir=/var/lib/zuul/git``
@@ -110,32 +138,10 @@
   Optional: Value to pass to `git config user.name`.
   ``git_user_name=zuul``
 
-**report_times**
-  Boolean value (``true`` or ``false``) that determines if Zuul should
-  include elapsed times for each job in the textual report.
-  ``report_times=true``
-
-**status_url**
-  URL that will be posted in Zuul comments made to Gerrit changes when
-  starting jobs for a change.
-  ``status_url=https://zuul.example.com/status``
-
-**url_pattern**
-  If you are storing build logs external to the system that originally
-  ran jobs and wish to link to those logs when Zuul makes comments on
-  Gerrit changes for completed jobs this setting configures what the
-  URLs for those links should be.
-  ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
-
-**job_name_in_report**
-  Boolean value (``true`` or ``false``) that indicates whether the
-  job name should be included in the report (normally only the URL
-  is included).  Defaults to ``false``.
-  ``job_name_in_report=true``
-
 **zuul_url**
-  URL of Zuul's git repos, accessible to test workers.  
-  Usually "http://zuul.example.com/p".
+  URL of this merger's git repos, accessible to test workers.  Usually
+  "http://zuul.example.com/p" or "http://zuul-merger01.example.com/p"
+  depending on whether the merger is co-located with the Zuul server.
 
 smtp
 """"
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index a4d1390..75c84e4 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -15,10 +15,12 @@
 log_config=/etc/zuul/logging.conf
 pidfile=/var/run/zuul/zuul.pid
 state_dir=/var/lib/zuul
+status_url=https://jenkins.example.com/zuul/status
+
+[merger]
 git_dir=/var/lib/zuul/git
 ;git_user_email=zuul@example.com
 ;git_user_name=zuul
-status_url=https://jenkins.example.com/zuul/status
 zuul_url=http://zuul.example.com/p
 
 [smtp]
diff --git a/setup.cfg b/setup.cfg
index 9ff62d6..21b1199 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,6 +22,7 @@
 [entry_points]
 console_scripts =
     zuul-server = zuul.cmd.server:main
+    zuul-merger = zuul.cmd.merger:main
     zuul = zuul.cmd.client:main
 
 [build_sphinx]
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index f77e07e..bee06e4 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -8,11 +8,13 @@
 
 [zuul]
 layout_config=layout.yaml
+url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
+job_name_in_report=true
+
+[merger]
 git_dir=/tmp/zuul-test/git
 git_user_email=zuul@example.com
 git_user_name=zuul
-url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
-job_name_in_report=true
 zuul_url=http://zuul.example.com/p
 
 [smtp]
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 67c4709..b2106f8 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -47,6 +47,8 @@
 import zuul.rpclistener
 import zuul.rpcclient
 import zuul.launcher.gearman
+import zuul.merger.server
+import zuul.merger.client
 import zuul.reporter.gerrit
 import zuul.reporter.smtp
 import zuul.trigger.gerrit
@@ -764,7 +766,7 @@
         self.upstream_root = os.path.join(self.test_root, "upstream")
         self.git_root = os.path.join(self.test_root, "git")
 
-        CONFIG.set('zuul', 'git_dir', self.git_root)
+        CONFIG.set('merger', 'git_dir', self.git_root)
         if os.path.exists(self.test_root):
             shutil.rmtree(self.test_root)
         os.makedirs(self.test_root)
@@ -804,6 +806,9 @@
         self.worker.addServer('127.0.0.1', self.gearman_server.port)
         self.gearman_server.worker = self.worker
 
+        self.merge_server = zuul.merger.server.MergeServer(self.config)
+        self.merge_server.start()
+
         self.sched = zuul.scheduler.Scheduler()
 
         def URLOpenerFactory(*args, **kw):
@@ -812,6 +817,8 @@
 
         urllib2.urlopen = URLOpenerFactory
         self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
+        self.merge_client = zuul.merger.client.MergeClient(
+            self.config, self.sched)
 
         self.smtp_messages = []
 
@@ -833,6 +840,7 @@
         self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
 
         self.sched.setLauncher(self.launcher)
+        self.sched.setMerger(self.merge_client)
         self.sched.registerTrigger(self.gerrit)
         self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
         self.sched.registerTrigger(self.timer)
@@ -873,6 +881,9 @@
     def shutdown(self):
         self.log.debug("Shutting down after tests")
         self.launcher.stop()
+        self.merge_server.stop()
+        self.merge_server.join()
+        self.merge_client.stop()
         self.worker.shutdown()
         self.gearman_server.shutdown()
         self.gerrit.stop()
@@ -991,13 +1002,15 @@
             done = True
             for connection in self.gearman_server.active_connections:
                 if (connection.functions and
-                    connection.client_id != 'Zuul RPC Listener'):
+                    connection.client_id not in ['Zuul RPC Listener',
+                                                 'Zuul Merger']):
                     done = False
             if done:
                 break
             time.sleep(0)
         self.gearman_server.functions = set()
         self.rpc.register()
+        self.merge_server.register()
 
     def haveAllBuildsReported(self):
         # See if Zuul is waiting on a meta job to complete
@@ -1087,6 +1100,7 @@
                 if (self.sched.trigger_event_queue.empty() and
                     self.sched.result_event_queue.empty() and
                     self.fake_gerrit.event_queue.empty() and
+                    not self.merge_client.build_sets and
                     self.areAllBuildsWaiting()):
                     self.sched.run_handler_lock.release()
                     self.worker.lock.release()
@@ -2357,7 +2371,7 @@
         # This test assumes the repo is already cloned; make sure it is
         url = self.sched.triggers['gerrit'].getGitUrl(
             self.sched.layout.projects['org/project1'])
-        self.sched.merger.addProject('org/project1', url)
+        self.merge_server.merger.addProject('org/project1', url)
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         A.addPatchset(large=True)
         path = os.path.join(self.upstream_root, "org/project1")
@@ -2479,7 +2493,7 @@
 
     def test_zuul_url_return(self):
         "Test if ZUUL_URL is returning when zuul_url is set in zuul.conf"
-        self.assertTrue(self.sched.config.has_option('zuul', 'zuul_url'))
+        self.assertTrue(self.sched.config.has_option('merger', 'zuul_url'))
         self.worker.hold_jobs_in_build = True
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
new file mode 100644
index 0000000..e9722cf
--- /dev/null
+++ b/zuul/cmd/merger.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import ConfigParser
+import daemon
+import extras
+
+# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
+# instead it depends on lockfile-0.9.1 which uses pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+import logging
+import logging.config
+import os
+import sys
+import signal
+import traceback
+
+# No zuul imports here because they pull in paramiko which must not be
+# imported until after the daemonization.
+# https://github.com/paramiko/paramiko/issues/59
+# Similar situation with gear and statsd.
+
+
+def stack_dump_handler(signum, frame):
+    signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+    log_str = ""
+    for thread_id, stack_frame in sys._current_frames().items():
+        log_str += "Thread: %s\n" % thread_id
+        log_str += "".join(traceback.format_stack(stack_frame))
+    log = logging.getLogger("zuul.stack_dump")
+    log.debug(log_str)
+    signal.signal(signal.SIGUSR2, stack_dump_handler)
+
+
+class Merger(object):
+    def __init__(self):
+        self.args = None
+        self.config = None
+
+    def parse_arguments(self):
+        parser = argparse.ArgumentParser(description='Zuul merge worker.')
+        parser.add_argument('-c', dest='config',
+                            help='specify the config file')
+        parser.add_argument('-d', dest='nodaemon', action='store_true',
+                            help='do not run as a daemon')
+        parser.add_argument('--version', dest='version', action='store_true',
+                            help='show zuul version')
+        self.args = parser.parse_args()
+
+    def read_config(self):
+        self.config = ConfigParser.ConfigParser()
+        if self.args.config:
+            locations = [self.args.config]
+        else:
+            locations = ['/etc/zuul/zuul.conf',
+                         '~/zuul.conf']
+        for fp in locations:
+            if os.path.exists(os.path.expanduser(fp)):
+                self.config.read(os.path.expanduser(fp))
+                return
+        raise Exception("Unable to locate config file in %s" % locations)
+
+    def setup_logging(self, section, parameter):
+        if self.config.has_option(section, parameter):
+            fp = os.path.expanduser(self.config.get(section, parameter))
+            if not os.path.exists(fp):
+                raise Exception("Unable to read logging config file at %s" %
+                                fp)
+            logging.config.fileConfig(fp)
+        else:
+            logging.basicConfig(level=logging.DEBUG)
+
+    def exit_handler(self, signum, frame):
+        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+        self.merger.stop()
+        self.merger.join()
+
+    def main(self):
+        # See comment at top of file about zuul imports
+        import zuul.merger.server
+
+        self.setup_logging('zuul', 'log_config')
+
+        self.merger = zuul.merger.server.MergeServer(self.config)
+        self.merger.start()
+
+        signal.signal(signal.SIGUSR1, self.exit_handler)
+        signal.signal(signal.SIGUSR2, stack_dump_handler)
+        while True:
+            try:
+                signal.pause()
+            except KeyboardInterrupt:
+                print "Ctrl + C: asking merger to exit nicely...\n"
+                self.exit_handler(signal.SIGINT, None)
+
+
+def main():
+    server = Merger()
+    server.parse_arguments()
+
+    if server.args.version:
+        from zuul.version import version_info as zuul_version_info
+        print "Zuul version: %s" % zuul_version_info.version_string()
+        sys.exit(0)
+
+    server.read_config()
+
+    if server.config.has_option('zuul', 'state_dir'):
+        state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
+    else:
+        state_dir = '/var/lib/zuul'
+    test_fn = os.path.join(state_dir, 'test')
+    try:
+        f = open(test_fn, 'w')
+        f.close()
+        os.unlink(test_fn)
+    except Exception:
+        print
+        print "Unable to write to state directory: %s" % state_dir
+        print
+        raise
+
+    if server.config.has_option('zuul', 'pidfile'):
+        pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
+    else:
+        pid_fn = '/var/run/zuul/merger.pid'
+    pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+    if server.args.nodaemon:
+        server.main()
+    else:
+        with daemon.DaemonContext(pidfile=pid):
+            server.main()
+
+
+if __name__ == "__main__":
+    sys.path.insert(0, '.')
+    main()
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 7901535..5d83959 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -172,6 +172,7 @@
         # See comment at top of file about zuul imports
         import zuul.scheduler
         import zuul.launcher.gearman
+        import zuul.merger.client
         import zuul.reporter.gerrit
         import zuul.reporter.smtp
         import zuul.trigger.gerrit
@@ -188,6 +189,7 @@
         self.sched = zuul.scheduler.Scheduler()
 
         gearman = zuul.launcher.gearman.Gearman(self.config, self.sched)
+        merger = zuul.merger.client.MergeClient(self.config, self.sched)
         gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
         timer = zuul.trigger.timer.Timer(self.config, self.sched)
         webapp = zuul.webapp.WebApp(self.sched)
@@ -205,6 +207,7 @@
         )
 
         self.sched.setLauncher(gearman)
+        self.sched.setMerger(merger)
         self.sched.registerTrigger(gerrit)
         self.sched.registerTrigger(timer)
         self.sched.registerReporter(gerrit_reporter)
@@ -243,21 +246,6 @@
             path = None
         sys.exit(server.test_config(path))
 
-    if server.config.has_option('zuul', 'state_dir'):
-        state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
-    else:
-        state_dir = '/var/lib/zuul'
-    test_fn = os.path.join(state_dir, 'test')
-    try:
-        f = open(test_fn, 'w')
-        f.close()
-        os.unlink(test_fn)
-    except:
-        print
-        print "Unable to write to state directory: %s" % state_dir
-        print
-        raise
-
     if server.config.has_option('zuul', 'pidfile'):
         pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
     else:
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 3500445..37fc743 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -155,7 +155,6 @@
         self.sched = sched
         self.builds = {}
         self.meta_jobs = {}  # A list of meta-jobs like stop or describe
-        self.zuul_server = config.get('zuul', 'zuul_url')
 
         server = config.get('gearman', 'server')
         if config.has_option('gearman', 'port'):
@@ -226,7 +225,7 @@
         params = dict(ZUUL_UUID=uuid,
                       ZUUL_PROJECT=item.change.project.name)
         params['ZUUL_PIPELINE'] = pipeline.name
-        params['ZUUL_URL'] = self.zuul_server
+        params['ZUUL_URL'] = item.current_build_set.zuul_url
         if hasattr(item.change, 'refspec'):
             changes_str = '^'.join(
                 ['%s:%s:%s' % (i.change.project.name, i.change.branch,
diff --git a/zuul/merger/__init__.py b/zuul/merger/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/merger/__init__.py
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
new file mode 100644
index 0000000..72fd4c5
--- /dev/null
+++ b/zuul/merger/client.py
@@ -0,0 +1,117 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+from uuid import uuid4
+
+import gear
+
+
+def getJobData(job):
+    if not len(job.data):
+        return {}
+    d = job.data[-1]
+    if not d:
+        return {}
+    return json.loads(d)
+
+
+class MergeGearmanClient(gear.Client):
+    def __init__(self, merge_client):
+        super(MergeGearmanClient, self).__init__()
+        self.__merge_client = merge_client
+
+    def handleWorkComplete(self, packet):
+        job = super(MergeGearmanClient, self).handleWorkComplete(packet)
+        self.__merge_client.onBuildCompleted(job)
+        return job
+
+    def handleWorkFail(self, packet):
+        job = super(MergeGearmanClient, self).handleWorkFail(packet)
+        self.__merge_client.onBuildCompleted(job)
+        return job
+
+    def handleWorkException(self, packet):
+        job = super(MergeGearmanClient, self).handleWorkException(packet)
+        self.__merge_client.onBuildCompleted(job)
+        return job
+
+    def handleDisconnect(self, job):
+        job = super(MergeGearmanClient, self).handleDisconnect(job)
+        self.__merge_client.onBuildCompleted(job)
+
+
+class MergeClient(object):
+    log = logging.getLogger("zuul.MergeClient")
+
+    def __init__(self, config, sched):
+        self.config = config
+        self.sched = sched
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.log.debug("Connecting to gearman at %s:%s" % (server, port))
+        self.gearman = MergeGearmanClient(self)
+        self.gearman.addServer(server, port)
+        self.log.debug("Waiting for gearman")
+        self.gearman.waitForServer()
+        self.build_sets = {}
+
+    def stop(self):
+        self.gearman.shutdown()
+
+    def areMergesOutstanding(self):
+        if self.build_sets:
+            return True
+        return False
+
+    def submitJob(self, name, data, build_set):
+        uuid = str(uuid4().hex)
+        self.log.debug("Submitting job %s with data %s" % (name, data))
+        job = gear.Job(name,
+                       json.dumps(data),
+                       unique=uuid)
+        self.build_sets[uuid] = build_set
+        self.gearman.submitJob(job)
+
+    def mergeChanges(self, items, build_set):
+        data = dict(items=items)
+        self.submitJob('merger:merge', data, build_set)
+
+    def updateRepo(self, project, url, build_set):
+        data = dict(project=project,
+                    url=url)
+        self.submitJob('merger:update', data, build_set)
+
+    def onBuildCompleted(self, job):
+        build_set = self.build_sets.get(job.unique)
+        if build_set:
+            data = getJobData(job)
+            zuul_url = data.get('zuul_url')
+            merged = data.get('merged', False)
+            updated = data.get('updated', False)
+            commit = data.get('commit')
+            self.log.info("Merge %s complete, merged: %s, updated: %s, "
+                          "commit: %s" %
+                          (job, merged, updated, build_set.commit))
+            self.sched.onMergeCompleted(build_set, zuul_url,
+                                        merged, updated, commit)
+            # The test suite expects the build_set to be removed from
+            # the internal dict after the wake flag is set.
+            del self.build_sets[job.unique]
+        else:
+            self.log.error("Unable to find build set for uuid %s" % job.unique)
diff --git a/zuul/merger.py b/zuul/merger/merger.py
similarity index 98%
rename from zuul/merger.py
rename to zuul/merger/merger.py
index 6efffdb..13dd122 100644
--- a/zuul/merger.py
+++ b/zuul/merger/merger.py
@@ -1,4 +1,5 @@
 # Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
@@ -201,9 +202,9 @@
         repo = self.getRepo(item['project'], item['url'])
         try:
             repo.checkout(ref)
-        except:
+        except Exception:
             self.log.exception("Unable to checkout %s" % ref)
-            return False
+            return None
 
         try:
             mode = item['merge_mode']
@@ -219,7 +220,7 @@
             # Log exceptions at debug level because they are
             # usually benign merge conflicts
             self.log.debug("Unable to merge %s" % item, exc_info=True)
-            return False
+            return None
 
         return commit
 
@@ -256,6 +257,8 @@
             self.log.debug("Found base commit %s for %s" % (base, key,))
         # Merge the change
         commit = self._mergeChange(item, base)
+        if not commit:
+            return None
         # Store this commit as the most recent for this project-branch
         recent[key] = commit
         # Set the Zuul ref for this item to point to the most recent
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
new file mode 100644
index 0000000..5d52041
--- /dev/null
+++ b/zuul/merger/server.py
@@ -0,0 +1,115 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import threading
+import traceback
+
+import gear
+
+import merger
+
+
+class MergeServer(object):
+    log = logging.getLogger("zuul.MergeServer")
+
+    def __init__(self, config):
+        self.config = config
+        self.zuul_url = config.get('merger', 'zuul_url')
+
+        if self.config.has_option('merger', 'git_dir'):
+            merge_root = self.config.get('merger', 'git_dir')
+        else:
+            merge_root = '/var/lib/zuul/git'
+
+        if self.config.has_option('merger', 'git_user_email'):
+            merge_email = self.config.get('merger', 'git_user_email')
+        else:
+            merge_email = None
+
+        if self.config.has_option('merger', 'git_user_name'):
+            merge_name = self.config.get('merger', 'git_user_name')
+        else:
+            merge_name = None
+
+        if self.config.has_option('gerrit', 'sshkey'):
+            sshkey = self.config.get('gerrit', 'sshkey')
+        else:
+            sshkey = None
+
+        self.merger = merger.Merger(merge_root, sshkey,
+                                    merge_email, merge_name)
+
+    def start(self):
+        self._running = True
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = gear.Worker('Zuul Merger')
+        self.worker.addServer(server, port)
+        self.thread = threading.Thread(target=self.run)
+        self.thread.daemon = True
+        self.thread.start()
+        self.worker.waitForServer()
+        self.register()
+
+    def register(self):
+        self.worker.registerFunction("merger:merge")
+        self.worker.registerFunction("merger:update")
+
+    def stop(self):
+        self.log.debug("Stopping")
+        self._running = False
+        self.worker.shutdown()
+        self.log.debug("Stopped")
+
+    def join(self):
+        self.thread.join()
+
+    def run(self):
+        self.log.debug("Starting merge listener")
+        while self._running:
+            try:
+                job = self.worker.getJob()
+                try:
+                    if job.name == 'merger:merge':
+                        self.merge(job)
+                    elif job.name == 'merger:update':
+                        self.update(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(traceback.format_exc())
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def merge(self, job):
+        args = json.loads(job.arguments)
+        commit = self.merger.mergeChanges(args['items'])
+        result = dict(merged=(commit is not None),
+                      commit=commit,
+                      zuul_url=self.zuul_url)
+        job.sendWorkComplete(json.dumps(result))
+
+    def update(self, job):
+        args = json.loads(job.arguments)
+        self.merger.updateRepo(args['project'], args['url'])
+        result = dict(updated=True,
+                      zuul_url=self.zuul_url)
+        job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/model.py b/zuul/model.py
index 5da9cef..2a52306 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -633,6 +633,11 @@
 
 
 class BuildSet(object):
+    # Merge states:
+    NEW = 1
+    PENDING = 2
+    COMPLETE = 3
+
     def __init__(self, item):
         self.item = item
         self.other_changes = []
@@ -642,9 +647,11 @@
         self.previous_build_set = None
         self.ref = None
         self.commit = None
+        self.zuul_url = None
         self.unable_to_merge = False
         self.unable_to_merge_message = None
         self.failing_reasons = []
+        self.merge_state = self.NEW
 
     def setConfiguration(self):
         # The change isn't enqueued until after it's created
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 805d334..eaa5eae 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,7 +30,6 @@
 import layoutvalidator
 import model
 from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
-import merger
 from zuul import version as zuul_version
 
 statsd = extras.try_import('statsd.statsd')
@@ -136,6 +135,24 @@
         self.build = build
 
 
+class MergeCompletedEvent(ResultEvent):
+    """A remote merge operation has completed
+
+    :arg BuildSet build_set: The build_set which is ready.
+    :arg str zuul_url: The URL of the Zuul Merger.
+    :arg bool merged: Whether the merge succeeded (changes with refs).
+    :arg bool updated: Whether the repo was updated (changes without refs).
+    :arg str commit: The SHA of the merged commit (changes with refs).
+    """
+
+    def __init__(self, build_set, zuul_url, merged, updated, commit):
+        self.build_set = build_set
+        self.zuul_url = zuul_url
+        self.merged = merged
+        self.updated = updated
+        self.commit = commit
+
+
 class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
 
@@ -149,6 +166,7 @@
         self._exit = False
         self._stopped = False
         self.launcher = None
+        self.merger = None
         self.triggers = dict()
         self.reporters = dict()
         self.config = None
@@ -379,36 +397,12 @@
 
         return layout
 
-    def _setupMerger(self):
-        if self.config.has_option('zuul', 'git_dir'):
-            merge_root = self.config.get('zuul', 'git_dir')
-        else:
-            merge_root = '/var/lib/zuul/git'
-
-        if self.config.has_option('zuul', 'git_user_email'):
-            merge_email = self.config.get('zuul', 'git_user_email')
-        else:
-            merge_email = None
-
-        if self.config.has_option('zuul', 'git_user_name'):
-            merge_name = self.config.get('zuul', 'git_user_name')
-        else:
-            merge_name = None
-
-        if self.config.has_option('gerrit', 'sshkey'):
-            sshkey = self.config.get('gerrit', 'sshkey')
-        else:
-            sshkey = None
-
-        # TODO: The merger should have an upstream repo independent of
-        # triggers, and then each trigger should provide a fetch
-        # location.
-        self.merger = merger.Merger(merge_root, sshkey,
-                                    merge_email, merge_name)
-
     def setLauncher(self, launcher):
         self.launcher = launcher
 
+    def setMerger(self, merger):
+        self.merger = merger
+
     def registerTrigger(self, trigger, name=None):
         if name is None:
             name = trigger.name
@@ -468,6 +462,14 @@
         self.wake_event.set()
         self.log.debug("Done adding complete event for build: %s" % build)
 
+    def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
+        self.log.debug("Adding merge complete event for build set: %s" %
+                       build_set)
+        event = MergeCompletedEvent(build_set, zuul_url,
+                                    merged, updated, commit)
+        self.result_event_queue.put(event)
+        self.wake_event.set()
+
     def reconfigure(self, config):
         self.log.debug("Prepare to reconfigure")
         event = ReconfigureEvent(config)
@@ -594,7 +596,6 @@
                 new_pipeline.manager.building_jobs = \
                     old_pipeline.manager.building_jobs
             self.layout = layout
-            self._setupMerger()
             for trigger in self.triggers.values():
                 trigger.postConfig()
             if statsd:
@@ -651,6 +652,8 @@
     def _areAllBuildsComplete(self):
         self.log.debug("Checking if all builds are complete")
         waiting = False
+        if self.merger.areMergesOutstanding():
+            waiting = True
         for pipeline in self.layout.pipelines.values():
             for build in pipeline.manager.building_jobs.keys():
                 self.log.debug("%s waiting on %s" % (pipeline.manager, build))
@@ -672,6 +675,7 @@
             self.wake_event.wait()
             self.wake_event.clear()
             if self._stopped:
+                self.log.debug("Run handler stopping")
                 return
             self.log.debug("Run handler awake")
             self.run_handler_lock.acquire()
@@ -728,17 +732,6 @@
                 self.log.warning("Project %s not found" % event.project_name)
                 return
 
-            # Preprocessing for ref-update events
-            if event.ref:
-                # Make sure the local git repo is up-to-date with the
-                # remote one.  We better have the new ref before
-                # enqueuing the changes.  This is done before
-                # enqueuing the changes to avoid calling an update per
-                # pipeline accepting the change.
-                self.log.info("Fetching references for %s" % project)
-                url = self.triggers['gerrit'].getGitUrl(project)
-                self.merger.updateRepo(project.name, url)
-
             for pipeline in self.layout.pipelines.values():
                 change = event.getChange(project,
                                          self.triggers.get(event.trigger_name))
@@ -776,24 +769,50 @@
                 self._doBuildStartedEvent(event)
             elif isinstance(event, BuildCompletedEvent):
                 self._doBuildCompletedEvent(event)
+            elif isinstance(event, MergeCompletedEvent):
+                self._doMergeCompletedEvent(event)
             else:
                 self.log.error("Unable to handle event %s" % event)
         finally:
             self.result_event_queue.task_done()
 
     def _doBuildStartedEvent(self, event):
-        for pipeline in self.layout.pipelines.values():
-            if pipeline.manager.onBuildStarted(event.build):
-                return
-        self.log.warning("Build %s not found by any queue manager" %
-                         (event.build))
+        build = event.build
+        if build.build_set is not build.build_set.item.current_build_set:
+            self.log.warning("Build %s is not in the current build set" %
+                             (build,))
+            return
+        pipeline = build.build_set.item.pipeline
+        if not pipeline:
+            self.log.warning("Build %s is not associated with a pipeline" %
+                             (build,))
+            return
+        pipeline.manager.onBuildStarted(event.build)
 
     def _doBuildCompletedEvent(self, event):
-        for pipeline in self.layout.pipelines.values():
-            if pipeline.manager.onBuildCompleted(event.build):
-                return
-        self.log.warning("Build %s not found by any queue manager" %
-                         (event.build))
+        build = event.build
+        if build.build_set is not build.build_set.item.current_build_set:
+            self.log.warning("Build %s is not in the current build set" %
+                             (build,))
+            return
+        pipeline = build.build_set.item.pipeline
+        if not pipeline:
+            self.log.warning("Build %s is not associated with a pipeline" %
+                             (build,))
+            return
+        pipeline.manager.onBuildCompleted(event.build)
+
+    def _doMergeCompletedEvent(self, event):
+        build_set = event.build_set
+        if build_set is not build_set.item.current_build_set:
+            self.log.warning("Build set %s is not current" % (build_set,))
+            return
+        pipeline = build_set.item.pipeline
+        if not pipeline:
+            self.log.warning("Build set %s is not associated with a pipeline" %
+                             (build_set,))
+            return
+        pipeline.manager.onMergeCompleted(event)
 
     def formatStatusHTML(self):
         ret = '<html><pre>'
@@ -1083,7 +1102,7 @@
         # Create a dictionary with all info about the item needed by
         # the merger.
         return dict(project=item.change.project.name,
-                    url=self.sched.triggers['gerrit'].getGitUrl(
+                    url=self.pipeline.trigger.getGitUrl(
                         item.change.project),
                     merge_mode=item.change.project.merge_mode,
                     refspec=item.change.refspec,
@@ -1092,26 +1111,28 @@
                     )
 
     def prepareRef(self, item):
-        # Returns False on success.
-        # Returns True if we were unable to prepare the ref.
-        ref = item.current_build_set.ref
+        # Returns True if the ref is ready, false otherwise
+        build_set = item.current_build_set
+        if build_set.merge_state == build_set.COMPLETE:
+            return True
+        if build_set.merge_state == build_set.PENDING:
+            return False
+        build_set.merge_state = build_set.PENDING
+        ref = build_set.ref
         if hasattr(item.change, 'refspec') and not ref:
             self.log.debug("Preparing ref for: %s" % item.change)
             item.current_build_set.setConfiguration()
-            ref = item.current_build_set.ref
             dependent_items = self.getDependentItems(item)
             dependent_items.reverse()
             all_items = dependent_items + [item]
             merger_items = map(self._makeMergerItem, all_items)
-            commit = self.sched.merger.mergeChanges(merger_items)
-            item.current_build_set.commit = commit
-            if not commit:
-                self.log.info("Unable to merge change %s" % item.change)
-                msg = ("This change was unable to be automatically merged "
-                       "with the current state of the repository. Please "
-                       "rebase your change and upload a new patchset.")
-                self.pipeline.setUnableToMerge(item, msg)
-                return True
+            self.sched.merger.mergeChanges(merger_items,
+                                           item.current_build_set)
+        else:
+            self.log.debug("Preparing update repo for: %s" % item.change)
+            url = self.pipeline.trigger.getGitUrl(item.change.project)
+            self.sched.merger.updateRepo(item.change.project.name,
+                                         url, build_set)
         return False
 
     def _launchJobs(self, item, jobs):
@@ -1164,7 +1185,7 @@
                 canceled = True
         return canceled
 
-    def _processOneItem(self, item, nnfi):
+    def _processOneItem(self, item, nnfi, ready_ahead):
         changed = False
         item_ahead = item.item_ahead
         change_queue = self.pipeline.getQueue(item.change.project)
@@ -1181,10 +1202,11 @@
                 self.reportItem(item)
             except MergeFailure:
                 pass
-            return (True, nnfi)
+            return (True, nnfi, ready_ahead)
         dep_item = self.getFailingDependentItem(item)
         actionable = change_queue.isActionable(item)
         item.active = actionable
+        ready = False
         if dep_item:
             failing_reasons.append('a needed change is failing')
             self.cancelJobs(item, prime=False)
@@ -1204,10 +1226,13 @@
                 changed = True
                 self.cancelJobs(item)
             if actionable:
-                self.prepareRef(item)
+                ready = self.prepareRef(item)
                 if item.current_build_set.unable_to_merge:
                     failing_reasons.append("it has a merge conflict")
-        if actionable and self.launchJobs(item):
+                    ready = False
+        if not ready:
+            ready_ahead = False
+        if actionable and ready_ahead and self.launchJobs(item):
             changed = True
         if self.pipeline.didAnyJobFail(item):
             failing_reasons.append("at least one job failed")
@@ -1229,7 +1254,7 @@
         if failing_reasons:
             self.log.debug("%s is a failing item because %s" %
                            (item, failing_reasons))
-        return (changed, nnfi)
+        return (changed, nnfi, ready_ahead)
 
     def processQueue(self):
         # Do whatever needs to be done for each change in the queue
@@ -1238,8 +1263,10 @@
         for queue in self.pipeline.queues:
             queue_changed = False
             nnfi = None  # Nearest non-failing item
+            ready_ahead = True  # All build sets ahead are ready
             for item in queue.queue[:]:
-                item_changed, nnfi = self._processOneItem(item, nnfi)
+                item_changed, nnfi, ready_ahhead = self._processOneItem(
+                    item, nnfi, ready_ahead)
                 if item_changed:
                     queue_changed = True
                 self.reportStats(item)
@@ -1294,6 +1321,22 @@
         self.updateBuildDescriptions(build.build_set)
         return True
 
+    def onMergeCompleted(self, event):
+        build_set = event.build_set
+        item = build_set.item
+        build_set.merge_state = build_set.COMPLETE
+        build_set.zuul_url = event.zuul_url
+        if event.merged:
+            build_set.commit = event.commit
+        elif event.updated:
+            build_set.commit = item.change.newrev
+        if not build_set.commit:
+            self.log.info("Unable to merge change %s" % item.change)
+            msg = ("This change was unable to be automatically merged "
+                   "with the current state of the repository. Please "
+                   "rebase your change and upload a new patchset.")
+            self.pipeline.setUnableToMerge(item, msg)
+
     def reportItem(self, item):
         if item.reported:
             raise Exception("Already reported change %s" % item.change)
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index f055a50..904fa7a 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -86,7 +86,8 @@
         raise Exception("Timer trigger does not support changes.")
 
     def getGitUrl(self, project):
-        pass
+        # For the moment, the timer trigger requires gerrit.
+        return self.sched.triggers['gerrit'].getGitUrl(project)
 
     def getGitwebUrl(self, project, sha=None):
         url = '%s/gitweb?p=%s.git' % (self.baseurl, project)