Merge "Update trigger script for new zuul url parameter"
diff --git a/NEWS.rst b/NEWS.rst
index 52f5c4f..bd09bfe 100644
--- a/NEWS.rst
+++ b/NEWS.rst
@@ -1,9 +1,16 @@
 Since 2.0.0:
 
 * The push_change_refs option which specified that Zuul refs should be
-  pushed to Gerrit has been removed.  Similar functionality may be
-  obtained using the replication feature.  See the Triggers
-  documentation for details.
+  pushed to Gerrit has been removed.
+
+* Git merge operations are now performed in a separate process.  Run
+  at least one instance of the ``zuul-merger`` program, which is now
+  included.  Any number of Zuul Mergers may be run in order to
+  distribute the work of speculatively merging changes into git and
+  serving the results to test workers.  This system is designed to
+  scale out to many servers, but one instance may be co-located with
+  the Zuul server in smaller deployments.  Several configuration
+  options have moved from the ``zuul`` section to ``merger``.
 
 Since 1.3.0:
 
diff --git a/doc/source/gating.rst b/doc/source/gating.rst
index f3f2d3c..43a5928 100644
--- a/doc/source/gating.rst
+++ b/doc/source/gating.rst
@@ -28,6 +28,9 @@
 Zuul was designed to handle the workflow of the OpenStack project, but
 can be used with any project.
 
+Testing in parallel
+-------------------
+
 A particular focus of Zuul is ensuring correctly ordered testing of
 changes in parallel.  A gating system should always test each change
 applied to the tip of the branch exactly as it is going to be merged.
@@ -208,3 +211,72 @@
     }
   }
 
+
+Cross-project dependencies
+--------------------------
+
+When your projects are closely coupled, you want to make sure that
+changes entering the gate are tested with the versions of the other
+projects currently enqueued in the gate (since those changes will
+eventually be merged and might introduce breaking changes).
+
+Such dependencies can be defined in the Zuul configuration by
+registering a job in a DependentPipeline shared by several projects.
+Whenever a change enters such a pipeline, Zuul creates references for
+the other projects as well.  As an example, given a main project
+``acme`` and a plugin ``plugin`` you can define a job ``acme-tests``
+which should be run for both projects:
+
+.. code-block:: yaml
+
+  pipelines:
+    - name: gate
+      manager: DependentPipelineManager
+
+  projects:
+    - name: acme
+      gate:
+       - acme-tests
+    - name: plugin
+      gate:
+       - acme-tests  # Register job again
+
+Whenever a change enters the ``gate`` pipeline queue, Zuul creates a reference
+for it.  For each subsequent change, an additional reference is created for the
+changes ahead in the queue.  As a result, you will always be able to fetch the
+future state of your project dependencies for each change in the queue.
+
+Based on the pipeline and project definitions above, three changes are
+inserted in the ``gate`` pipeline with the associated references:
+
+  ========  ======= ====== =========
+  Change    Project Branch Zuul Ref.
+  ========  ======= ====== =========
+  Change 1  acme    master master/Z1
+  Change 2  plugin  stable stable/Z2
+  Change 3  plugin  master master/Z3
+  ========  ======= ====== =========
+
+Since the changes enter a DependentPipelineManager pipeline, Zuul creates
+additional references:
+
+  ====== ======= ========= =============================
+  Change Project Zuul Ref. Description
+  ====== ======= ========= =============================
+  1      acme    master/Z1 acme master + change 1
+  ------ ------- --------- -----------------------------
+  2      acme    master/Z2 acme master + change 1
+  2      plugin  stable/Z2 plugin stable + change 2
+  ------ ------- --------- -----------------------------
+  3      acme    master/Z3 acme master + change 1
+  3      plugin  stable/Z3 plugin stable + change 2
+  3      plugin  master/Z3 plugin master + change 3
+  ====== ======= ========= =============================
+
+In order to test Change 3, you would clone both repositories and simply
+fetch the Z3 reference for each combination of project and branch you
+are interested in testing.  For example, you could fetch ``acme`` with
+master/Z3 and ``plugin`` with master/Z3, and thus have ``acme`` with
+Change 1 applied as the expected state for when Change 3 merges.  If a
+repository your job fetches has no changes ahead in the queue, it may
+not have a Z reference, in which case you can simply check out the
+branch.
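
The fetch-or-fall-back rule above can be sketched as follows.  This is
illustrative code, not part of Zuul; the ``ZUUL_REF`` value and the
``available_refs`` set are hypothetical stand-ins for what a job would
discover in each repository:

```python
# Illustrative sketch (not Zuul code): pick what to check out for each
# repository when preparing a workspace for a change under test.
def ref_to_checkout(zuul_ref, branch, available_refs):
    """Prefer the Zuul ref; if the repository had no change ahead in
    the queue, no Z ref exists, so fall back to the branch tip."""
    return zuul_ref if zuul_ref in available_refs else branch

# Hypothetical values: "acme" had a Z3 ref created; another repo did not.
ZUUL_REF = "refs/zuul/master/Z3"
print(ref_to_checkout(ZUUL_REF, "master", {ZUUL_REF}))  # -> refs/zuul/master/Z3
print(ref_to_checkout(ZUUL_REF, "master", set()))       # -> master
```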
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 4b7b4b0..c5beda0 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -20,6 +20,7 @@
 
    gating
    triggers
+   merger
    launchers
    reporters
    zuul
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c56d6e9..db49933 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -87,8 +87,8 @@
 **ZUUL_PIPELINE**
   The Zuul pipeline that is building this job
 **ZUUL_URL**
-  The url for the zuul server as configured in zuul.conf.  
-  A test runner may use this URL as the basis for fetching 
+  The URL for the zuul server as configured in zuul.conf.
+  A test runner may use this URL as the basis for fetching
   git commits.
 
 The following additional parameters will only be provided for builds
@@ -195,6 +195,30 @@
   The URL with the status or results of the build.  Will be used in
   the status page and the final report.
 
+To help with debugging builds, a worker may send back some optional
+metadata:
+
+**worker_name** (optional)
+  The name of the worker.
+
+**worker_hostname** (optional)
+  The hostname of the worker.
+
+**worker_ips** (optional)
+  A list of IPs for the worker.
+
+**worker_fqdn** (optional)
+  The FQDN of the worker.
+
+**worker_program** (optional)
+  The program name of the worker.  For example, Jenkins or turbo-hipster.
+
+**worker_version** (optional)
+  The version of the software running the job.
+
+**worker_extra** (optional)
+  A dictionary of any extra metadata you may want to pass along.
+
 It should then immediately send a WORK_STATUS packet with a value of 0
 percent complete.  It may then optionally send subsequent WORK_STATUS
 packets with updated completion values.
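
The optional metadata above could be assembled into a WORK_DATA payload
as in the following sketch.  The values are examples only, and the
``job.sendWorkData`` call mentioned in the comment is the gear library
worker-job method used elsewhere in this change:

```python
import json

# Illustrative payload only: every worker_* field is optional, and the
# values mirror the fields documented above.
work_data = {
    "worker_name": "My Worker",
    "worker_hostname": "localhost",
    "worker_ips": ["127.0.0.1", "192.168.1.1"],
    "worker_fqdn": "zuul.example.org",
    "worker_program": "turbo-hipster",
    "worker_version": "v1.1",
    "worker_extra": {"something": "else"},
}
payload = json.dumps(work_data)
# A gearman worker would include this in its WORK_DATA packet, e.g.
# job.sendWorkData(payload), then send WORK_STATUS at 0% complete.
```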
diff --git a/doc/source/merger.rst b/doc/source/merger.rst
new file mode 100644
index 0000000..4c445c6
--- /dev/null
+++ b/doc/source/merger.rst
@@ -0,0 +1,63 @@
+:title: Merger
+
+Merger
+======
+
+The Zuul Merger is a separate component which communicates with the
+main Zuul server via Gearman.  Its purpose is to speculatively merge
+the changes for Zuul in preparation for testing.  The resulting git
+commits must also be served to the test workers, and the server(s)
+running the Zuul Merger are expected to do this as well.  Because both
+of these tasks are resource intensive, any number of Zuul Mergers can
+be run in parallel on distinct hosts.
+
+Configuration
+~~~~~~~~~~~~~
+
+The Zuul Merger can read the same zuul.conf file as the main Zuul
+server and requires the ``gearman``, ``gerrit``, ``merger``, and
+``zuul`` sections (only the fields indicated for each).  Be sure that
+``zuul_url`` is set appropriately on each host that runs a
+zuul-merger.
+
+Zuul References
+~~~~~~~~~~~~~~~
+
+As the DependentPipelineManager may combine several changes together
+for testing when performing speculative execution, determining exactly
+how the workspace should be set up when running a Job can be complex.
+To alleviate this problem, Zuul performs merges itself, merging or
+cherry-picking changes as required, and identifies the result with a
+Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
+Preparing the workspace is then a simple matter of fetching that ref
+and checking it out.  The parameters that provide this information are
+described in :ref:`launchers`.
+
+These references need to be served from a Git repository that is
+accessible to Jenkins.  This is accomplished by serving Zuul's Git
+repositories directly.
+
+Serving Zuul Git Repos
+~~~~~~~~~~~~~~~~~~~~~~
+
+Zuul maintains its own copies of any needed Git repositories in the
+directory specified by ``git_dir`` in the ``merger`` section of
+zuul.conf (by default, /var/lib/zuul/git).  To directly serve Zuul's
+Git repositories in order to provide Zuul refs for Jenkins, you can
+configure Apache using the following directives::
+
+  SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
+  SetEnv GIT_HTTP_EXPORT_ALL
+
+  AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
+  AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
+  ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
+
+(The ``push_change_refs`` option, which formerly caused these refs to
+be pushed to Gerrit, has been removed.)
+
+Note that Zuul's Git repositories are not bare, which means they have
+a working tree, and are not suitable for public consumption (for
+instance, a clone will produce a repository in an unpredictable state,
+depending on the state of Zuul's repository at the time of the clone).
+They are, however, suitable for automated systems that respond to Zuul
+triggers.
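
The ref naming scheme described above,
``refs/zuul/<branch>/Z<random sha1>``, can be sketched as follows.
This is illustrative, not Zuul's actual implementation:

```python
import hashlib
import os

def make_zuul_ref(branch):
    # A Zuul ref embeds the branch name and a random sha1, giving each
    # queue item a unique, fetchable name under refs/zuul/.
    random_sha1 = hashlib.sha1(os.urandom(20)).hexdigest()
    return "refs/zuul/%s/Z%s" % (branch, random_sha1)

ref = make_zuul_ref("master")
# e.g. refs/zuul/master/Z<40 hex chars>
```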
diff --git a/doc/source/triggers.rst b/doc/source/triggers.rst
index 21aab13..c4485bf 100644
--- a/doc/source/triggers.rst
+++ b/doc/source/triggers.rst
@@ -35,79 +35,6 @@
 be added to Gerrit.  Zuul is very flexible and can take advantage of
 those.
 
-Zuul References
-~~~~~~~~~~~~~~~
-
-As the DependentPipelineManager may combine several changes together
-for testing when performing speculative execution, determining exactly
-how the workspace should be set up when running a Job can be complex.
-To alleviate this problem, Zuul performs merges itself, merging or
-cherry-picking changes as required and identifies the result with a
-Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
-Preparing the workspace is then a simple matter of fetching that ref
-and checking it out.  The parameters that provide this information are
-described in :ref:`launchers`.
-
-These references need to be made available via a Git repository that
-is available to Jenkins.  You may accomplish this by either serving
-Zuul's git repositories directly, allowing Zuul to push the references
-back to Gerrit, or pushing the references to a third location.
-Instructions for each of these alternatives are in the following
-sections.
-
-Serving Zuul Git Repos
-""""""""""""""""""""""
-
-Zuul maintains its own copies of any needed Git repositories in the
-directory specified by ``git_dir`` in the ``zuul`` section of
-zuul.conf (by default, /var/lib/zuul/git).  If you want to serve
-Zuul's Git repositories in order to provide Zuul refs for Jenkins, you
-can configure Apache to do so using the following directives::
-
-  SetEnv GIT_PROJECT_ROOT /var/lib/zuul/git
-  SetEnv GIT_HTTP_EXPORT_ALL
-
-  AliasMatch ^/p/(.*/objects/[0-9a-f]{2}/[0-9a-f]{38})$ /var/lib/zuul/git/$1
-  AliasMatch ^/p/(.*/objects/pack/pack-[0-9a-f]{40}.(pack|idx))$ /var/lib/zuul/git/$1
-  ScriptAlias /p/ /usr/lib/git-core/git-http-backend/
-
-And set ``push_change_refs`` to ``false`` (the default) in the
-``zuul`` section of zuul.conf.
-
-Note that Zuul's Git repositories are not bare, which means they have
-a working tree, and are not suitable for public consumption (for
-instance, a clone will produce a repository in an unpredictable state
-depending on what the state of Zuul's repository is when the clone
-happens).  They are, however, suitable for automated systems that
-respond to Zuul triggers.
-
-Pushing to Gerrit
-"""""""""""""""""
-
-If you want to push Zuul refs back to Gerrit, set the following
-permissions for your project (or ``All-Projects``) in Gerrit (where
-``CI Tools`` is a group of which the user you created above is a
-member)::
-
-    [access "refs/zuul/*"]
-            create = group CI Tools
-            push = +force CI Tools
-            pushMerge = group CI Tools
-            forgeAuthor = group CI Tools
-    [access "refs/for/refs/zuul/*"]
-            pushMerge = group CI Tools
-
-And set the following in ``zuul.conf``:
-
-  [replication]
-    url1=ssh://user@review.example.com:29418/
-
-Pushing to Another Location
-"""""""""""""""""""""""""""
-
-Simply set one or more destination URLs in the ``replication`` section
-of zuul.conf as above.
-
 Timer
 -----
 
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index e560a45..1a6a23d 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -83,21 +83,49 @@
 """"
 
 **layout_config**
-  Path to layout config file.
+  Path to layout config file.  Used by zuul-server only.
   ``layout_config=/etc/zuul/layout.yaml``
 
 **log_config**
-  Path to log config file.
+  Path to log config file.  Used by zuul-server only.
   ``log_config=/etc/zuul/logging.yaml``
 
 **pidfile**
-  Path to PID lock file.
+  Path to PID lock file.  Used by zuul-server only.
   ``pidfile=/var/run/zuul/zuul.pid``
 
 **state_dir**
-  Path to directory that Zuul should save state to.
+  Path to directory that Zuul should save state to.  Used by all Zuul
+  commands.
   ``state_dir=/var/lib/zuul``
 
+**report_times**
+  Boolean value (``true`` or ``false``) that determines if Zuul should
+  include elapsed times for each job in the textual report.  Used by
+  zuul-server only.
+  ``report_times=true``
+
+**status_url**
+  URL that will be posted in Zuul comments made to Gerrit changes when
+  starting jobs for a change.  Used by zuul-server only.
+  ``status_url=https://zuul.example.com/status``
+
+**url_pattern**
+  If you are storing build logs external to the system that originally
+  ran jobs, and wish to link to those logs when Zuul makes comments on
+  Gerrit changes for completed jobs, this setting configures what the
+  URLs for those links should be.  Used by zuul-server only.
+  ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
+
+**job_name_in_report**
+  Boolean value (``true`` or ``false``) that indicates whether the
+  job name should be included in the report (normally only the URL
+  is included).  Defaults to ``false``.  Used by zuul-server only.
+  ``job_name_in_report=true``
+
+merger
+""""""
+
 **git_dir**
   Directory that Zuul should clone local git repositories to.
   ``git_dir=/var/lib/zuul/git``
@@ -110,32 +138,18 @@
   Optional: Value to pass to `git config user.name`.
   ``git_user_name=zuul``
 
-**report_times**
-  Boolean value (``true`` or ``false``) that determines if Zuul should
-  include elapsed times for each job in the textual report.
-  ``report_times=true``
-
-**status_url**
-  URL that will be posted in Zuul comments made to Gerrit changes when
-  starting jobs for a change.
-  ``status_url=https://zuul.example.com/status``
-
-**url_pattern**
-  If you are storing build logs external to the system that originally
-  ran jobs and wish to link to those logs when Zuul makes comments on
-  Gerrit changes for completed jobs this setting configures what the
-  URLs for those links should be.
-  ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}``
-
-**job_name_in_report**
-  Boolean value (``true`` or ``false``) that indicates whether the
-  job name should be included in the report (normally only the URL
-  is included).  Defaults to ``false``.
-  ``job_name_in_report=true``
-
 **zuul_url**
-  URL of Zuul's git repos, accessible to test workers.  
-  Usually "http://zuul.example.com/p".
+  URL of this merger's git repos, accessible to test workers.  Usually
+  "http://zuul.example.com/p" or "http://zuul-merger01.example.com/p",
+  depending on whether the merger is co-located with the Zuul server.
+
+**log_config**
+  Path to log config file for the merger process.
+  ``log_config=/etc/zuul/logging.yaml``
+
+**pidfile**
+  Path to PID lock file for the merger process.
+  ``pidfile=/var/run/zuul-merger/merger.pid``
 
 smtp
 """"
@@ -154,16 +168,6 @@
   This can be overridden by individual pipelines.
   ``default_to=you@example.com``
 
-replication
-"""""""""""
-
-Zuul can push the refs it creates to any number of servers.  To do so,
-list the git push URLs in this section, one per line as follows::
-
-  [replication]
-    url1=ssh://user@host1.example.com:port/path/to/repo
-    url2=ssh://user@host2.example.com:port/path/to/repo
-
 layout.yaml
 ~~~~~~~~~~~
 
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index a4d1390..75c84e4 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -15,10 +15,12 @@
 log_config=/etc/zuul/logging.conf
 pidfile=/var/run/zuul/zuul.pid
 state_dir=/var/lib/zuul
+status_url=https://jenkins.example.com/zuul/status
+
+[merger]
 git_dir=/var/lib/zuul/git
 ;git_user_email=zuul@example.com
 ;git_user_name=zuul
-status_url=https://jenkins.example.com/zuul/status
 zuul_url=http://zuul.example.com/p
 
 [smtp]
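
Given the section split above, a zuul.conf on a dedicated merger host
might look like the following sketch (the hostnames are hypothetical,
and the ``gerrit`` and ``zuul`` sections the merger also reads are
omitted for brevity):

```ini
; Hypothetical zuul.conf fragment for a standalone zuul-merger host.
[gearman]
server=zuul.example.com

[merger]
git_dir=/var/lib/zuul/git
zuul_url=http://zuul-merger01.example.com/p
pidfile=/var/run/zuul-merger/merger.pid
log_config=/etc/zuul/logging.conf
```

Note that ``zuul_url`` points at this merger host, not the main Zuul
server, so test workers fetch from the repositories this merger
prepared.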
diff --git a/requirements.txt b/requirements.txt
index 170b5152..92bb296 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,5 +13,5 @@
 extras
 statsd>=1.0.0,<3.0
 voluptuous>=0.7
-gear>=0.4.0,<1.0.0
+gear>=0.5.1,<1.0.0
 apscheduler>=2.1.1,<3.0
diff --git a/setup.cfg b/setup.cfg
index 9ff62d6..21b1199 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,6 +22,7 @@
 [entry_points]
 console_scripts =
     zuul-server = zuul.cmd.server:main
+    zuul-merger = zuul.cmd.merger:main
     zuul = zuul.cmd.client:main
 
 [build_sphinx]
diff --git a/tests/fixtures/layout-no-jobs.yaml b/tests/fixtures/layout-no-jobs.yaml
new file mode 100644
index 0000000..ee8dc62
--- /dev/null
+++ b/tests/fixtures/layout-no-jobs.yaml
@@ -0,0 +1,43 @@
+includes:
+  - python-file: custom_functions.py
+
+pipelines:
+  - name: check
+    manager: IndependentPipelineManager
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+  - name: gate
+    manager: DependentPipelineManager
+    failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - approved: 1
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    start:
+      gerrit:
+        verified: 0
+    precedence: high
+
+projects:
+  - name: org/project
+    merge-mode: cherry-pick
+    check:
+      - noop
+    gate:
+      - noop
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 98dfe86..b1c94de 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -231,3 +231,7 @@
       - conflict-project-merge:
         - conflict-project-test1
         - conflict-project-test2
+
+  - name: org/noop-project
+    gate:
+      - noop
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index f77e07e..bee06e4 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -8,11 +8,13 @@
 
 [zuul]
 layout_config=layout.yaml
+url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
+job_name_in_report=true
+
+[merger]
 git_dir=/tmp/zuul-test/git
 git_user_email=zuul@example.com
 git_user_name=zuul
-url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
-job_name_in_report=true
 zuul_url=http://zuul.example.com/p
 
 [smtp]
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 9787ae1..9576440 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -47,6 +47,8 @@
 import zuul.rpclistener
 import zuul.rpcclient
 import zuul.launcher.gearman
+import zuul.merger.server
+import zuul.merger.client
 import zuul.reporter.gerrit
 import zuul.reporter.smtp
 import zuul.trigger.gerrit
@@ -66,9 +68,10 @@
 
 
 def repack_repo(path):
-    output = subprocess.Popen(
-        ['git', '--git-dir=%s/.git' % path, 'repack', '-afd'],
-        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
+    output = subprocess.Popen(cmd, close_fds=True,
+                              stdout=subprocess.PIPE,
+                              stderr=subprocess.PIPE)
     out = output.communicate()
     if output.returncode:
         raise Exception("git repack returned %d" % output.returncode)
@@ -158,6 +161,7 @@
         repo.head.reference = 'master'
         repo.head.reset(index=True, working_tree=True)
         repo.git.clean('-x', '-f', '-d')
+        repo.heads['master'].checkout()
         return r
 
     def addPatchset(self, files=[], large=False):
@@ -494,12 +498,23 @@
             'name': self.name,
             'number': self.number,
             'manager': self.worker.worker_id,
+            'worker_name': 'My Worker',
+            'worker_hostname': 'localhost',
+            'worker_ips': ['127.0.0.1', '192.168.1.1'],
+            'worker_fqdn': 'zuul.example.org',
+            'worker_program': 'FakeBuilder',
+            'worker_version': 'v1.1',
+            'worker_extra': {'something': 'else'}
         }
 
+        self.log.debug('Running build %s' % self.unique)
+
         self.job.sendWorkData(json.dumps(data))
+        self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
         self.job.sendWorkStatus(0, 100)
 
         if self.worker.hold_jobs_in_build:
+            self.log.debug('Holding build %s' % self.unique)
             self._wait()
         self.log.debug("Build %s continuing" % self.unique)
 
@@ -762,7 +777,7 @@
         self.upstream_root = os.path.join(self.test_root, "upstream")
         self.git_root = os.path.join(self.test_root, "git")
 
-        CONFIG.set('zuul', 'git_dir', self.git_root)
+        CONFIG.set('merger', 'git_dir', self.git_root)
         if os.path.exists(self.test_root):
             shutil.rmtree(self.test_root)
         os.makedirs(self.test_root)
@@ -802,6 +817,9 @@
         self.worker.addServer('127.0.0.1', self.gearman_server.port)
         self.gearman_server.worker = self.worker
 
+        self.merge_server = zuul.merger.server.MergeServer(self.config)
+        self.merge_server.start()
+
         self.sched = zuul.scheduler.Scheduler()
 
         def URLOpenerFactory(*args, **kw):
@@ -810,6 +828,8 @@
 
         urllib2.urlopen = URLOpenerFactory
         self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
+        self.merge_client = zuul.merger.client.MergeClient(
+            self.config, self.sched)
 
         self.smtp_messages = []
 
@@ -831,6 +851,7 @@
         self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
 
         self.sched.setLauncher(self.launcher)
+        self.sched.setMerger(self.merge_client)
         self.sched.registerTrigger(self.gerrit)
         self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
         self.sched.registerTrigger(self.timer)
@@ -871,6 +892,9 @@
     def shutdown(self):
         self.log.debug("Shutting down after tests")
         self.launcher.stop()
+        self.merge_server.stop()
+        self.merge_server.join()
+        self.merge_client.stop()
         self.worker.shutdown()
         self.gearman_server.shutdown()
         self.gerrit.stop()
@@ -944,11 +968,17 @@
         repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
         repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
         commit_messages = ['%s-1' % commit.subject for commit in commits]
+        self.log.debug("Checking if job %s has changes; commit_messages %s;"
+                       " repo_messages %s; sha %s" % (job, commit_messages,
+                                                      repo_messages, sha))
         for msg in commit_messages:
             if msg not in repo_messages:
+                self.log.debug("  messages do not match")
                 return False
         if repo_shas[0] != sha:
+            self.log.debug("  sha does not match")
             return False
+        self.log.debug("  OK")
         return True
 
     def registerJobs(self):
@@ -983,13 +1013,15 @@
             done = True
             for connection in self.gearman_server.active_connections:
                 if (connection.functions and
-                    connection.client_id != 'Zuul RPC Listener'):
+                    connection.client_id not in ['Zuul RPC Listener',
+                                                 'Zuul Merger']):
                     done = False
             if done:
                 break
             time.sleep(0)
         self.gearman_server.functions = set()
         self.rpc.register()
+        self.merge_server.register()
 
     def haveAllBuildsReported(self):
         # See if Zuul is waiting on a meta job to complete
@@ -1075,13 +1107,17 @@
                 self.fake_gerrit.event_queue.join()
                 self.sched.trigger_event_queue.join()
                 self.sched.result_event_queue.join()
+                self.sched.run_handler_lock.acquire()
                 if (self.sched.trigger_event_queue.empty() and
                     self.sched.result_event_queue.empty() and
                     self.fake_gerrit.event_queue.empty() and
+                    not self.merge_client.build_sets and
                     self.areAllBuildsWaiting()):
+                    self.sched.run_handler_lock.release()
                     self.worker.lock.release()
                     self.log.debug("...settled.")
                     return
+                self.sched.run_handler_lock.release()
             self.worker.lock.release()
             self.sched.wake_event.wait(0.1)
 
@@ -2343,6 +2379,10 @@
     def test_merger_repack_large_change(self):
         "Test that the merger works with large changes after a repack"
         # https://bugs.launchpad.net/zuul/+bug/1078946
+        # This test assumes the repo is already cloned; make sure it is
+        url = self.sched.triggers['gerrit'].getGitUrl(
+            self.sched.layout.projects['org/project1'])
+        self.merge_server.merger.addProject('org/project1', url)
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         A.addPatchset(large=True)
         path = os.path.join(self.upstream_root, "org/project1")
@@ -2464,7 +2504,7 @@
 
     def test_zuul_url_return(self):
         "Test if ZUUL_URL is returning when zuul_url is set in zuul.conf"
-        self.assertTrue(self.sched.config.has_option('zuul', 'zuul_url'))
+        self.assertTrue(self.sched.config.has_option('merger', 'zuul_url'))
         self.worker.hold_jobs_in_build = True
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -2784,6 +2824,29 @@
         self.assertReportedStat('test-timing', '3|ms')
         self.assertReportedStat('test-guage', '12|g')
 
+    def test_stuck_job_cleanup(self):
+        "Test that pending jobs are cleaned up if removed from layout"
+        self.gearman_server.hold_jobs_in_queue = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        self.assertEqual(len(self.gearman_server.getQueue()), 1)
+
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-no-jobs.yaml')
+        self.sched.reconfigure(self.config)
+        self.waitUntilSettled()
+
+        self.gearman_server.release('noop')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.gearman_server.getQueue()), 0)
+        self.assertTrue(self.sched._areAllBuildsComplete())
+
+        self.assertEqual(len(self.history), 1)
+        self.assertEqual(self.history[0].name, 'noop')
+        self.assertEqual(self.history[0].result, 'SUCCESS')
+
     def test_file_jobs(self):
         "Test that file jobs run only when appropriate"
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -3025,39 +3088,6 @@
         self.assertEqual(B.data['status'], 'MERGED')
         self.assertEqual(B.reported, 2)
 
-    def test_push_urls(self):
-        "Test that Zuul can push refs to multiple URLs"
-        upstream_path = os.path.join(self.upstream_root, 'org/project')
-        replica1 = os.path.join(self.upstream_root, 'replica1')
-        replica2 = os.path.join(self.upstream_root, 'replica2')
-
-        self.config.add_section('replication')
-        self.config.set('replication', 'url1', 'file://%s' % replica1)
-        self.config.set('replication', 'url2', 'file://%s' % replica2)
-        self.sched.reconfigure(self.config)
-
-        r1 = git.Repo.clone_from(upstream_path, replica1 + '/org/project.git')
-        r2 = git.Repo.clone_from(upstream_path, replica2 + '/org/project.git')
-
-        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        A.addApproval('CRVW', 2)
-        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
-        B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B')
-        B.addApproval('CRVW', 2)
-        self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
-        self.waitUntilSettled()
-        count = 0
-        for ref in r1.refs:
-            if ref.path.startswith('refs/zuul'):
-                count += 1
-        self.assertEqual(count, 3)
-
-        count = 0
-        for ref in r2.refs:
-            if ref.path.startswith('refs/zuul'):
-                count += 1
-        self.assertEqual(count, 3)
-
     def test_timer(self):
         "Test that a periodic job is triggered"
         self.worker.hold_jobs_in_build = True
@@ -3274,6 +3304,7 @@
             self.assertEqual(
                 enqueue_times[str(item.change)], item.enqueue_time)
 
+        self.waitUntilSettled()
         self.worker.release('.*-merge')
         self.waitUntilSettled()
         self.worker.release('.*-merge')
@@ -3340,6 +3371,7 @@
         r = client.promote(pipeline='gate',
                            change_ids=['3,1'])
 
+        self.waitUntilSettled()
         self.worker.release('.*-merge')
         self.waitUntilSettled()
         self.worker.release('.*-merge')
@@ -3570,3 +3602,41 @@
         self.assertEqual(queue.window, 2)
         self.assertEqual(queue.window_floor, 1)
         self.assertEqual(C.data['status'], 'MERGED')
+
+    def test_worker_update_metadata(self):
+        "Test if a worker can send back metadata about itself"
+        self.worker.hold_jobs_in_build = True
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.launcher.builds), 1)
+
+        self.log.debug('Current builds:')
+        self.log.debug(self.launcher.builds)
+
+        start = time.time()
+        while True:
+            if time.time() - start > 10:
+                raise Exception("Timeout waiting for gearman server to report "
+                                + "back to the client")
+            build = self.launcher.builds.values()[0]
+            if build.worker.name == "My Worker":
+                break
+            else:
+                time.sleep(0)
+
+        self.log.debug(build)
+        self.assertEqual("My Worker", build.worker.name)
+        self.assertEqual("localhost", build.worker.hostname)
+        self.assertEqual(['127.0.0.1', '192.168.1.1'], build.worker.ips)
+        self.assertEqual("zuul.example.org", build.worker.fqdn)
+        self.assertEqual("FakeBuilder", build.worker.program)
+        self.assertEqual("v1.1", build.worker.version)
+        self.assertEqual({'something': 'else'}, build.worker.extra)
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
new file mode 100644
index 0000000..f046235
--- /dev/null
+++ b/zuul/cmd/merger.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import ConfigParser
+import daemon
+import extras
+
+# As of python-daemon 1.6, pidlockfile is no longer bundled; instead
+# python-daemon depends on lockfile-0.9.1, which provides pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+import logging
+import logging.config
+import os
+import sys
+import signal
+import traceback
+
+# No zuul imports here because they pull in paramiko which must not be
+# imported until after the daemonization.
+# https://github.com/paramiko/paramiko/issues/59
+# Similar situation with gear and statsd.
+
+
+def stack_dump_handler(signum, frame):
+    signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+    log_str = ""
+    for thread_id, stack_frame in sys._current_frames().items():
+        log_str += "Thread: %s\n" % thread_id
+        log_str += "".join(traceback.format_stack(stack_frame))
+    log = logging.getLogger("zuul.stack_dump")
+    log.debug(log_str)
+    signal.signal(signal.SIGUSR2, stack_dump_handler)
+
+
+class Merger(object):
+    def __init__(self):
+        self.args = None
+        self.config = None
+
+    def parse_arguments(self):
+        parser = argparse.ArgumentParser(description='Zuul merge worker.')
+        parser.add_argument('-c', dest='config',
+                            help='specify the config file')
+        parser.add_argument('-d', dest='nodaemon', action='store_true',
+                            help='do not run as a daemon')
+        parser.add_argument('--version', dest='version', action='store_true',
+                            help='show zuul version')
+        self.args = parser.parse_args()
+
+    def read_config(self):
+        self.config = ConfigParser.ConfigParser()
+        if self.args.config:
+            locations = [self.args.config]
+        else:
+            locations = ['/etc/zuul/zuul.conf',
+                         '~/zuul.conf']
+        for fp in locations:
+            if os.path.exists(os.path.expanduser(fp)):
+                self.config.read(os.path.expanduser(fp))
+                return
+        raise Exception("Unable to locate config file in %s" % locations)
+
+    def setup_logging(self, section, parameter):
+        if self.config.has_option(section, parameter):
+            fp = os.path.expanduser(self.config.get(section, parameter))
+            if not os.path.exists(fp):
+                raise Exception("Unable to read logging config file at %s" %
+                                fp)
+            logging.config.fileConfig(fp)
+        else:
+            logging.basicConfig(level=logging.DEBUG)
+
+    def exit_handler(self, signum, frame):
+        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+        self.merger.stop()
+        self.merger.join()
+
+    def main(self):
+        # See comment at top of file about zuul imports
+        import zuul.merger.server
+
+        self.setup_logging('merger', 'log_config')
+
+        self.merger = zuul.merger.server.MergeServer(self.config)
+        self.merger.start()
+
+        signal.signal(signal.SIGUSR1, self.exit_handler)
+        signal.signal(signal.SIGUSR2, stack_dump_handler)
+        while True:
+            try:
+                signal.pause()
+            except KeyboardInterrupt:
+                print "Ctrl + C: asking merger to exit nicely...\n"
+                self.exit_handler(signal.SIGINT, None)
+
+
+def main():
+    server = Merger()
+    server.parse_arguments()
+
+    if server.args.version:
+        from zuul.version import version_info as zuul_version_info
+        print "Zuul version: %s" % zuul_version_info.version_string()
+        sys.exit(0)
+
+    server.read_config()
+
+    if server.config.has_option('zuul', 'state_dir'):
+        state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
+    else:
+        state_dir = '/var/lib/zuul'
+    test_fn = os.path.join(state_dir, 'test')
+    try:
+        f = open(test_fn, 'w')
+        f.close()
+        os.unlink(test_fn)
+    except Exception:
+        print
+        print "Unable to write to state directory: %s" % state_dir
+        print
+        raise
+
+    if server.config.has_option('merger', 'pidfile'):
+        pid_fn = os.path.expanduser(server.config.get('merger', 'pidfile'))
+    else:
+        pid_fn = '/var/run/zuul-merger/merger.pid'
+    pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+    if server.args.nodaemon:
+        server.main()
+    else:
+        with daemon.DaemonContext(pidfile=pid):
+            server.main()
+
+
+if __name__ == "__main__":
+    sys.path.insert(0, '.')
+    main()
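
The SIGUSR2 handler above walks every running thread's stack via ``sys._current_frames()``. The core of that handler can be sketched as a standalone helper (``format_thread_stacks`` is an illustrative name, not part of ``zuul.cmd.merger``):

```python
import sys
import traceback


def format_thread_stacks():
    # Collect a formatted stack trace for every running thread,
    # mirroring what stack_dump_handler logs on SIGUSR2.
    log_str = ""
    for thread_id, stack_frame in sys._current_frames().items():
        log_str += "Thread: %s\n" % thread_id
        log_str += "".join(traceback.format_stack(stack_frame))
    return log_str
```

Sending ``SIGUSR2`` to a running ``zuul-merger`` thus dumps all thread stacks to the ``zuul.stack_dump`` logger without stopping the daemon.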
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 7901535..79a2538 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -154,7 +154,13 @@
             os.close(pipe_write)
             self.setup_logging('gearman_server', 'log_config')
             import gear
-            gear.Server(4730)
+            statsd_host = os.environ.get('STATSD_HOST')
+            statsd_port = int(os.environ.get('STATSD_PORT', 8125))
+            gear.Server(4730,
+                        statsd_host=statsd_host,
+                        statsd_port=statsd_port,
+                        statsd_prefix='zuul.geard')
+
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
             pipe_read.read()
@@ -172,6 +178,7 @@
         # See comment at top of file about zuul imports
         import zuul.scheduler
         import zuul.launcher.gearman
+        import zuul.merger.client
         import zuul.reporter.gerrit
         import zuul.reporter.smtp
         import zuul.trigger.gerrit
@@ -184,10 +191,12 @@
             self.start_gear_server()
 
         self.setup_logging('zuul', 'log_config')
+        self.log = logging.getLogger("zuul.Server")
 
         self.sched = zuul.scheduler.Scheduler()
 
         gearman = zuul.launcher.gearman.Gearman(self.config, self.sched)
+        merger = zuul.merger.client.MergeClient(self.config, self.sched)
         gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
         timer = zuul.trigger.timer.Timer(self.config, self.sched)
         webapp = zuul.webapp.WebApp(self.sched)
@@ -205,15 +214,19 @@
         )
 
         self.sched.setLauncher(gearman)
+        self.sched.setMerger(merger)
         self.sched.registerTrigger(gerrit)
         self.sched.registerTrigger(timer)
         self.sched.registerReporter(gerrit_reporter)
         self.sched.registerReporter(smtp_reporter)
 
+        self.log.info('Starting scheduler')
         self.sched.start()
         self.sched.reconfigure(self.config)
         self.sched.resume()
+        self.log.info('Starting Webapp')
         webapp.start()
+        self.log.info('Starting RPC')
         rpc.start()
 
         signal.signal(signal.SIGHUP, self.reconfigure_handler)
@@ -243,21 +256,6 @@
             path = None
         sys.exit(server.test_config(path))
 
-    if server.config.has_option('zuul', 'state_dir'):
-        state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
-    else:
-        state_dir = '/var/lib/zuul'
-    test_fn = os.path.join(state_dir, 'test')
-    try:
-        f = open(test_fn, 'w')
-        f.close()
-        os.unlink(test_fn)
-    except:
-        print
-        print "Unable to write to state directory: %s" % state_dir
-        print
-        raise
-
     if server.config.has_option('zuul', 'pidfile'):
         pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
     else:
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 3500445..3a690dc 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -155,7 +155,6 @@
         self.sched = sched
         self.builds = {}
         self.meta_jobs = {}  # A list of meta-jobs like stop or describe
-        self.zuul_server = config.get('zuul', 'zuul_url')
 
         server = config.get('gearman', 'server')
         if config.has_option('gearman', 'port'):
@@ -226,7 +225,7 @@
         params = dict(ZUUL_UUID=uuid,
                       ZUUL_PROJECT=item.change.project.name)
         params['ZUUL_PIPELINE'] = pipeline.name
-        params['ZUUL_URL'] = self.zuul_server
+        params['ZUUL_URL'] = item.current_build_set.zuul_url
         if hasattr(item.change, 'refspec'):
             changes_str = '^'.join(
                 ['%s:%s:%s' % (i.change.project.name, i.change.branch,
@@ -299,7 +298,7 @@
         if not self.isJobRegistered(gearman_job.name):
             self.log.error("Job %s is not registered with Gearman" %
                            gearman_job)
-            self.onBuildCompleted(gearman_job, 'LOST')
+            self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
             return build
 
         if pipeline.precedence == zuul.model.PRECEDENCE_NORMAL:
@@ -313,14 +312,14 @@
             self.gearman.submitJob(gearman_job, precedence=precedence)
         except Exception:
             self.log.exception("Unable to submit job to Gearman")
-            self.onBuildCompleted(gearman_job, 'LOST')
+            self.onBuildCompleted(gearman_job, 'EXCEPTION')
             return build
 
         if not gearman_job.handle:
             self.log.error("No job handle was received for %s after 30 seconds"
                            " marking as lost." %
                            gearman_job)
-            self.onBuildCompleted(gearman_job, 'LOST')
+            self.onBuildCompleted(gearman_job, 'NO_HANDLE')
 
         return build
 
@@ -381,6 +380,8 @@
         if build:
             # Allow URL to be updated
             build.url = data.get('url') or build.url
+            # Update information about worker
+            build.worker.updateFromData(data)
 
             if build.number is None:
                 self.log.info("Build %s started" % job)
@@ -395,7 +396,7 @@
 
     def onDisconnect(self, job):
         self.log.info("Gearman job %s lost due to disconnect" % job)
-        self.onBuildCompleted(job, 'LOST')
+        self.onBuildCompleted(job)
 
     def onUnknownJob(self, job):
         self.log.info("Gearman job %s lost due to unknown handle" % job)
diff --git a/zuul/merger.py b/zuul/merger.py
deleted file mode 100644
index a580a21..0000000
--- a/zuul/merger.py
+++ /dev/null
@@ -1,309 +0,0 @@
-# Copyright 2012 Hewlett-Packard Development Company, L.P.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import git
-import os
-import logging
-import model
-import threading
-
-
-class ZuulReference(git.Reference):
-    _common_path_default = "refs/zuul"
-    _points_to_commits_only = True
-
-
-class Repo(object):
-    log = logging.getLogger("zuul.Repo")
-
-    def __init__(self, remote, local, email, username):
-        self.remote_url = remote
-        self.local_path = local
-        self.email = email
-        self.username = username
-        self._initialized = False
-        try:
-            self._ensure_cloned()
-        except:
-            self.log.exception("Unable to initialize repo for %s" % remote)
-
-    def _ensure_cloned(self):
-        repo_is_cloned = os.path.exists(self.local_path)
-        if self._initialized and repo_is_cloned:
-            return
-        # If the repo does not exist, clone the repo.
-        if not repo_is_cloned:
-            self.log.debug("Cloning from %s to %s" % (self.remote_url,
-                                                      self.local_path))
-            git.Repo.clone_from(self.remote_url, self.local_path)
-        repo = git.Repo(self.local_path)
-        if self.email:
-            repo.config_writer().set_value('user', 'email',
-                                           self.email)
-        if self.username:
-            repo.config_writer().set_value('user', 'name',
-                                           self.username)
-        repo.config_writer().write()
-        self._initialized = True
-
-    def createRepoObject(self):
-        try:
-            self._ensure_cloned()
-            repo = git.Repo(self.local_path)
-        except:
-            self.log.exception("Unable to initialize repo for %s" %
-                               self.local_path)
-        return repo
-
-    def reset(self):
-        repo = self.createRepoObject()
-        self.log.debug("Resetting repository %s" % self.local_path)
-        self.update()
-        origin = repo.remotes.origin
-        for ref in origin.refs:
-            if ref.remote_head == 'HEAD':
-                continue
-            repo.create_head(ref.remote_head, ref, force=True)
-
-        # Reset to remote HEAD (usually origin/master)
-        repo.head.reference = origin.refs['HEAD']
-        repo.head.reset(index=True, working_tree=True)
-        repo.git.clean('-x', '-f', '-d')
-
-    def getBranchHead(self, branch):
-        repo = self.createRepoObject()
-        branch_head = repo.heads[branch]
-        return branch_head
-
-    def checkout(self, ref):
-        repo = self.createRepoObject()
-        self.log.debug("Checking out %s" % ref)
-        repo.head.reference = ref
-        repo.head.reset(index=True, working_tree=True)
-
-    def cherryPick(self, ref):
-        repo = self.createRepoObject()
-        self.log.debug("Cherry-picking %s" % ref)
-        self.fetch(ref)
-        repo.git.cherry_pick("FETCH_HEAD")
-
-    def merge(self, ref, strategy=None):
-        repo = self.createRepoObject()
-        args = []
-        if strategy:
-            args += ['-s', strategy]
-        args.append('FETCH_HEAD')
-        self.fetch(ref)
-        self.log.debug("Merging %s with args %s" % (ref, args))
-        repo.git.merge(*args)
-
-    def fetch(self, ref):
-        repo = self.createRepoObject()
-        # The git.remote.fetch method may read in git progress info and
-        # interpret it improperly causing an AssertionError. Because the
-        # data was fetched properly subsequent fetches don't seem to fail.
-        # So try again if an AssertionError is caught.
-        origin = repo.remotes.origin
-        try:
-            origin.fetch(ref)
-        except AssertionError:
-            origin.fetch(ref)
-
-    def createZuulRef(self, ref, commit='HEAD'):
-        repo = self.createRepoObject()
-        self.log.debug("CreateZuulRef %s at %s " % (ref, commit))
-        ref = ZuulReference.create(repo, ref, commit)
-        return ref.commit
-
-    def push(self, local, remote):
-        repo = self.createRepoObject()
-        self.log.debug("Pushing %s:%s to %s " % (local, remote,
-                                                 self.remote_url))
-        repo.remotes.origin.push('%s:%s' % (local, remote))
-
-    def push_url(self, url, refspecs):
-        repo = self.createRepoObject()
-        self.log.debug("Pushing %s to %s" % (refspecs, url))
-        repo.git.push([url] + refspecs)
-
-    def update(self):
-        repo = self.createRepoObject()
-        self.log.debug("Updating repository %s" % self.local_path)
-        origin = repo.remotes.origin
-        origin.update()
-
-
-class Merger(object):
-    log = logging.getLogger("zuul.Merger")
-
-    def __init__(self, working_root, sshkey, email, username, replicate_urls):
-        self.repos = {}
-        self.working_root = working_root
-        if not os.path.exists(working_root):
-            os.makedirs(working_root)
-        if sshkey:
-            self._makeSSHWrapper(sshkey)
-        self.email = email
-        self.username = username
-        self.replicate_urls = replicate_urls
-
-    def _makeSSHWrapper(self, key):
-        name = os.path.join(self.working_root, '.ssh_wrapper')
-        fd = open(name, 'w')
-        fd.write('#!/bin/bash\n')
-        fd.write('ssh -i %s $@\n' % key)
-        fd.close()
-        os.chmod(name, 0755)
-        os.environ['GIT_SSH'] = name
-
-    def addProject(self, project, url):
-        try:
-            path = os.path.join(self.working_root, project.name)
-            repo = Repo(url, path, self.email, self.username)
-
-            self.repos[project] = repo
-        except:
-            self.log.exception("Unable to add project %s" % project)
-
-    def getRepo(self, project):
-        return self.repos.get(project, None)
-
-    def updateRepo(self, project):
-        repo = self.getRepo(project)
-        try:
-            self.log.info("Updating local repository %s", project)
-            repo.update()
-        except:
-            self.log.exception("Unable to update %s", project)
-
-    def _mergeChange(self, change, ref, target_ref):
-        repo = self.getRepo(change.project)
-        try:
-            repo.checkout(ref)
-        except:
-            self.log.exception("Unable to checkout %s" % ref)
-            return False
-
-        try:
-            mode = change.project.merge_mode
-            if mode == model.MERGER_MERGE:
-                repo.merge(change.refspec)
-            elif mode == model.MERGER_MERGE_RESOLVE:
-                repo.merge(change.refspec, 'resolve')
-            elif mode == model.MERGER_CHERRY_PICK:
-                repo.cherryPick(change.refspec)
-            else:
-                raise Exception("Unsupported merge mode: %s" % mode)
-        except Exception:
-            # Log exceptions at debug level because they are
-            # usually benign merge conflicts
-            self.log.debug("Unable to merge %s" % change, exc_info=True)
-            return False
-
-        try:
-            # Keep track of the last commit, it's the commit that
-            # will be passed to jenkins because it's the commit
-            # for the triggering change
-            zuul_ref = change.branch + '/' + target_ref
-            commit = repo.createZuulRef(zuul_ref, 'HEAD').hexsha
-        except:
-            self.log.exception("Unable to set zuul ref %s for change %s" %
-                               (zuul_ref, change))
-            return False
-        return commit
-
-    def replicateRefspecs(self, refspecs):
-        threads = []
-        for url in self.replicate_urls:
-            t = threading.Thread(target=self._replicate,
-                                 args=(url, refspecs))
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
-
-    def _replicate(self, url, project_refspecs):
-        try:
-            for project, refspecs in project_refspecs.items():
-                repo = self.getRepo(project)
-                repo.push_url(os.path.join(url, project.name + '.git'),
-                              refspecs)
-        except Exception:
-            self.log.exception("Exception pushing to %s" % url)
-
-    def mergeChanges(self, items, target_ref=None):
-        # Merge shortcuts:
-        # if this is the only change just merge it against its branch.
-        # elif there are changes ahead of us that are from the same project and
-        # branch we can merge against the commit associated with that change
-        # instead of going back up the tree.
-        #
-        # Shortcuts assume some external entity is checking whether or not
-        # changes from other projects can merge.
-        commit = False
-        item = items[-1]
-        sibling_filter = lambda i: (i.change.project == item.change.project and
-                                    i.change.branch == item.change.branch)
-        sibling_items = filter(sibling_filter, items)
-        # Only current change to merge against tip of change.branch
-        if len(sibling_items) == 1:
-            repo = self.getRepo(item.change.project)
-            # we need to reset here in order to call getBranchHead
-            try:
-                repo.reset()
-            except:
-                self.log.exception("Unable to reset repo %s" % repo)
-                return False
-            commit = self._mergeChange(item.change,
-                                       repo.getBranchHead(item.change.branch),
-                                       target_ref=target_ref)
-        # Sibling changes exist. Merge current change against newest sibling.
-        elif (len(sibling_items) >= 2 and
-              sibling_items[-2].current_build_set.commit):
-            last_commit = sibling_items[-2].current_build_set.commit
-            commit = self._mergeChange(item.change, last_commit,
-                                       target_ref=target_ref)
-        # Either change did not merge or we did not need to merge as there were
-        # previous merge conflicts.
-        if not commit:
-            return commit
-
-        project_branches = []
-        replicate_refspecs = {}
-        for i in reversed(items):
-            # Here we create all of the necessary zuul refs and potentially
-            # push them back to Gerrit.
-            if (i.change.project, i.change.branch) in project_branches:
-                continue
-            repo = self.getRepo(i.change.project)
-            if (i.change.project != item.change.project or
-                i.change.branch != item.change.branch):
-                # Create a zuul ref for all dependent changes project
-                # branch combinations as this is the ref that jenkins will
-                # use to test. The ref for change has already been set so
-                # we skip it here.
-                try:
-                    zuul_ref = i.change.branch + '/' + target_ref
-                    repo.createZuulRef(zuul_ref, i.current_build_set.commit)
-                except:
-                    self.log.exception("Unable to set zuul ref %s for "
-                                       "change %s" % (zuul_ref, i.change))
-                    return False
-            ref = 'refs/zuul/' + i.change.branch + '/' + target_ref
-            refspecs = replicate_refspecs.get(i.change.project, [])
-            refspecs.append('%s:%s' % (ref, ref))
-            replicate_refspecs[i.change.project] = refspecs
-            project_branches.append((i.change.project, i.change.branch))
-        self.replicateRefspecs(replicate_refspecs)
-        return commit
diff --git a/zuul/merger/__init__.py b/zuul/merger/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/merger/__init__.py
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
new file mode 100644
index 0000000..72fd4c5
--- /dev/null
+++ b/zuul/merger/client.py
@@ -0,0 +1,117 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+from uuid import uuid4
+
+import gear
+
+
+def getJobData(job):
+    if not len(job.data):
+        return {}
+    d = job.data[-1]
+    if not d:
+        return {}
+    return json.loads(d)
+
+
+class MergeGearmanClient(gear.Client):
+    def __init__(self, merge_client):
+        super(MergeGearmanClient, self).__init__()
+        self.__merge_client = merge_client
+
+    def handleWorkComplete(self, packet):
+        job = super(MergeGearmanClient, self).handleWorkComplete(packet)
+        self.__merge_client.onBuildCompleted(job)
+        return job
+
+    def handleWorkFail(self, packet):
+        job = super(MergeGearmanClient, self).handleWorkFail(packet)
+        self.__merge_client.onBuildCompleted(job)
+        return job
+
+    def handleWorkException(self, packet):
+        job = super(MergeGearmanClient, self).handleWorkException(packet)
+        self.__merge_client.onBuildCompleted(job)
+        return job
+
+    def handleDisconnect(self, job):
+        job = super(MergeGearmanClient, self).handleDisconnect(job)
+        self.__merge_client.onBuildCompleted(job)
+
+
+class MergeClient(object):
+    log = logging.getLogger("zuul.MergeClient")
+
+    def __init__(self, config, sched):
+        self.config = config
+        self.sched = sched
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.log.debug("Connecting to gearman at %s:%s" % (server, port))
+        self.gearman = MergeGearmanClient(self)
+        self.gearman.addServer(server, port)
+        self.log.debug("Waiting for gearman")
+        self.gearman.waitForServer()
+        self.build_sets = {}
+
+    def stop(self):
+        self.gearman.shutdown()
+
+    def areMergesOutstanding(self):
+        if self.build_sets:
+            return True
+        return False
+
+    def submitJob(self, name, data, build_set):
+        uuid = str(uuid4().hex)
+        self.log.debug("Submitting job %s with data %s" % (name, data))
+        job = gear.Job(name,
+                       json.dumps(data),
+                       unique=uuid)
+        self.build_sets[uuid] = build_set
+        self.gearman.submitJob(job)
+
+    def mergeChanges(self, items, build_set):
+        data = dict(items=items)
+        self.submitJob('merger:merge', data, build_set)
+
+    def updateRepo(self, project, url, build_set):
+        data = dict(project=project,
+                    url=url)
+        self.submitJob('merger:update', data, build_set)
+
+    def onBuildCompleted(self, job):
+        build_set = self.build_sets.get(job.unique)
+        if build_set:
+            data = getJobData(job)
+            zuul_url = data.get('zuul_url')
+            merged = data.get('merged', False)
+            updated = data.get('updated', False)
+            commit = data.get('commit')
+            self.log.info("Merge %s complete, merged: %s, updated: %s, "
+                          "commit: %s" %
+                          (job, merged, updated, commit))
+            self.sched.onMergeCompleted(build_set, zuul_url,
+                                        merged, updated, commit)
+            # The test suite expects the build_set to be removed from
+            # the internal dict after the wake flag is set.
+            del self.build_sets[job.unique]
+        else:
+            self.log.error("Unable to find build set for uuid %s" % job.unique)
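
``MergeClient`` correlates asynchronous gearman completions with pending build sets through each job's unique uuid. That bookkeeping can be sketched in isolation (``BuildSetTracker`` is an illustrative stand-in for the ``build_sets`` dict, not a class in ``zuul.merger.client``):

```python
import json
from uuid import uuid4


class BuildSetTracker(object):
    """Mirror of MergeClient's uuid -> build_set bookkeeping."""

    def __init__(self):
        self.build_sets = {}

    def submit(self, build_set):
        # Key the pending build set by a fresh uuid, which is also
        # used as the gearman job's unique id.
        uuid = uuid4().hex
        self.build_sets[uuid] = build_set
        return uuid

    def complete(self, uuid, payload):
        # On completion, look up and remove the pending build set,
        # then decode the JSON result the merge worker sent back.
        build_set = self.build_sets.pop(uuid, None)
        if build_set is None:
            return None
        data = json.loads(payload) if payload else {}
        return build_set, data.get('merged', False), data.get('commit')
```

This shows why ``onBuildCompleted`` deletes the entry after notifying the scheduler: once the result is consumed, an unknown uuid signals a stale or misrouted job.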
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
new file mode 100644
index 0000000..10ce82c
--- /dev/null
+++ b/zuul/merger/merger.py
@@ -0,0 +1,288 @@
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import git
+import os
+import logging
+
+import zuul.model
+
+
+class ZuulReference(git.Reference):
+    _common_path_default = "refs/zuul"
+    _points_to_commits_only = True
+
+
+class Repo(object):
+    log = logging.getLogger("zuul.Repo")
+
+    def __init__(self, remote, local, email, username):
+        self.remote_url = remote
+        self.local_path = local
+        self.email = email
+        self.username = username
+        self._initialized = False
+        try:
+            self._ensure_cloned()
+        except:
+            self.log.exception("Unable to initialize repo for %s" % remote)
+
+    def _ensure_cloned(self):
+        repo_is_cloned = os.path.exists(self.local_path)
+        if self._initialized and repo_is_cloned:
+            return
+        # If the repo does not exist, clone the repo.
+        if not repo_is_cloned:
+            self.log.debug("Cloning from %s to %s" % (self.remote_url,
+                                                      self.local_path))
+            git.Repo.clone_from(self.remote_url, self.local_path)
+        repo = git.Repo(self.local_path)
+        if self.email:
+            repo.config_writer().set_value('user', 'email',
+                                           self.email)
+        if self.username:
+            repo.config_writer().set_value('user', 'name',
+                                           self.username)
+        repo.config_writer().write()
+        self._initialized = True
+
+    def createRepoObject(self):
+        try:
+            self._ensure_cloned()
+            repo = git.Repo(self.local_path)
+        except:
+            self.log.exception("Unable to initialize repo for %s" %
+                               self.local_path)
+        return repo
+
+    def reset(self):
+        repo = self.createRepoObject()
+        self.log.debug("Resetting repository %s" % self.local_path)
+        self.update()
+        origin = repo.remotes.origin
+        for ref in origin.refs:
+            if ref.remote_head == 'HEAD':
+                continue
+            repo.create_head(ref.remote_head, ref, force=True)
+
+        # Reset to remote HEAD (usually origin/master)
+        repo.head.reference = origin.refs['HEAD']
+        repo.head.reset(index=True, working_tree=True)
+        repo.git.clean('-x', '-f', '-d')
+
+    def getBranchHead(self, branch):
+        repo = self.createRepoObject()
+        branch_head = repo.heads[branch]
+        return branch_head.commit
+
+    def getCommitFromRef(self, refname):
+        repo = self.createRepoObject()
+        if refname not in repo.refs:
+            return None
+        ref = repo.refs[refname]
+        return ref.commit
+
+    def checkout(self, ref):
+        repo = self.createRepoObject()
+        self.log.debug("Checking out %s" % ref)
+        repo.head.reference = ref
+        repo.head.reset(index=True, working_tree=True)
+
+    def cherryPick(self, ref):
+        repo = self.createRepoObject()
+        self.log.debug("Cherry-picking %s" % ref)
+        self.fetch(ref)
+        repo.git.cherry_pick("FETCH_HEAD")
+        return repo.head.commit
+
+    def merge(self, ref, strategy=None):
+        repo = self.createRepoObject()
+        args = []
+        if strategy:
+            args += ['-s', strategy]
+        args.append('FETCH_HEAD')
+        self.fetch(ref)
+        self.log.debug("Merging %s with args %s" % (ref, args))
+        repo.git.merge(*args)
+        return repo.head.commit
+
+    def fetch(self, ref):
+        repo = self.createRepoObject()
+        # The git.remote.fetch method may read in git progress info and
+        # interpret it improperly, causing an AssertionError.  Because
+        # the data was fetched properly, subsequent fetches don't seem
+        # to fail, so try again if an AssertionError is caught.
+        origin = repo.remotes.origin
+        try:
+            origin.fetch(ref)
+        except AssertionError:
+            origin.fetch(ref)
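The retry-once workaround in ``fetch`` above can be sketched independently of GitPython (``fetch_fn`` here is a stand-in for ``remote.fetch`` and purely illustrative):

```python
# Retry-once helper: GitPython's remote.fetch can raise AssertionError
# while parsing git progress output even though the data was actually
# transferred, so a single retry normally succeeds.
def fetch_with_retry(fetch_fn, ref):
    try:
        return fetch_fn(ref)
    except AssertionError:
        # The first attempt already fetched the data; retry once.
        return fetch_fn(ref)
```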
+
+    def createZuulRef(self, ref, commit='HEAD'):
+        repo = self.createRepoObject()
+        self.log.debug("CreateZuulRef %s at %s " % (ref, commit))
+        ref = ZuulReference.create(repo, ref, commit)
+        return ref.commit
+
+    def push(self, local, remote):
+        repo = self.createRepoObject()
+        self.log.debug("Pushing %s:%s to %s " % (local, remote,
+                                                 self.remote_url))
+        repo.remotes.origin.push('%s:%s' % (local, remote))
+
+    def update(self):
+        repo = self.createRepoObject()
+        self.log.debug("Updating repository %s" % self.local_path)
+        origin = repo.remotes.origin
+        origin.update()
+
+
+class Merger(object):
+    log = logging.getLogger("zuul.Merger")
+
+    def __init__(self, working_root, sshkey, email, username):
+        self.repos = {}
+        self.working_root = working_root
+        if not os.path.exists(working_root):
+            os.makedirs(working_root)
+        if sshkey:
+            self._makeSSHWrapper(sshkey)
+        self.email = email
+        self.username = username
+
+    def _makeSSHWrapper(self, key):
+        name = os.path.join(self.working_root, '.ssh_wrapper')
+        fd = open(name, 'w')
+        fd.write('#!/bin/bash\n')
+        fd.write('ssh -i %s "$@"\n' % key)
+        fd.close()
+        os.chmod(name, 0o755)
+        os.environ['GIT_SSH'] = name
+
+    def addProject(self, project, url):
+        repo = None
+        try:
+            path = os.path.join(self.working_root, project)
+            repo = Repo(url, path, self.email, self.username)
+
+            self.repos[project] = repo
+        except Exception:
+            self.log.exception("Unable to add project %s" % project)
+        return repo
+
+    def getRepo(self, project, url):
+        if project in self.repos:
+            return self.repos[project]
+        if not url:
+            raise Exception("Unable to set up repo for project %s"
+                            " without a url" % (project,))
+        return self.addProject(project, url)
+
+    def updateRepo(self, project, url):
+        repo = self.getRepo(project, url)
+        try:
+            self.log.info("Updating local repository %s", project)
+            repo.update()
+        except Exception:
+            self.log.exception("Unable to update %s", project)
+
+    def _mergeChange(self, item, ref):
+        repo = self.getRepo(item['project'], item['url'])
+        try:
+            repo.checkout(ref)
+        except Exception:
+            self.log.exception("Unable to checkout %s" % ref)
+            return None
+
+        try:
+            mode = item['merge_mode']
+            if mode == zuul.model.MERGER_MERGE:
+                commit = repo.merge(item['refspec'])
+            elif mode == zuul.model.MERGER_MERGE_RESOLVE:
+                commit = repo.merge(item['refspec'], 'resolve')
+            elif mode == zuul.model.MERGER_CHERRY_PICK:
+                commit = repo.cherryPick(item['refspec'])
+            else:
+                raise Exception("Unsupported merge mode: %s" % mode)
+        except git.GitCommandError:
+            # Log git exceptions at debug level because they are
+            # usually benign merge conflicts
+            self.log.debug("Unable to merge %s" % item, exc_info=True)
+            return None
+        except Exception:
+            self.log.exception("Exception while merging a change:")
+            return None
+
+        return commit
+
+    def _mergeItem(self, item, recent):
+        self.log.debug("Processing refspec %s for project %s / %s ref %s" %
+                       (item['refspec'], item['project'], item['branch'],
+                        item['ref']))
+        repo = self.getRepo(item['project'], item['url'])
+        key = (item['project'], item['branch'])
+        # See if we have a commit for this change already in this repo
+        zuul_ref = item['branch'] + '/' + item['ref']
+        commit = repo.getCommitFromRef(zuul_ref)
+        if commit:
+            self.log.debug("Found commit %s for ref %s" % (commit, zuul_ref))
+            # Store this as the most recent commit for this
+            # project-branch
+            recent[key] = commit
+            return commit
+        self.log.debug("Unable to find commit for ref %s" % (zuul_ref,))
+        # We need to merge the change
+        # Get the most recent commit for this project-branch
+        base = recent.get(key)
+        if not base:
+            # There is none, so use the branch tip
+            # we need to reset here in order to call getBranchHead
+            self.log.debug("No base commit found for %s" % (key,))
+            try:
+                repo.reset()
+            except Exception:
+                self.log.exception("Unable to reset repo %s" % repo)
+                return None
+            base = repo.getBranchHead(item['branch'])
+        else:
+            self.log.debug("Found base commit %s for %s" % (base, key,))
+        # Merge the change
+        commit = self._mergeChange(item, base)
+        if not commit:
+            return None
+        # Store this commit as the most recent for this project-branch
+        recent[key] = commit
+        # Set the Zuul ref for this item to point to the most recent
+        # commits of each project-branch
+        for key, commit in recent.items():
+            project, branch = key
+            try:
+                repo = self.getRepo(project, None)
+                zuul_ref = branch + '/' + item['ref']
+                repo.createZuulRef(zuul_ref, commit)
+            except Exception:
+                self.log.exception("Unable to set zuul ref %s for "
+                                   "item %s" % (zuul_ref, item))
+                return None
+        return commit
+
+    def mergeChanges(self, items):
+        recent = {}
+        commit = None
+        for item in items:
+            commit = self._mergeItem(item, recent)
+            if not commit:
+                return None
+        return commit.hexsha
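``mergeChanges`` consumes plain dictionaries; the following sketch mirrors the fields built by ``_makeMergerItem`` in the scheduler (all values are illustrative, not real project data):

```python
# Each item handed to Merger.mergeChanges carries everything needed to
# speculatively merge one change; merge_mode selects merge, resolve, or
# cherry-pick handling in _mergeChange.
def make_merger_item(project, url, merge_mode, refspec, branch, ref):
    return dict(project=project,
                url=url,
                merge_mode=merge_mode,
                refspec=refspec,
                branch=branch,
                ref=ref)

item = make_merger_item('org/project',
                        'ssh://review.example.org/org/project',
                        1,  # e.g. zuul.model.MERGER_MERGE
                        'refs/changes/01/1/1',
                        'master',
                        'Z752f0a52')
```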
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
new file mode 100644
index 0000000..d8bc1b8
--- /dev/null
+++ b/zuul/merger/server.py
@@ -0,0 +1,118 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import threading
+import traceback
+
+import gear
+
+import merger
+
+
+class MergeServer(object):
+    log = logging.getLogger("zuul.MergeServer")
+
+    def __init__(self, config):
+        self.config = config
+        self.zuul_url = config.get('merger', 'zuul_url')
+
+        if self.config.has_option('merger', 'git_dir'):
+            merge_root = self.config.get('merger', 'git_dir')
+        else:
+            merge_root = '/var/lib/zuul/git'
+
+        if self.config.has_option('merger', 'git_user_email'):
+            merge_email = self.config.get('merger', 'git_user_email')
+        else:
+            merge_email = None
+
+        if self.config.has_option('merger', 'git_user_name'):
+            merge_name = self.config.get('merger', 'git_user_name')
+        else:
+            merge_name = None
+
+        if self.config.has_option('gerrit', 'sshkey'):
+            sshkey = self.config.get('gerrit', 'sshkey')
+        else:
+            sshkey = None
+
+        self.merger = merger.Merger(merge_root, sshkey,
+                                    merge_email, merge_name)
+
+    def start(self):
+        self._running = True
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = gear.Worker('Zuul Merger')
+        self.worker.addServer(server, port)
+        self.log.debug("Waiting for server")
+        self.worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+        self.log.debug("Starting worker")
+        self.thread = threading.Thread(target=self.run)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def register(self):
+        self.worker.registerFunction("merger:merge")
+        self.worker.registerFunction("merger:update")
+
+    def stop(self):
+        self.log.debug("Stopping")
+        self._running = False
+        self.worker.shutdown()
+        self.log.debug("Stopped")
+
+    def join(self):
+        self.thread.join()
+
+    def run(self):
+        self.log.debug("Starting merge listener")
+        while self._running:
+            try:
+                job = self.worker.getJob()
+                try:
+                    if job.name == 'merger:merge':
+                        self.merge(job)
+                    elif job.name == 'merger:update':
+                        self.update(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(traceback.format_exc())
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def merge(self, job):
+        args = json.loads(job.arguments)
+        commit = self.merger.mergeChanges(args['items'])
+        result = dict(merged=(commit is not None),
+                      commit=commit,
+                      zuul_url=self.zuul_url)
+        job.sendWorkComplete(json.dumps(result))
+
+    def update(self, job):
+        args = json.loads(job.arguments)
+        self.merger.updateRepo(args['project'], args['url'])
+        result = dict(updated=True,
+                      zuul_url=self.zuul_url)
+        job.sendWorkComplete(json.dumps(result))
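The merger protocol is JSON over Gearman. This sketch shows the payload a scheduler would submit as a ``merger:merge`` job and the reply ``MergeServer.merge`` sends back; the hostnames, SHA, and ref values are illustrative only:

```python
import json

# Request: the scheduler serializes a list of merger items (see
# _makeMergerItem) as the job arguments.
request = json.dumps({'items': [
    {'project': 'org/project',
     'url': 'ssh://review.example.org/org/project',
     'merge_mode': 1,
     'refspec': 'refs/changes/01/1/1',
     'branch': 'master',
     'ref': 'Z752f0a52'},
]})

# Reply: MergeServer.merge reports whether the merge succeeded, the
# resulting commit, and the URL test workers should fetch from.
reply = json.dumps({'merged': True,
                    'commit': 'deadbeefcafe',
                    'zuul_url': 'http://merger01.example.org/p'})

args = json.loads(request)
result = json.loads(reply)
```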
diff --git a/zuul/model.py b/zuul/model.py
index 5da9cef..22475e6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -627,12 +627,44 @@
         self.canceled = False
         self.retry = False
         self.parameters = {}
+        self.worker = Worker()
 
     def __repr__(self):
-        return '<Build %s of %s>' % (self.uuid, self.job.name)
+        return ('<Build %s of %s on %s>' %
+                (self.uuid, self.job.name, self.worker))
+
+
+class Worker(object):
+    """A model of the worker running a job"""
+    def __init__(self):
+        self.name = "Unknown"
+        self.hostname = None
+        self.ips = []
+        self.fqdn = None
+        self.program = None
+        self.version = None
+        self.extra = {}
+
+    def updateFromData(self, data):
+        """Update worker information if contained in the WORK_DATA response."""
+        self.name = data.get('worker_name', self.name)
+        self.hostname = data.get('worker_hostname', self.hostname)
+        self.ips = data.get('worker_ips', self.ips)
+        self.fqdn = data.get('worker_fqdn', self.fqdn)
+        self.program = data.get('worker_program', self.program)
+        self.version = data.get('worker_version', self.version)
+        self.extra = data.get('worker_extra', self.extra)
+
+    def __repr__(self):
+        return '<Worker %s>' % self.name
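A minimal, self-contained copy of the ``Worker`` model shows how ``updateFromData`` keeps the prior value whenever a ``WORK_DATA`` packet omits a field (the data dict below is illustrative):

```python
# Trimmed-down Worker: each attribute falls back to its current value
# when the corresponding worker_* key is absent from the data.
class Worker(object):
    def __init__(self):
        self.name = "Unknown"
        self.hostname = None
        self.ips = []

    def updateFromData(self, data):
        self.name = data.get('worker_name', self.name)
        self.hostname = data.get('worker_hostname', self.hostname)
        self.ips = data.get('worker_ips', self.ips)

w = Worker()
w.updateFromData({'worker_name': 'jenkins-node-01'})
```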
 
 
 class BuildSet(object):
+    # Merge states:
+    NEW = 1
+    PENDING = 2
+    COMPLETE = 3
+
     def __init__(self, item):
         self.item = item
         self.other_changes = []
@@ -642,9 +674,11 @@
         self.previous_build_set = None
         self.ref = None
         self.commit = None
+        self.zuul_url = None
         self.unable_to_merge = False
         self.unable_to_merge_message = None
         self.failing_reasons = []
+        self.merge_state = self.NEW
 
     def setConfiguration(self):
         # The change isn't enqueued until after it's created
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 73034c7..815da8c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,7 +30,6 @@
 import layoutvalidator
 import model
 from model import ActionReporter, Pipeline, Project, ChangeQueue, EventFilter
-import merger
 from zuul import version as zuul_version
 
 statsd = extras.try_import('statsd.statsd')
@@ -109,6 +108,51 @@
         self.change_ids = change_ids
 
 
+class ResultEvent(object):
+    """An event that needs to modify the pipeline state due to a
+    result from an external system."""
+
+    pass
+
+
+class BuildStartedEvent(ResultEvent):
+    """A build has started.
+
+    :arg Build build: The build which has started.
+    """
+
+    def __init__(self, build):
+        self.build = build
+
+
+class BuildCompletedEvent(ResultEvent):
+    """A build has completed.
+
+    :arg Build build: The build which has completed.
+    """
+
+    def __init__(self, build):
+        self.build = build
+
+
+class MergeCompletedEvent(ResultEvent):
+    """A remote merge operation has completed.
+
+    :arg BuildSet build_set: The build_set which is ready.
+    :arg str zuul_url: The URL of the Zuul Merger.
+    :arg bool merged: Whether the merge succeeded (changes with refs).
+    :arg bool updated: Whether the repo was updated (changes without refs).
+    :arg str commit: The SHA of the merged commit (changes with refs).
+    """
+
+    def __init__(self, build_set, zuul_url, merged, updated, commit):
+        self.build_set = build_set
+        self.zuul_url = zuul_url
+        self.merged = merged
+        self.updated = updated
+        self.commit = commit
+
+
 class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
 
@@ -117,10 +161,12 @@
         self.daemon = True
         self.wake_event = threading.Event()
         self.layout_lock = threading.Lock()
+        self.run_handler_lock = threading.Lock()
         self._pause = False
         self._exit = False
         self._stopped = False
         self.launcher = None
+        self.merger = None
         self.triggers = dict()
         self.reporters = dict()
         self.config = None
@@ -351,44 +397,12 @@
 
         return layout
 
-    def _setupMerger(self):
-        if self.config.has_option('zuul', 'git_dir'):
-            merge_root = self.config.get('zuul', 'git_dir')
-        else:
-            merge_root = '/var/lib/zuul/git'
-
-        if self.config.has_option('zuul', 'git_user_email'):
-            merge_email = self.config.get('zuul', 'git_user_email')
-        else:
-            merge_email = None
-
-        if self.config.has_option('zuul', 'git_user_name'):
-            merge_name = self.config.get('zuul', 'git_user_name')
-        else:
-            merge_name = None
-
-        replicate_urls = []
-        if self.config.has_section('replication'):
-            for k, v in self.config.items('replication'):
-                replicate_urls.append(v)
-
-        if self.config.has_option('gerrit', 'sshkey'):
-            sshkey = self.config.get('gerrit', 'sshkey')
-        else:
-            sshkey = None
-
-        # TODO: The merger should have an upstream repo independent of
-        # triggers, and then each trigger should provide a fetch
-        # location.
-        self.merger = merger.Merger(merge_root, sshkey, merge_email,
-                                    merge_name, replicate_urls)
-        for project in self.layout.projects.values():
-            url = self.triggers['gerrit'].getGitUrl(project)
-            self.merger.addProject(project, url)
-
     def setLauncher(self, launcher):
         self.launcher = launcher
 
+    def setMerger(self, merger):
+        self.merger = merger
+
     def registerTrigger(self, trigger, name=None):
         if name is None:
             name = trigger.name
@@ -422,7 +436,8 @@
     def onBuildStarted(self, build):
         self.log.debug("Adding start event for build: %s" % build)
         build.start_time = time.time()
-        self.result_event_queue.put(('started', build))
+        event = BuildStartedEvent(build)
+        self.result_event_queue.put(event)
         self.wake_event.set()
         self.log.debug("Done adding start event for build: %s" % build)
 
@@ -442,10 +457,19 @@
                 statsd.incr(key)
         except:
             self.log.exception("Exception reporting runtime stats")
-        self.result_event_queue.put(('completed', build))
+        event = BuildCompletedEvent(build)
+        self.result_event_queue.put(event)
         self.wake_event.set()
         self.log.debug("Done adding complete event for build: %s" % build)
 
+    def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit):
+        self.log.debug("Adding merge complete event for build set: %s" %
+                       build_set)
+        event = MergeCompletedEvent(build_set, zuul_url,
+                                    merged, updated, commit)
+        self.result_event_queue.put(event)
+        self.wake_event.set()
+
     def reconfigure(self, config):
         self.log.debug("Prepare to reconfigure")
         event = ReconfigureEvent(config)
@@ -541,9 +565,9 @@
                         self.log.warning("No old pipeline matching %s found "
                                          "when reconfiguring" % name)
                     continue
-                self.log.debug("Re-enqueueing changes for pipeline %s" %
-                               name)
+                self.log.debug("Re-enqueueing changes for pipeline %s" % name)
                 items_to_remove = []
+                builds_to_remove = []
                 for shared_queue in old_pipeline.queues:
                     for item in shared_queue.queue:
                         item.item_ahead = None
@@ -558,21 +582,27 @@
                             items_to_remove.append(item)
                             continue
                         item.change.project = project
+                        for build in item.current_build_set.getBuilds():
+                            job = layout.jobs.get(build.job.name)
+                            if job:
+                                build.job = job
+                            else:
+                                builds_to_remove.append(build)
                         if not new_pipeline.manager.reEnqueueItem(item):
                             items_to_remove.append(item)
-                builds_to_remove = []
-                for build, item in old_pipeline.manager.building_jobs.items():
-                    if item in items_to_remove:
+                for item in items_to_remove:
+                    for build in item.current_build_set.getBuilds():
                         builds_to_remove.append(build)
-                        self.log.warning("Deleting running build %s for "
-                                         "change %s while reenqueueing" % (
-                                         build, item.change))
                 for build in builds_to_remove:
-                    del old_pipeline.manager.building_jobs[build]
-                new_pipeline.manager.building_jobs = \
-                    old_pipeline.manager.building_jobs
+                    self.log.warning(
+                        "Canceling build %s during reconfiguration" % (build,))
+                    try:
+                        self.launcher.cancel(build)
+                    except Exception:
+                        self.log.exception(
+                            "Exception while canceling build %s "
+                            "for change %s" % (build, item.change))
             self.layout = layout
-            self._setupMerger()
             for trigger in self.triggers.values():
                 trigger.postConfig()
             if statsd:
@@ -625,16 +655,19 @@
                 item.change,
                 enqueue_time=item.enqueue_time,
                 quiet=True)
-        while pipeline.manager.processQueue():
-            pass
 
     def _areAllBuildsComplete(self):
         self.log.debug("Checking if all builds are complete")
         waiting = False
+        if self.merger.areMergesOutstanding():
+            waiting = True
         for pipeline in self.layout.pipelines.values():
-            for build in pipeline.manager.building_jobs.keys():
-                self.log.debug("%s waiting on %s" % (pipeline.manager, build))
-                waiting = True
+            for item in pipeline.getAllItems():
+                for build in item.current_build_set.getBuilds():
+                    if build.result is None:
+                        self.log.debug("%s waiting on %s" %
+                                       (pipeline.manager, build))
+                        waiting = True
         if not waiting:
             self.log.debug("All builds are complete")
             return True
@@ -652,37 +685,40 @@
             self.wake_event.wait()
             self.wake_event.clear()
             if self._stopped:
+                self.log.debug("Run handler stopping")
                 return
             self.log.debug("Run handler awake")
+            self.run_handler_lock.acquire()
             try:
-                if not self.management_event_queue.empty():
+                while not self.management_event_queue.empty():
                     self.process_management_queue()
 
                 # Give result events priority -- they let us stop builds,
                # whereas trigger events cause us to launch builds.
-                if not self.result_event_queue.empty():
+                while not self.result_event_queue.empty():
                     self.process_result_queue()
-                elif not self._pause:
-                    if not self.trigger_event_queue.empty():
+
+                if not self._pause:
+                    while not self.trigger_event_queue.empty():
                         self.process_event_queue()
 
                 if self._pause and self._areAllBuildsComplete():
                     self._doPauseEvent()
 
-                if not self._pause:
-                    if not (self.trigger_event_queue.empty() and
-                            self.result_event_queue.empty()):
-                        self.wake_event.set()
-                else:
-                    if not self.result_event_queue.empty():
-                        self.wake_event.set()
+                for pipeline in self.layout.pipelines.values():
+                    while pipeline.manager.processQueue():
+                        pass
 
                 if self._maintain_trigger_cache:
                     self.maintainTriggerCache()
                     self._maintain_trigger_cache = False
 
-            except:
+            except Exception:
                 self.log.exception("Exception in run handler:")
+                # There may still be more events to process
+                self.wake_event.set()
+            finally:
+                self.run_handler_lock.release()
 
     def maintainTriggerCache(self):
         relevant = set()
@@ -700,34 +736,23 @@
         self.log.debug("Fetching trigger event")
         event = self.trigger_event_queue.get()
         self.log.debug("Processing trigger event %s" % event)
-        project = self.layout.projects.get(event.project_name)
-        if not project:
-            self.log.warning("Project %s not found" % event.project_name)
+        try:
+            project = self.layout.projects.get(event.project_name)
+            if not project:
+                self.log.warning("Project %s not found" % event.project_name)
+                return
+
+            for pipeline in self.layout.pipelines.values():
+                change = event.getChange(project,
+                                         self.triggers.get(event.trigger_name))
+                if event.type == 'patchset-created':
+                    pipeline.manager.removeOldVersionsOfChange(change)
+                if pipeline.manager.eventMatches(event, change):
+                    self.log.info("Adding %s, %s to %s" %
+                                  (project, change, pipeline))
+                    pipeline.manager.addChange(change)
+        finally:
             self.trigger_event_queue.task_done()
-            return
-
-        # Preprocessing for ref-update events
-        if event.ref:
-            # Make sure the local git repo is up-to-date with the remote one.
-            # We better have the new ref before enqueuing the changes.
-            # This is done before enqueuing the changes to avoid calling an
-            # update per pipeline accepting the change.
-            self.log.info("Fetching references for %s" % project)
-            self.merger.updateRepo(project)
-
-        for pipeline in self.layout.pipelines.values():
-            change = event.getChange(project,
-                                     self.triggers.get(event.trigger_name))
-            if event.type == 'patchset-created':
-                pipeline.manager.removeOldVersionsOfChange(change)
-            if pipeline.manager.eventMatches(event, change):
-                self.log.info("Adding %s, %s to %s" %
-                              (project, change, pipeline))
-                pipeline.manager.addChange(change)
-            while pipeline.manager.processQueue():
-                pass
-
-        self.trigger_event_queue.task_done()
 
     def process_management_queue(self):
         self.log.debug("Fetching management event")
@@ -747,19 +772,57 @@
 
     def process_result_queue(self):
         self.log.debug("Fetching result event")
-        event_type, build = self.result_event_queue.get()
-        self.log.debug("Processing result event %s" % build)
-        for pipeline in self.layout.pipelines.values():
-            if event_type == 'started':
-                if pipeline.manager.onBuildStarted(build):
-                    self.result_event_queue.task_done()
-                    return
-            elif event_type == 'completed':
-                if pipeline.manager.onBuildCompleted(build):
-                    self.result_event_queue.task_done()
-                    return
-        self.log.warning("Build %s not found by any queue manager" % (build))
-        self.result_event_queue.task_done()
+        event = self.result_event_queue.get()
+        self.log.debug("Processing result event %s" % event)
+        try:
+            if isinstance(event, BuildStartedEvent):
+                self._doBuildStartedEvent(event)
+            elif isinstance(event, BuildCompletedEvent):
+                self._doBuildCompletedEvent(event)
+            elif isinstance(event, MergeCompletedEvent):
+                self._doMergeCompletedEvent(event)
+            else:
+                self.log.error("Unable to handle event %s" % event)
+        finally:
+            self.result_event_queue.task_done()
+
+    def _doBuildStartedEvent(self, event):
+        build = event.build
+        if build.build_set is not build.build_set.item.current_build_set:
+            self.log.warning("Build %s is not in the current build set" %
+                             (build,))
+            return
+        pipeline = build.build_set.item.pipeline
+        if not pipeline:
+            self.log.warning("Build %s is not associated with a pipeline" %
+                             (build,))
+            return
+        pipeline.manager.onBuildStarted(event.build)
+
+    def _doBuildCompletedEvent(self, event):
+        build = event.build
+        if build.build_set is not build.build_set.item.current_build_set:
+            self.log.warning("Build %s is not in the current build set" %
+                             (build,))
+            return
+        pipeline = build.build_set.item.pipeline
+        if not pipeline:
+            self.log.warning("Build %s is not associated with a pipeline" %
+                             (build,))
+            return
+        pipeline.manager.onBuildCompleted(event.build)
+
+    def _doMergeCompletedEvent(self, event):
+        build_set = event.build_set
+        if build_set is not build_set.item.current_build_set:
+            self.log.warning("Build set %s is not current" % (build_set,))
+            return
+        pipeline = build_set.item.pipeline
+        if not pipeline:
+            self.log.warning("Build set %s is not associated with a pipeline" %
+                             (build_set,))
+            return
+        pipeline.manager.onMergeCompleted(event)
 
     def formatStatusHTML(self):
         ret = '<html><pre>'
@@ -822,7 +885,6 @@
     def __init__(self, sched, pipeline):
         self.sched = sched
         self.pipeline = pipeline
-        self.building_jobs = {}
         self.event_filters = []
         if self.sched.config and self.sched.config.has_option(
             'zuul', 'report_times'):
@@ -1045,26 +1107,41 @@
                 self.dequeueItem(item)
                 self.reportStats(item)
 
+    def _makeMergerItem(self, item):
+        # Create a dictionary with all info about the item needed by
+        # the merger.
+        return dict(project=item.change.project.name,
+                    url=self.pipeline.trigger.getGitUrl(
+                        item.change.project),
+                    merge_mode=item.change.project.merge_mode,
+                    refspec=item.change.refspec,
+                    branch=item.change.branch,
+                    ref=item.current_build_set.ref,
+                    )
+
     def prepareRef(self, item):
-        # Returns False on success.
-        # Returns True if we were unable to prepare the ref.
-        ref = item.current_build_set.ref
+        # Returns True if the ref is ready, False otherwise.
+        build_set = item.current_build_set
+        if build_set.merge_state == build_set.COMPLETE:
+            return True
+        if build_set.merge_state == build_set.PENDING:
+            return False
+        build_set.merge_state = build_set.PENDING
+        ref = build_set.ref
         if hasattr(item.change, 'refspec') and not ref:
             self.log.debug("Preparing ref for: %s" % item.change)
             item.current_build_set.setConfiguration()
-            ref = item.current_build_set.ref
             dependent_items = self.getDependentItems(item)
             dependent_items.reverse()
             all_items = dependent_items + [item]
-            commit = self.sched.merger.mergeChanges(all_items, ref)
-            item.current_build_set.commit = commit
-            if not commit:
-                self.log.info("Unable to merge change %s" % item.change)
-                msg = ("This change was unable to be automatically merged "
-                       "with the current state of the repository. Please "
-                       "rebase your change and upload a new patchset.")
-                self.pipeline.setUnableToMerge(item, msg)
-                return True
+            merger_items = map(self._makeMergerItem, all_items)
+            self.sched.merger.mergeChanges(merger_items,
+                                           item.current_build_set)
+        else:
+            self.log.debug("Preparing update repo for: %s" % item.change)
+            url = self.pipeline.trigger.getGitUrl(item.change.project)
+            self.sched.merger.updateRepo(item.change.project.name,
+                                         url, build_set)
         return False
 
     def _launchJobs(self, item, jobs):
@@ -1076,7 +1153,6 @@
                 build = self.sched.launcher.launch(job, item,
                                                    self.pipeline,
                                                    dependent_items)
-                self.building_jobs[build] = item
                 self.log.debug("Adding build %s of job %s to item %s" %
                                (build, job, item))
                 item.addBuild(build)
@@ -1092,24 +1168,17 @@
     def cancelJobs(self, item, prime=True):
         self.log.debug("Cancel jobs for change %s" % item.change)
         canceled = False
-        to_remove = []
+        old_build_set = item.current_build_set
         if prime and item.current_build_set.ref:
             item.resetAllBuilds()
-        for build, build_item in self.building_jobs.items():
-            if build_item == item:
-                self.log.debug("Found build %s for change %s to cancel" %
-                               (build, item.change))
-                try:
-                    self.sched.launcher.cancel(build)
-                except:
-                    self.log.exception("Exception while canceling build %s "
-                                       "for change %s" % (build, item.change))
-                to_remove.append(build)
-                canceled = True
-        for build in to_remove:
-            self.log.debug("Removing build %s from running builds" % build)
+        for build in old_build_set.getBuilds():
+            try:
+                self.sched.launcher.cancel(build)
+            except Exception:
+                self.log.exception("Exception while canceling build %s "
+                                   "for change %s" % (build, item.change))
             build.result = 'CANCELED'
-            del self.building_jobs[build]
+            canceled = True
         for item_behind in item.items_behind:
             self.log.debug("Canceling jobs for change %s, behind change %s" %
                            (item_behind.change, item.change))
@@ -1117,7 +1186,7 @@
                 canceled = True
         return canceled
 
-    def _processOneItem(self, item, nnfi):
+    def _processOneItem(self, item, nnfi, ready_ahead):
         changed = False
         item_ahead = item.item_ahead
         change_queue = self.pipeline.getQueue(item.change.project)
@@ -1134,10 +1203,11 @@
                 self.reportItem(item)
             except MergeFailure:
                 pass
-            return (True, nnfi)
+            return (True, nnfi, ready_ahead)
         dep_item = self.getFailingDependentItem(item)
         actionable = change_queue.isActionable(item)
         item.active = actionable
+        ready = False
         if dep_item:
             failing_reasons.append('a needed change is failing')
             self.cancelJobs(item, prime=False)
@@ -1157,10 +1227,13 @@
                 changed = True
                 self.cancelJobs(item)
             if actionable:
-                self.prepareRef(item)
+                ready = self.prepareRef(item)
                 if item.current_build_set.unable_to_merge:
                     failing_reasons.append("it has a merge conflict")
-        if actionable and self.launchJobs(item):
+                    ready = False
+        if not ready:
+            ready_ahead = False
+        if actionable and ready_ahead and self.launchJobs(item):
             changed = True
         if self.pipeline.didAnyJobFail(item):
             failing_reasons.append("at least one job failed")
@@ -1182,7 +1255,7 @@
         if failing_reasons:
             self.log.debug("%s is a failing item because %s" %
                            (item, failing_reasons))
-        return (changed, nnfi)
+        return (changed, nnfi, ready_ahead)
 
     def processQueue(self):
         # Do whatever needs to be done for each change in the queue
@@ -1191,8 +1264,10 @@
         for queue in self.pipeline.queues:
             queue_changed = False
             nnfi = None  # Nearest non-failing item
+            ready_ahead = True  # All build sets ahead are ready
             for item in queue.queue[:]:
-                item_changed, nnfi = self._processOneItem(item, nnfi)
+                item_changed, nnfi, ready_ahead = self._processOneItem(
+                    item, nnfi, ready_ahead)
                 if item_changed:
                     queue_changed = True
                 self.reportStats(item)
@@ -1219,38 +1294,36 @@
                 self.sched.launcher.setBuildDescription(build, desc)
 
     def onBuildStarted(self, build):
-        if build not in self.building_jobs:
-            # Or triggered externally, or triggered before zuul started,
-            # or restarted
-            return False
-
         self.log.debug("Build %s started" % build)
         self.updateBuildDescriptions(build.build_set)
-        while self.processQueue():
-            pass
         return True
 
     def onBuildCompleted(self, build):
-        if build not in self.building_jobs:
-            # Or triggered externally, or triggered before zuul started,
-            # or restarted
-            return False
-
         self.log.debug("Build %s completed" % build)
-        change = self.building_jobs[build]
-        self.log.debug("Found change %s which triggered completed build %s" %
-                       (change, build))
+        item = build.build_set.item
 
-        del self.building_jobs[build]
-
-        self.pipeline.setResult(change, build)
-        self.log.debug("Change %s status is now:\n %s" %
-                       (change, self.pipeline.formatStatus(change)))
+        self.pipeline.setResult(item, build)
+        self.log.debug("Item %s status is now:\n %s" %
+                       (item, self.pipeline.formatStatus(item)))
         self.updateBuildDescriptions(build.build_set)
-        while self.processQueue():
-            pass
         return True
 
+    def onMergeCompleted(self, event):
+        build_set = event.build_set
+        item = build_set.item
+        build_set.merge_state = build_set.COMPLETE
+        build_set.zuul_url = event.zuul_url
+        if event.merged:
+            build_set.commit = event.commit
+        elif event.updated:
+            build_set.commit = item.change.newrev
+        if not build_set.commit:
+            self.log.info("Unable to merge change %s" % item.change)
+            msg = ("This change was unable to be automatically merged "
+                   "with the current state of the repository. Please "
+                   "rebase your change and upload a new patchset.")
+            self.pipeline.setUnableToMerge(item, msg)
+
     def reportItem(self, item):
         if item.reported:
             raise Exception("Already reported change %s" % item.change)
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index f055a50..904fa7a 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -86,7 +86,8 @@
         raise Exception("Timer trigger does not support changes.")
 
     def getGitUrl(self, project):
-        pass
+        # For the moment, the timer trigger requires gerrit.
+        return self.sched.triggers['gerrit'].getGitUrl(project)
 
     def getGitwebUrl(self, project, sha=None):
         url = '%s/gitweb?p=%s.git' % (self.baseurl, project)