Merge "Add support for result data in child jobs" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index 2eadd4f..ede4391 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -45,8 +45,7 @@
     name: openstack-infra/zuul
     check:
       jobs:
-        - build-openstack-infra-sphinx-docs:
-            success-url: 'html/feature/zuulv3/'
+        - build-openstack-sphinx-docs:
             irrelevant-files:
               - zuul/cmd/migrate.py
               - playbooks/zuul-migrate/.*
@@ -67,8 +66,10 @@
               - playbooks/zuul-migrate/.*
     gate:
       jobs:
-        - build-openstack-infra-sphinx-docs:
-            success-url: 'html/feature/zuulv3/'
+        - build-openstack-sphinx-docs:
+            irrelevant-files:
+              - zuul/cmd/migrate.py
+              - playbooks/zuul-migrate/.*
         - tox-pep8
         - tox-py35:
             irrelevant-files:
@@ -77,5 +78,5 @@
         - zuul-stream-functional
     post:
       jobs:
-        - publish-openstack-python-docs-infra
+        - publish-openstack-sphinx-docs-infra
         - publish-openstack-python-branch-tarball
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index fbb8cbc..b20aba7 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -101,6 +101,27 @@
 
       An openssl file containing the client private key in PEM format.
 
+.. attr:: statsd
+
+   Information about the optional statsd server.  If the ``statsd``
+   python module is installed and this section is configured,
+   statistics will be reported to statsd.  See :ref:`statsd` for more
+   information.
+
+   .. attr:: server
+
+      Hostname or IP address of the statsd server.
+
+   .. attr:: port
+      :default: 8125
+
+      The UDP port on which the statsd server is listening.
+
+   .. attr:: prefix
+
+      If present, this will be prefixed to all of the keys before
+      they are transmitted to the statsd server.
+
 .. NOTE: this is a white lie at this point, since only the scheduler
    uses this, however, we expect other components to use it later, so
    it's reasonable for admins to plan for this now.
@@ -115,6 +136,11 @@
       A list of zookeeper hosts for Zuul to use when communicating
       with Nodepool.
 
+   .. attr:: session_timeout
+      :default: 10.0
+
+      The ZooKeeper session timeout, in seconds.
+
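+      As a hedged illustration only (not Zuul's actual code), this is
+      roughly how such a timeout is passed to the ``kazoo`` client;
+      the host list and value below are placeholders:
+
+      .. code-block:: python
+
+        import kazoo.client
+
+        # session_timeout corresponds to kazoo's timeout argument.
+        client = kazoo.client.KazooClient(hosts='127.0.0.1:2181',
+                                          timeout=10.0)
+        client.start()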
 
 .. _scheduler:
 
@@ -260,6 +286,22 @@
 
       Directory in which Zuul should clone git repositories.
 
+   .. attr:: git_http_low_speed_limit
+      :default: 1000
+
+      If the HTTP transfer speed is less than git_http_low_speed_limit
+      for longer than git_http_low_speed_time, the transfer is aborted.
+
+      The value is in bytes; setting it to 0 disables this check.
+
+   .. attr:: git_http_low_speed_time
+      :default: 30
+
+      If the HTTP transfer speed is less than git_http_low_speed_limit
+      for longer than git_http_low_speed_time, the transfer is aborted.
+
+      The value is in seconds; setting it to 0 disables this check.
+
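+      As a hedged sketch (assumed path, not Zuul's implementation),
+      these options correspond to git's ``http.lowSpeedLimit`` and
+      ``http.lowSpeedTime`` settings, which could be applied to a
+      repository with GitPython like this:
+
+      .. code-block:: python
+
+        import git
+
+        # The repository path is illustrative.
+        repo = git.Repo('/var/lib/zuul/git/example.com/org/project')
+        with repo.config_writer() as config:
+            config.set_value('http', 'lowSpeedLimit', '1000')  # bytes
+            config.set_value('http', 'lowSpeedTime', '30')  # seconds
+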
    .. attr:: git_user_email
 
       Value to pass to `git config user.email
@@ -463,6 +505,32 @@
       `bubblewrap` has become integral to securely operating Zuul.  If you
       have a valid use case for it, we encourage you to let us know.
 
+   .. attr:: load_multiplier
+      :default: 2.5
+
+      When an executor host gets too busy, the system may suffer
+      timeouts and other ill effects.  The executor will stop
+      accepting more than one job at a time until the load has
+      dropped below a safe level.  This level is determined by
+      multiplying the number of CPUs by `load_multiplier`.
+
+      For example, if the system has 2 CPUs and load_multiplier is
+      2.5, the safe load for the system is 5.00.  Any time the system
+      load average is over 5.00, the executor will stop accepting
+      multiple jobs at one time.
+
+      The executor checks the system load, and whether to accept more
+      jobs, every 30 seconds.
+
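+      A worked sketch of the calculation described above (variable
+      names are assumed; this is not Zuul's implementation):
+
+      .. code-block:: python
+
+        import multiprocessing
+        import os
+
+        load_multiplier = 2.5
+        # e.g. 2 CPUs * 2.5 = 5.00
+        max_load = multiprocessing.cpu_count() * load_multiplier
+
+        # os.getloadavg() returns the 1, 5 and 15 minute averages.
+        accept_more_jobs = os.getloadavg()[0] <= max_load
+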
+   .. attr:: hostname
+      :default: hostname of the server
+
+      The hostname under which the executor is reachable by zuul-web;
+      without it, live console log streaming does not work.  In most
+      cases this is detected automatically, but it can be overridden
+      here for environments where the executor cannot determine its
+      hostname correctly.
+
 .. attr:: merger
 
    .. attr:: git_user_email
diff --git a/doc/source/admin/drivers/sql.rst b/doc/source/admin/drivers/sql.rst
index e208467..a269f5d 100644
--- a/doc/source/admin/drivers/sql.rst
+++ b/doc/source/admin/drivers/sql.rst
@@ -28,6 +28,21 @@
       <http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#database-urls>`_
       for more information.
 
+      The driver will automatically set up the database, creating and
+      managing the necessary tables, so the provided user should have
+      sufficient permissions to manage the database.  For example:
+
+      .. code-block:: sql
+
+        GRANT ALL ON my_database TO 'my_user'@'%';
+
+   .. attr:: pool_recycle
+      :default: 1
+
+      Tune the pool_recycle value. See `The SQLAlchemy manual on pooling
+      <http://docs.sqlalchemy.org/en/latest/core/pooling.html#setting-pool-recycle>`_
+      for more information.
+
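+      As a hedged illustration (the database URL is a placeholder),
+      this is how a recycle value reaches SQLAlchemy:
+
+      .. code-block:: python
+
+        import sqlalchemy
+
+        engine = sqlalchemy.create_engine(
+            'mysql+pymysql://my_user:secret@localhost/my_database',
+            pool_recycle=1)  # recycle connections after one second
+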
 Reporter Configuration
 ----------------------
 
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 4fed1f9..55f1908 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -3,6 +3,8 @@
 Monitoring
 ==========
 
+.. _statsd:
+
 Statsd reporting
 ----------------
 
@@ -13,37 +15,24 @@
 Configuration
 ~~~~~~~~~~~~~
 
-Statsd support uses the statsd python module. Note that Zuul will start without
-the statsd python module, so an existing Zuul installation may be missing it.
+Statsd support uses the ``statsd`` python module.  Note that support
+is optional and Zuul will start without the statsd python module
+present.
 
-The configuration is done via environment variables STATSD_HOST and
-STATSD_PORT. They are interpreted by the statsd module directly and there is no
-such parameter in zuul.conf yet. Your init script will have to initialize both
-of them before executing Zuul.
-
-Your init script most probably loads a configuration file named
-``/etc/default/zuul`` which would contain the environment variables::
-
-  $ cat /etc/default/zuul
-  STATSD_HOST=10.0.0.1
-  STATSD_PORT=8125
+Configuration is in the :attr:`statsd` section of ``zuul.conf``.
 
 Metrics
 ~~~~~~~
 
 These metrics are emitted by the Zuul :ref:`scheduler`:
 
-.. stat:: gerrit.event.<type>
+.. stat:: zuul.event.<driver>.event.<type>
    :type: counter
 
-   Gerrit emits different kinds of messages over its `stream-events`
-   interface.  Zuul will report counters for each type of event it
-   receives from Gerrit.
+   Zuul will report counters for each type of event it receives from
+   each of its configured drivers.
 
-   Refer to your Gerrit installation documentation for a complete
-   list of Gerrit event types.
-
-.. stat:: zuul.pipeline
+.. stat:: zuul.tenant.<tenant>.pipeline
 
    Holds metrics specific to jobs. This hierarchy includes:
 
@@ -63,22 +52,60 @@
          The number of items currently being processed by this
          pipeline.
 
-      .. stat:: job
+      .. stat:: project
 
-         Subtree detailing per jobs statistics:
+         This hierarchy holds more specific metrics for each project
+         participating in the pipeline.
 
-         .. stat:: <jobname>
+         .. stat:: <canonical_hostname>
 
-            The triggered job name.
+            The canonical hostname for the triggering project.
+            Embedded ``.`` characters will be translated to ``_``.
 
-            .. stat:: <result>
-               :type: counter, timer
+            .. stat:: <project>
 
-               A counter for each type of result (e.g., ``SUCCESS`` or
-               ``FAILURE``, ``ERROR``, etc.) for the job.  If the
-               result is ``SUCCESS`` or ``FAILURE``, Zuul will
-               additionally report the duration of the build as a
-               timer.
+               The name of the triggering project.  Embedded ``/`` or
+               ``.`` characters will be translated to ``_``.
+
+               .. stat:: <branch>
+
+                  The name of the triggering branch.  Embedded ``/`` or
+                  ``.`` characters will be translated to ``_``.
+
+                  .. stat:: job
+
+                     Subtree detailing per-project job statistics:
+
+                     .. stat:: <jobname>
+
+                        The triggered job name.
+
+                        .. stat:: <result>
+                           :type: counter, timer
+
+                           A counter for each type of result (e.g., ``SUCCESS`` or
+                           ``FAILURE``, ``ERROR``, etc.) for the job.  If the
+                           result is ``SUCCESS`` or ``FAILURE``, Zuul will
+                           additionally report the duration of the build as a
+                           timer.
+
+                  .. stat:: current_changes
+                     :type: gauge
+
+                     The number of items of this project currently being
+                     processed by this pipeline.
+
+                  .. stat:: resident_time
+                     :type: timer
+
+                     A timer metric reporting how long each item for this
+                     project has been in the pipeline.
+
+                  .. stat:: total_changes
+                     :type: counter
+
+                     The number of changes for this project processed by the
+                     pipeline since Zuul started.
 
       .. stat:: resident_time
          :type: timer
@@ -98,34 +125,111 @@
          How long each item spent in the pipeline before its first job
          started.
 
-      .. stat:: <project>
+.. stat:: zuul.executor.<executor>
 
-         This hierarchy holds more specific metrics for each project
-         participating in the pipeline.  If the project name contains
-         a ``/`` character, it will be replaced with a ``.``.
+   Holds metrics emitted by individual executors.  The ``<executor>``
+   component of the key will be replaced with the hostname of the
+   executor.
 
-         .. stat:: current_changes
-            :type: gauge
+   .. stat:: builds
+      :type: counter
 
-            The number of items of this project currently being
-            processed by this pipeline.
+      Incremented each time the executor starts a build.
 
-         .. stat:: resident_time
-            :type: timer
+   .. stat:: running_builds
+      :type: gauge
 
-            A timer metric reporting how long each item for this
-            project has been in the pipeline.
+      The number of builds currently running on this executor.
 
-         .. stat:: total_changes
-            :type: counter
+   .. stat:: load_average
+      :type: gauge
 
-            The number of changes for this project processed by the
-            pipeline since Zuul started.
+      The one-minute load average of this executor, multiplied by 100.
 
-As an example, given a job named `myjob` triggered by the `gate` pipeline
-which took 40 seconds to build, the Zuul scheduler will emit the following
-statsd events:
+.. stat:: zuul.nodepool
 
-  * ``zuul.pipeline.gate.job.myjob.SUCCESS`` +1
-  * ``zuul.pipeline.gate.job.myjob``  40 seconds
-  * ``zuul.pipeline.gate.all_jobs`` +1
+   Holds metrics related to Zuul requests from Nodepool.
+
+   .. stat:: requested
+      :type: counter
+
+      Incremented each time a node request is submitted to Nodepool.
+
+      .. stat:: label.<label>
+         :type: counter
+
+         Incremented each time a request for a specific label is
+         submitted to Nodepool.
+
+      .. stat:: size.<size>
+         :type: counter
+
+         Incremented each time a request of a specific size is submitted
+         to Nodepool.  For example, a request for 3 nodes would use the
+         key ``zuul.nodepool.requested.size.3``.
+
+   .. stat:: canceled
+      :type: counter, timer
+
+      The counter is incremented each time a node request is canceled
+      by Zuul.  The timer records the elapsed time from request to
+      cancelation.
+
+      .. stat:: label.<label>
+         :type: counter, timer
+
+         The same, for a specific label.
+
+      .. stat:: size.<size>
+         :type: counter, timer
+
+         The same, for a specific request size.
+
+   .. stat:: fulfilled
+      :type: counter, timer
+
+      The counter is incremented each time a node request is fulfilled
+      by Nodepool.  The timer records the elapsed time from request to
+      fulfillment.
+
+      .. stat:: label.<label>
+         :type: counter, timer
+
+         The same, for a specific label.
+
+      .. stat:: size.<size>
+         :type: counter, timer
+
+         The same, for a specific request size.
+
+   .. stat:: failed
+      :type: counter, timer
+
+      The counter is incremented each time Nodepool fails to fulfill a
+      node request.  The timer records the elapsed time from request
+      to failure.
+
+      .. stat:: label.<label>
+         :type: counter, timer
+
+         The same, for a specific label.
+
+      .. stat:: size.<size>
+         :type: counter, timer
+
+         The same, for a specific request size.
+
+   .. stat:: current_requests
+      :type: gauge
+
+      The number of outstanding nodepool requests from Zuul.
+
+
+As an example, given a job named `myjob` in `mytenant` triggered by a
+change to `myproject` on the `master` branch in the `gate` pipeline
+which took 40 seconds to build, the Zuul scheduler will emit the
+following statsd events:
+
+  * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` +1
+  * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS``  40 seconds
+  * ``zuul.tenant.mytenant.pipeline.gate.all_jobs`` +1
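+
+As a hedged illustration (assuming the ``statsd`` python module, with
+a placeholder server and values), those keys could be emitted like so:
+
+.. code-block:: python
+
+  import statsd
+
+  client = statsd.StatsClient('127.0.0.1', 8125, prefix='zuul')
+  key = ('tenant.mytenant.pipeline.gate.project.'
+         'example_com.myproject.master.job.myjob.SUCCESS')
+  client.incr(key)           # counter: +1
+  client.timing(key, 40000)  # timer: 40 seconds, in milliseconds
+  client.incr('tenant.mytenant.pipeline.gate.all_jobs')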
diff --git a/doc/source/user/config.rst b/doc/source/user/config.rst
index 025ea71..b2074d1 100644
--- a/doc/source/user/config.rst
+++ b/doc/source/user/config.rst
@@ -653,10 +653,22 @@
         configuration, Zuul reads the ``master`` branch of a given
         project first, then other branches in alphabetical order.
 
+      * In the case of a job variant defined within a :ref:`project`,
+        if the project definition is in a :term:`config-project`, no
+        implied branch specifier is used.  If it appears in an
+        :term:`untrusted-project` with no branch specifier, the
+        branch containing the project definition is used as an
+        implied branch specifier.
+
+      * In the case of a job variant defined within a
+        :ref:`project-template`, if no branch specifier appears, the
+        implied branch specifier for the :ref:`project` definition which
+        uses the project-template will be used.
+
       * Any further job variants other than the reference definition
         in an untrusted-project will, if they do not have a branch
-        specifier, will have an implied branch specifier for the
-        current branch applied.
+        specifier, have an implied branch specifier for the current
+        branch applied.
 
       This allows for the very simple and expected workflow where if a
       project defines a job on the ``master`` branch with no branch
@@ -940,12 +952,12 @@
 
       A boolean value which indicates whether this job may only be
       used in pipelines where :attr:`pipeline.post-review` is
-      ``true``.  This is automatically set to ``true`` if this job is
-      defined in a :term:`untrusted-project`.  It may be explicitly
-      set to obtain the same behavior for jobs defined in
-      :term:`config projects <config-project>`.  Once this is set to
-      ``true`` anywhere in the inheritance hierarchy for a job, it
-      will remain set for all child jobs and variants (it can not be
+      ``true``.  This is automatically set to ``true`` if this job
+      uses a :ref:`secret` and is defined in an :term:`untrusted-project`.
+      It may be explicitly set to obtain the same behavior for jobs
+      defined in :term:`config projects <config-project>`.  Once this
+      is set to ``true`` anywhere in the inheritance hierarchy for a job,
+      it will remain set for all child jobs and variants (it can not be
       set to ``false``).
 
 .. _project:
@@ -961,6 +973,12 @@
 project-pipeline definition is what determines how a project
 participates in a pipeline.
 
+Multiple project definitions may appear for the same project (for
+example, in a central :term:`config-project` as well as in a repo's
+own ``.zuul.yaml``).  In this case, all of the project definitions
+are combined (the jobs listed in all of the definitions will be run).
+
 Consider the following project definition::
 
   - project:
@@ -1007,8 +1025,7 @@
 
 .. attr:: project
 
-   In addition to a project-pipeline definition for one or more
-   pipelines, the following attributes may appear in a project:
+   The following attributes may appear in a project:
 
    .. attr:: name
       :required:
diff --git a/doc/source/user/jobs.rst b/doc/source/user/jobs.rst
index 4d80d1e..f65ee19 100644
--- a/doc/source/user/jobs.rst
+++ b/doc/source/user/jobs.rst
@@ -252,6 +252,13 @@
          A boolean indicating whether this project appears in the
          :attr:`job.required-projects` list for this job.
 
+   .. var:: _projects
+      :type: dict
+
+      The same as ``projects`` but a dictionary indexed by the
+      ``name`` value of each entry.  ``projects`` will be converted to
+      this.
+
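+      For illustration only (the entry contents here are assumed),
+      the relationship between the two variables is:
+
+      .. code-block:: python
+
+        projects = [{'name': 'org/project', 'required': True}]
+        # _projects re-indexes the list by each entry's name.
+        _projects = {p['name']: p for p in projects}
+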
    .. var:: tenant
 
       The name of the current Zuul tenant.
diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js
index 70e999e..50dbed5 100644
--- a/etc/status/public_html/jquery.zuul.js
+++ b/etc/status/public_html/jquery.zuul.js
@@ -53,6 +53,7 @@
             'msg_id': '#zuul_msg',
             'pipelines_id': '#zuul_pipelines',
             'queue_events_num': '#zuul_queue_events_num',
+            'queue_management_events_num': '#zuul_queue_management_events_num',
             'queue_results_num': '#zuul_queue_results_num',
         }, options);
 
@@ -281,38 +282,38 @@
 
             change_header: function(change) {
                 var change_id = change.id || 'NA';
-                if (change_id.length === 40) {
-                    change_id = change_id.substr(0, 7);
-                }
 
                 var $change_link = $('<small />');
                 if (change.url !== null) {
-                    var github_id = change.id.match(/^([0-9]+),([0-9a-f]{40})$/);
+                    var github_id = change_id.match(/^([0-9]+),([0-9a-f]{40})$/);
                     if (github_id) {
                         $change_link.append(
                             $('<a />').attr('href', change.url).append(
                                 $('<abbr />')
-                                    .attr('title', change.id)
+                                    .attr('title', change_id)
                                     .text('#' + github_id[1])
                             )
                         );
-                    } else if (/^[0-9a-f]{40}$/.test(change.id)) {
-                        var change_id_short = change.id.slice(0, 7);
+                    } else if (/^[0-9a-f]{40}$/.test(change_id)) {
+                        var change_id_short = change_id.slice(0, 7);
                         $change_link.append(
                             $('<a />').attr('href', change.url).append(
                                 $('<abbr />')
-                                    .attr('title', change.id)
+                                    .attr('title', change_id)
                                     .text(change_id_short)
                             )
                         );
                     }
                     else {
                         $change_link.append(
-                            $('<a />').attr('href', change.url).text(change.id)
+                            $('<a />').attr('href', change.url).text(change_id)
                         );
                     }
                 }
                 else {
+                    if (change_id.length === 40) {
+                        change_id = change_id.substr(0, 7);
+                    }
                     $change_link.text(change_id);
                 }
 
@@ -713,6 +714,10 @@
                             data.trigger_event_queue ?
                                 data.trigger_event_queue.length : '0'
                         );
+                        $(options.queue_management_events_num).text(
+                            data.management_event_queue ?
+                                data.management_event_queue.length : '0'
+                        );
                         $(options.queue_results_num).text(
                             data.result_event_queue ?
                                 data.result_event_queue.length : '0'
diff --git a/etc/status/public_html/zuul.app.js b/etc/status/public_html/zuul.app.js
index ae950e8..7ceb2dd 100644
--- a/etc/status/public_html/zuul.app.js
+++ b/etc/status/public_html/zuul.app.js
@@ -33,7 +33,7 @@
         + '<div class="zuul-container" id="zuul-container">'
         + '<div style="display: none;" class="alert" id="zuul_msg"></div>'
         + '<button class="btn pull-right zuul-spinner">updating <span class="glyphicon glyphicon-refresh"></span></button>'
-        + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_results_num">0</span> results.</p>'
+        + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_management_events_num">0</span> management events, <span id="zuul_queue_results_num">0</span> results.</p>'
         + '<div id="zuul_controls"></div>'
         + '<div id="zuul_pipelines" class="row"></div>'
         + '<p>Zuul version: <span id="zuul-version-span"></span></p>'
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index ba7aace..76494ad 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -5,6 +5,9 @@
 ;ssl_cert=/path/to/client.pem
 ;ssl_key=/path/to/client.key
 
+[statsd]
+server=127.0.0.1
+
 [zookeeper]
 hosts=127.0.0.1:2181
 
diff --git a/requirements.txt b/requirements.txt
index cdffda2..4b8be3c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,12 +2,16 @@
 
 # pull from master until https://github.com/sigmavirus24/github3.py/pull/671
 # is in a release
--e git+https://github.com/sigmavirus24/github3.py.git@develop#egg=Github3.py
+git+https://github.com/sigmavirus24/github3.py.git@develop#egg=Github3.py
 PyYAML>=3.1.0
 Paste
 WebOb>=1.2.3
 paramiko>=1.8.0,<2.0.0
-GitPython>=0.3.3,<2.1.2
+# Using a local fork of gitpython until at least these changes are in a
+# release.
+# https://github.com/gitpython-developers/GitPython/pull/682
+# https://github.com/gitpython-developers/GitPython/pull/686
+git+https://github.com/jeblair/GitPython.git@zuul#egg=GitPython
 python-daemon>=2.0.4,<2.1.0
 extras
 statsd>=1.0.0,<3.0
diff --git a/tests/base.py b/tests/base.py
index 2e3d682..a02ee5a 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -20,7 +20,6 @@
 import datetime
 import gc
 import hashlib
-import importlib
 from io import StringIO
 import json
 import logging
@@ -48,7 +47,6 @@
 import kazoo.client
 import kazoo.exceptions
 import pymysql
-import statsd
 import testtools
 import testtools.content
 import testtools.content_type
@@ -1363,6 +1361,63 @@
         return repos
 
 
+class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
+    def doMergeChanges(self, merger, items, repo_state):
+        # Get a merger in order to update the repos involved in this job.
+        commit = super(RecordingAnsibleJob, self).doMergeChanges(
+            merger, items, repo_state)
+        if not commit:  # merge conflict
+            self.recordResult('MERGER_FAILURE')
+        return commit
+
+    def recordResult(self, result):
+        build = self.executor_server.job_builds[self.job.unique]
+        self.executor_server.lock.acquire()
+        self.executor_server.build_history.append(
+            BuildHistory(name=build.name, result=result, changes=build.changes,
+                         node=build.node, uuid=build.unique,
+                         ref=build.parameters['zuul']['ref'],
+                         parameters=build.parameters, jobdir=build.jobdir,
+                         pipeline=build.parameters['zuul']['pipeline'])
+        )
+        self.executor_server.running_builds.remove(build)
+        del self.executor_server.job_builds[self.job.unique]
+        self.executor_server.lock.release()
+
+    def runPlaybooks(self, args):
+        build = self.executor_server.job_builds[self.job.unique]
+        build.jobdir = self.jobdir
+
+        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
+        self.recordResult(result)
+        return result
+
+    def runAnsible(self, cmd, timeout, playbook, wrapped=True):
+        build = self.executor_server.job_builds[self.job.unique]
+
+        if self.executor_server._run_ansible:
+            result = super(RecordingAnsibleJob, self).runAnsible(
+                cmd, timeout, playbook, wrapped)
+        else:
+            if playbook.path:
+                result = build.run()
+            else:
+                result = (self.RESULT_NORMAL, 0)
+        return result
+
+    def getHostList(self, args):
+        self.log.debug("hostlist")
+        hosts = super(RecordingAnsibleJob, self).getHostList(args)
+        for host in hosts:
+            host['host_vars']['ansible_connection'] = 'local'
+
+        hosts.append(dict(
+            name='localhost',
+            host_vars=dict(ansible_connection='local'),
+            host_keys=[]))
+        return hosts
+
+
 class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
     """An Ansible executor to be used in tests.
 
@@ -1374,6 +1429,9 @@
         be explicitly released.
 
     """
+
+    _job_class = RecordingAnsibleJob
+
     def __init__(self, *args, **kw):
         self._run_ansible = kw.pop('_run_ansible', False)
         self._test_root = kw.pop('_test_root', False)
@@ -1428,8 +1486,7 @@
         args = json.loads(job.arguments)
         args['zuul']['_test'] = dict(test_root=self._test_root)
         job.arguments = json.dumps(args)
-        self.job_workers[job.unique] = RecordingAnsibleJob(self, job)
-        self.job_workers[job.unique].run()
+        super(RecordingExecutorServer, self).executeJob(job)
 
     def stopJob(self, job):
         self.log.debug("handle stop")
@@ -1447,63 +1504,6 @@
         super(RecordingExecutorServer, self).stop()
 
 
-class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
-    def doMergeChanges(self, merger, items, repo_state):
-        # Get a merger in order to update the repos involved in this job.
-        commit = super(RecordingAnsibleJob, self).doMergeChanges(
-            merger, items, repo_state)
-        if not commit:  # merge conflict
-            self.recordResult('MERGER_FAILURE')
-        return commit
-
-    def recordResult(self, result):
-        build = self.executor_server.job_builds[self.job.unique]
-        self.executor_server.lock.acquire()
-        self.executor_server.build_history.append(
-            BuildHistory(name=build.name, result=result, changes=build.changes,
-                         node=build.node, uuid=build.unique,
-                         ref=build.parameters['zuul']['ref'],
-                         parameters=build.parameters, jobdir=build.jobdir,
-                         pipeline=build.parameters['zuul']['pipeline'])
-        )
-        self.executor_server.running_builds.remove(build)
-        del self.executor_server.job_builds[self.job.unique]
-        self.executor_server.lock.release()
-
-    def runPlaybooks(self, args):
-        build = self.executor_server.job_builds[self.job.unique]
-        build.jobdir = self.jobdir
-
-        result = super(RecordingAnsibleJob, self).runPlaybooks(args)
-        self.recordResult(result)
-        return result
-
-    def runAnsible(self, cmd, timeout, playbook):
-        build = self.executor_server.job_builds[self.job.unique]
-
-        if self.executor_server._run_ansible:
-            result = super(RecordingAnsibleJob, self).runAnsible(
-                cmd, timeout, playbook)
-        else:
-            if playbook.path:
-                result = build.run()
-            else:
-                result = (self.RESULT_NORMAL, 0)
-        return result
-
-    def getHostList(self, args):
-        self.log.debug("hostlist")
-        hosts = super(RecordingAnsibleJob, self).getHostList(args)
-        for host in hosts:
-            host['host_vars']['ansible_connection'] = 'local'
-
-        hosts.append(dict(
-            name='localhost',
-            host_vars=dict(ansible_connection='local'),
-            host_keys=[]))
-        return hosts
-
-
 class FakeGearmanServer(gear.Server):
     """A Gearman server for use in tests.
 
@@ -1518,6 +1518,7 @@
 
     def __init__(self, use_ssl=False):
         self.hold_jobs_in_queue = False
+        self.hold_merge_jobs_in_queue = False
         if use_ssl:
             ssl_ca = os.path.join(FIXTURE_DIR, 'gearman/root-ca.pem')
             ssl_cert = os.path.join(FIXTURE_DIR, 'gearman/server.pem')
@@ -1537,6 +1538,8 @@
                 if not hasattr(job, 'waiting'):
                     if job.name.startswith(b'executor:execute'):
                         job.waiting = self.hold_jobs_in_queue
+                    elif job.name.startswith(b'merger:'):
+                        job.waiting = self.hold_merge_jobs_in_queue
                     else:
                         job.waiting = False
                 if job.waiting:
@@ -1562,10 +1565,15 @@
                 len(self.low_queue))
         self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
         for job in self.getQueue():
-            if job.name != b'executor:execute':
-                continue
-            parameters = json.loads(job.arguments.decode('utf8'))
-            if not regex or re.match(regex, parameters.get('job')):
+            match = False
+            if job.name == b'executor:execute':
+                parameters = json.loads(job.arguments.decode('utf8'))
+                if not regex or re.match(regex, parameters.get('job')):
+                    match = True
+            if job.name.startswith(b'merger:'):
+                if not regex:
+                    match = True
+            if match:
                 self.log.debug("releasing queued job %s" %
                                job.unique)
                 job.waiting = False
@@ -2039,14 +2047,9 @@
         self.config.set('executor', 'state_dir', self.executor_state_root)
 
         self.statsd = FakeStatsd()
-        # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
-        # see: https://github.com/jsocol/pystatsd/issues/61
-        os.environ['STATSD_HOST'] = '127.0.0.1'
-        os.environ['STATSD_PORT'] = str(self.statsd.port)
+        if self.config.has_section('statsd'):
+            self.config.set('statsd', 'port', str(self.statsd.port))
         self.statsd.start()
-        # the statsd client object is configured in the statsd module import
-        importlib.reload(statsd)
-        importlib.reload(zuul.scheduler)
 
         self.gearman_server = FakeGearmanServer(self.use_ssl)
 
@@ -2381,7 +2384,7 @@
         # before noticing they should exit, but they should exit on their own.
         # Further the pydevd threads also need to be whitelisted so debugging
         # e.g. in PyCharm is possible without breaking shutdown.
-        whitelist = ['executor-watchdog',
+        whitelist = ['watchdog',
                      'pydevd.CommandThread',
                      'pydevd.Reader',
                      'pydevd.Writer',
@@ -2556,6 +2559,26 @@
             return False
         return True
 
+    def areAllMergeJobsWaiting(self):
+        for client_job in list(self.merge_client.jobs):
+            if not client_job.handle:
+                self.log.debug("%s has no handle" % client_job)
+                return False
+            server_job = self.gearman_server.jobs.get(client_job.handle)
+            if not server_job:
+                self.log.debug("%s is not known to the gearman server" %
+                               client_job)
+                return False
+            if not hasattr(server_job, 'waiting'):
+                self.log.debug("%s is being enqueued" % server_job)
+                return False
+            if server_job.waiting:
+                self.log.debug("%s is waiting" % server_job)
+                continue
+            self.log.debug("%s is not waiting" % server_job)
+            return False
+        return True
+
     def eventQueuesEmpty(self):
         for event_queue in self.event_queues:
             yield event_queue.empty()
@@ -2592,7 +2615,7 @@
                 # processed
                 self.eventQueuesJoin()
                 self.sched.run_handler_lock.acquire()
-                if (not self.merge_client.jobs and
+                if (self.areAllMergeJobsWaiting() and
                     self.haveAllBuildsReported() and
                     self.areAllBuildsWaiting() and
                     self.areAllNodeRequestsComplete() and
@@ -2620,6 +2643,13 @@
                 return build
         raise Exception("Unable to find build %s" % name)
 
+    def assertJobNotInHistory(self, name, project=None):
+        for job in self.history:
+            if (project is None or
+                job.parameters['zuul']['project']['name'] == project):
+                self.assertNotEqual(job.name, name,
+                                    'Job %s found in history' % name)
+
     def getJobFromHistory(self, name, project=None):
         for job in self.history:
             if (job.name == name and
diff --git a/tests/encrypt_secret.py b/tests/encrypt_secret.py
index 0b0cf19..4bc514a 100644
--- a/tests/encrypt_secret.py
+++ b/tests/encrypt_secret.py
@@ -12,6 +12,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import base64
 import sys
 import os
 
@@ -27,8 +28,10 @@
         private_key, public_key = \
             encryption.deserialize_rsa_keypair(f.read())
 
-    ciphertext = encryption.encrypt_pkcs1_oaep(sys.argv[1], public_key)
-    print(ciphertext.encode('base64'))
+    plaintext = sys.argv[1].encode('utf-8')
+
+    ciphertext = encryption.encrypt_pkcs1_oaep(plaintext, public_key)
+    print(base64.b64encode(ciphertext).decode('utf-8'))
 
 
 if __name__ == '__main__':
diff --git a/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-logs.yaml b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-logs.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-logs.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-ssh.yaml b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-ssh.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/post-ssh.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/pre.yaml b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/pre.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/playbooks/base/pre.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
new file mode 100644
index 0000000..89d98a9
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
@@ -0,0 +1,25 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+    pre-run: playbooks/base/pre
+    post-run:
+      - playbooks/base/post-ssh
+      - playbooks/base/post-logs
+
+- project:
+    name: project-config
+    check:
+      jobs: []
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
new file mode 100644
index 0000000..2545208
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
@@ -0,0 +1,24 @@
+- job:
+    name: puppet-base
+    pre-run: playbooks/prepare-node-common
+
+- job:
+    name: puppet-module-base
+    parent: puppet-base
+    pre-run: playbooks/prepare-node-unit
+
+- job:
+    name: puppet-lint
+    parent: puppet-module-base
+    run: playbooks/run-lint
+
+- project-template:
+    name: puppet-check-jobs
+    check:
+      jobs:
+        - puppet-lint
+
+- project:
+    name: puppet-integration
+    templates:
+      - puppet-check-jobs
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/README b/tests/fixtures/config/branch-variants/git/puppet-integration/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-common.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-common.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-common.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-unit.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-unit.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/prepare-node-unit.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/run-lint.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/run-lint.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/playbooks/run-lint.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/branch-variants/main.yaml b/tests/fixtures/config/branch-variants/main.yaml
new file mode 100644
index 0000000..ad2b2f6
--- /dev/null
+++ b/tests/fixtures/config/branch-variants/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - project-config
+        untrusted-projects:
+          - puppet-integration
diff --git a/tests/fixtures/config/in-repo/git/common-config/playbooks/template-job.yaml b/tests/fixtures/config/in-repo/git/common-config/playbooks/template-job.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/in-repo/git/common-config/playbooks/template-job.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/in-repo/git/common-config/zuul.yaml b/tests/fixtures/config/in-repo/git/common-config/zuul.yaml
index 5623467..a97af51 100644
--- a/tests/fixtures/config/in-repo/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/in-repo/git/common-config/zuul.yaml
@@ -76,6 +76,15 @@
 - job:
     name: common-config-test
 
+- job:
+    name: template-job
+
+- project-template:
+    name: common-config-template
+    check:
+      jobs:
+        - template-job
+
 - project:
     name: common-config
     check:
diff --git a/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
new file mode 100644
index 0000000..bd46718
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
@@ -0,0 +1,4 @@
+- hosts: all
+  tasks:
+    - shell: echo "Failure test {{ zuul.executor.src_root }}"
+    - shell: exit 1
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
index a83f0bc..f182d8d 100644
--- a/tests/fixtures/config/job-output/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -20,8 +20,19 @@
     parent: base
     name: job-output
 
+- job:
+    name: job-output-failure
+    run: playbooks/job-output
+    post-run: playbooks/job-output-failure-post
+
 - project:
     name: org/project
     check:
       jobs:
         - job-output
+
+- project:
+    name: org/project2
+    check:
+      jobs:
+        - job-output-failure
diff --git a/tests/fixtures/config/job-output/git/org_project2/README b/tests/fixtures/config/job-output/git/org_project2/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/org_project2/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/job-output/main.yaml b/tests/fixtures/config/job-output/main.yaml
index 208e274..14b382f 100644
--- a/tests/fixtures/config/job-output/main.yaml
+++ b/tests/fixtures/config/job-output/main.yaml
@@ -6,3 +6,4 @@
           - common-config
         untrusted-projects:
           - org/project
+          - org/project2
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
new file mode 100644
index 0000000..40b3ca7
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
@@ -0,0 +1,13 @@
+- hosts: all
+  tasks:
+    - debug: var=waitpath
+    - file:
+        path: "{{zuul._test.test_root}}/{{zuul.build}}.post_start.flag"
+        state: touch
+    # Do not finish until test creates the flag file
+    - wait_for:
+        state: present
+        path: "{{waitpath}}"
+    - file:
+        path: "{{zuul._test.test_root}}/{{zuul.build}}.post_end.flag"
+        state: touch
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
new file mode 100644
index 0000000..92a5515
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
@@ -0,0 +1,24 @@
+- pipeline:
+    name: check
+    manager: independent
+    post-review: true
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+
+- job:
+    name: python27
+    pre-run: playbooks/pre
+    post-run: playbooks/post
+    vars:
+      waitpath: '{{zuul._test.test_root}}/{{zuul.build}}/test_wait'
diff --git a/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
new file mode 100644
index 0000000..89a5674
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
@@ -0,0 +1,5 @@
+- project:
+    name: org/project
+    check:
+      jobs:
+        - python27
diff --git a/tests/fixtures/config/post-playbook/git/org_project/README b/tests/fixtures/config/post-playbook/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/post-playbook/main.yaml b/tests/fixtures/config/post-playbook/main.yaml
new file mode 100644
index 0000000..6033879
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/main.yaml
@@ -0,0 +1,9 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
+
diff --git a/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml b/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
index 14f43f4..2160ef9 100644
--- a/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
@@ -39,6 +39,7 @@
       gerrit:
         - event: ref-updated
           ref: ^(?!refs/).*$
+    precedence: low
 
 - job:
     name: base
diff --git a/tests/fixtures/config/sql-driver/git/common-config/playbooks/project-test3.yaml b/tests/fixtures/config/sql-driver/git/common-config/playbooks/project-test3.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/sql-driver/git/common-config/playbooks/project-test3.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml b/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml
index b8f4d67..8fce9e7 100644
--- a/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/sql-driver/git/common-config/zuul.yaml
@@ -27,6 +27,9 @@
 - job:
     name: project-test2
 
+- job:
+    name: project-test3
+
 - project:
     name: org/project
     check:
@@ -36,3 +39,9 @@
             dependencies: project-merge
         - project-test2:
             dependencies: project-merge
+        # Make sure we have a "SKIPPED" result
+        - project-test3:
+            dependencies: project-test1
+        # The noop job can have timing quirks
+        - noop:
+            dependencies: project-test2
diff --git a/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/project.yaml b/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/project.yaml
new file mode 100644
index 0000000..6d1892c
--- /dev/null
+++ b/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/project.yaml
@@ -0,0 +1,4 @@
+- project:
+    name: untrusted-config
+    templates:
+      - test-one-and-two
diff --git a/tests/fixtures/config/templated-project/git/common-config/zuul.d/templates.yaml b/tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/templates.yaml
similarity index 100%
rename from tests/fixtures/config/templated-project/git/common-config/zuul.d/templates.yaml
rename to tests/fixtures/config/templated-project/git/untrusted-config/zuul.d/templates.yaml
diff --git a/tests/fixtures/config/templated-project/main.yaml b/tests/fixtures/config/templated-project/main.yaml
index e59b396..bb59838 100644
--- a/tests/fixtures/config/templated-project/main.yaml
+++ b/tests/fixtures/config/templated-project/main.yaml
@@ -5,5 +5,6 @@
         config-projects:
           - common-config
         untrusted-projects:
+          - untrusted-config
           - org/templated-project
           - org/layered-project
diff --git a/tests/fixtures/fake_git.sh b/tests/fixtures/fake_git.sh
new file mode 100755
index 0000000..5b787b7
--- /dev/null
+++ b/tests/fixtures/fake_git.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+# Fake git command for tests: satisfy "clone" and "version" just
+# enough for callers, then stall so timeout handling can be exercised.
+echo $*
+case "$1" in
+    clone)
+        dest=$3
+        mkdir -p $dest/.git
+        ;;
+    version)
+        echo "git version 1.0.0"
+        exit 0
+        ;;
+esac
+# Simulate a hung git operation (e.g. a slow fetch).
+sleep 30
diff --git a/tests/fixtures/layouts/delayed-repo-init.yaml b/tests/fixtures/layouts/delayed-repo-init.yaml
index e97d37a..c89e2fa 100644
--- a/tests/fixtures/layouts/delayed-repo-init.yaml
+++ b/tests/fixtures/layouts/delayed-repo-init.yaml
@@ -67,7 +67,7 @@
             dependencies: project-merge
     gate:
       jobs:
-        - project-merge:
+        - project-merge
         - project-test1:
             dependencies: project-merge
         - project-test2:
diff --git a/tests/fixtures/layouts/job-vars.yaml b/tests/fixtures/layouts/job-vars.yaml
new file mode 100644
index 0000000..22fc5c2
--- /dev/null
+++ b/tests/fixtures/layouts/job-vars.yaml
@@ -0,0 +1,75 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+
+- job:
+    name: parentjob
+    parent: base
+    required-projects:
+      - org/project0
+    vars:
+      override: 0
+      child1override: 0
+      parent: 0
+
+- job:
+    name: child1
+    parent: parentjob
+    required-projects:
+      - org/project1
+    vars:
+      override: 1
+      child1override: 1
+      child1: 1
+
+- job:
+    name: child2
+    parent: parentjob
+    required-projects:
+      - org/project2
+    vars:
+      override: 2
+      child2: 2
+
+- job:
+    name: child3
+    parent: parentjob
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - parentjob
+        - child1
+        - child2
+        - child3:
+            required-projects:
+              - org/project3
+            vars:
+              override: 3
+              child3: 3
+
+- project:
+    name: org/project0
+
+- project:
+    name: org/project1
+
+- project:
+    name: org/project2
+
+- project:
+    name: org/project3
diff --git a/tests/fixtures/layouts/matcher-test.yaml b/tests/fixtures/layouts/matcher-test.yaml
new file mode 100644
index 0000000..b511a2f
--- /dev/null
+++ b/tests/fixtures/layouts/matcher-test.yaml
@@ -0,0 +1,63 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- pipeline:
+    name: gate
+    manager: dependent
+    success-message: Build succeeded (gate).
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - Approved: 1
+    success:
+      gerrit:
+        Verified: 2
+        submit: true
+    failure:
+      gerrit:
+        Verified: -2
+    start:
+      gerrit:
+        Verified: 0
+    precedence: high
+
+- job:
+    name: base
+    parent: null
+
+- job:
+    name: project-test1
+    nodeset:
+      nodes:
+        - name: controller
+          label: label1
+
+- job:
+    name: ignore-branch
+    branches: ^(?!featureA).*$
+    nodeset:
+      nodes:
+        - name: controller
+          label: label2
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - project-test1
+        - ignore-branch
+    gate:
+      jobs:
+        - project-test1
+        - ignore-branch
diff --git a/tests/fixtures/layouts/multiple-templates.yaml b/tests/fixtures/layouts/multiple-templates.yaml
new file mode 100644
index 0000000..7272cad
--- /dev/null
+++ b/tests/fixtures/layouts/multiple-templates.yaml
@@ -0,0 +1,44 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+
+- job:
+    name: py27
+
+- project-template:
+    name: python-jobs
+    check:
+      jobs:
+        - py27
+
+- project-template:
+    name: python-trusty-jobs
+    check:
+      jobs:
+        - py27:
+            tags:
+              - trusty
+
+- project:
+    name: org/project1
+    templates:
+      - python-jobs
+      - python-trusty-jobs
+
+- project:
+    name: org/project2
+    templates:
+      - python-jobs
diff --git a/tests/fixtures/zuul-executor-hostname.conf b/tests/fixtures/zuul-executor-hostname.conf
new file mode 100644
index 0000000..8199aba
--- /dev/null
+++ b/tests/fixtures/zuul-executor-hostname.conf
@@ -0,0 +1,32 @@
+[gearman]
+server=127.0.0.1
+
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
+[scheduler]
+tenant_config=main.yaml
+
+[merger]
+git_dir=/tmp/zuul-test/merger-git
+git_user_email=zuul@example.com
+git_user_name=zuul
+
+[executor]
+git_dir=/tmp/zuul-test/executor-git
+hostname=test-executor-hostname.example.com
+
+[connection gerrit]
+driver=gerrit
+server=review.example.com
+user=jenkins
+sshkey=fake_id_rsa_path
+
+[connection smtp]
+driver=smtp
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index 7bc8c59..e6f997c 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -1,6 +1,11 @@
 [gearman]
 server=127.0.0.1
 
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
 [scheduler]
 tenant_config=main.yaml
 
diff --git a/tests/print_layout.py b/tests/print_layout.py
index a295886..055270f 100644
--- a/tests/print_layout.py
+++ b/tests/print_layout.py
@@ -59,6 +59,14 @@
             if fn in ['zuul.yaml', '.zuul.yaml']:
                 print_file('File: ' + os.path.join(gitrepo, fn),
                            os.path.join(reporoot, fn))
+        for subdir in ['.zuul.d', 'zuul.d']:
+            zuuld = os.path.join(reporoot, subdir)
+            if not os.path.exists(zuuld):
+                continue
+            filenames = os.listdir(zuuld)
+            for fn in filenames:
+                print_file('File: ' + os.path.join(gitrepo, subdir, fn),
+                           os.path.join(zuuld, fn))
 
 
 if __name__ == '__main__':
diff --git a/tests/unit/test_change_matcher.py b/tests/unit/test_change_matcher.py
index 6b161a1..3d5345f 100644
--- a/tests/unit/test_change_matcher.py
+++ b/tests/unit/test_change_matcher.py
@@ -60,7 +60,7 @@
         self.assertTrue(self.matcher.matches(self.change))
 
     def test_matches_returns_true_on_matching_ref(self):
-        self.change.branch = 'bar'
+        delattr(self.change, 'branch')
         self.change.ref = 'foo'
         self.assertTrue(self.matcher.matches(self.change))
 
@@ -69,11 +69,6 @@
         self.change.ref = 'baz'
         self.assertFalse(self.matcher.matches(self.change))
 
-    def test_matches_returns_false_for_missing_attrs(self):
-        delattr(self.change, 'branch')
-        # ref is by default not an attribute
-        self.assertFalse(self.matcher.matches(self.change))
-
 
 class TestFileMatcher(BaseTestMatcher):
 
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index 4214e9f..719f307 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -69,11 +69,12 @@
         insp = sa.engine.reflection.Inspector(
             self.connections.connections['resultsdb'].engine)
 
-        self.assertEqual(10, len(insp.get_columns(buildset_table)))
+        self.assertEqual(13, len(insp.get_columns(buildset_table)))
         self.assertEqual(10, len(insp.get_columns(build_table)))
 
     def test_sql_results(self):
         "Test results are entered into an sql table"
+        self.executor_server.hold_jobs_in_build = True
         # Grab the sa tables
         tenant = self.sched.abide.tenants.get('tenant-one')
         reporter = _get_reporter_from_connection_name(
@@ -85,6 +86,8 @@
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()
+        self.orderedRelease()
+        self.waitUntilSettled()
 
         # Add a failed result
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@@ -92,6 +95,8 @@
         self.executor_server.failJob('project-test1', B)
         self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()
+        self.orderedRelease()
+        self.waitUntilSettled()
 
         conn = self.connections.connections['resultsdb'].engine.connect()
         result = conn.execute(
@@ -109,6 +114,8 @@
         self.assertEqual('SUCCESS', buildset0['result'])
         self.assertEqual('Build succeeded.', buildset0['message'])
         self.assertEqual('tenant-one', buildset0['tenant'])
+        self.assertEqual('https://hostname/%d' % buildset0['change'],
+                         buildset0['ref_url'])
 
         buildset0_builds = conn.execute(
             sa.sql.select([reporter.connection.zuul_build_table]).
@@ -141,15 +148,15 @@
             )
         ).fetchall()
 
-        # Check the second last result, which should be the project-test1 job
+        # Check the second result, which should be the project-test1 job
         # which failed
-        self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
-        self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
+        self.assertEqual('project-test1', buildset1_builds[1]['job_name'])
+        self.assertEqual("FAILURE", buildset1_builds[1]['result'])
         self.assertEqual(
             'finger://{hostname}/{uuid}'.format(
                 hostname=self.executor_server.hostname,
-                uuid=buildset1_builds[-2]['uuid']),
-            buildset1_builds[-2]['log_url'])
+                uuid=buildset1_builds[1]['uuid']),
+            buildset1_builds[1]['log_url'])
 
     def test_multiple_sql_connections(self):
         "Test putting results in different databases"
diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py
index 9c45645..f051ec4 100755
--- a/tests/unit/test_executor.py
+++ b/tests/unit/test_executor.py
@@ -401,3 +401,12 @@
         node['ssh_port'] = 22022
         keys = self.test_job.getHostList({'nodes': [node]})[0]['host_keys']
         self.assertEqual(keys[0], '[localhost]:22022 fake-host-key')
+
+
+class TestExecutorHostname(ZuulTestCase):
+    config_file = 'zuul-executor-hostname.conf'
+    tenant_config_file = 'config/single-tenant/main.yaml'
+
+    def test_executor_hostname(self):
+        self.assertEqual('test-executor-hostname.example.com',
+                         self.executor_server.hostname)
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 8aafabf..ec30a2b 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -19,9 +19,10 @@
 import os
 
 import git
+import testtools
 
 from zuul.merger.merger import Repo
-from tests.base import ZuulTestCase
+from tests.base import ZuulTestCase, FIXTURE_DIR
 
 
 class TestMergerRepo(ZuulTestCase):
@@ -49,7 +50,7 @@
             msg='.git file in submodule should be a file')
 
         work_repo = Repo(parent_path, self.workspace_root,
-                         'none@example.org', 'User Name')
+                         'none@example.org', 'User Name', '0', '0')
         self.assertTrue(
             os.path.isdir(os.path.join(self.workspace_root, 'subdir')),
             msg='Cloned repository has a submodule placeholder directory')
@@ -60,7 +61,7 @@
         sub_repo = Repo(
             os.path.join(self.upstream_root, 'org/project2'),
             os.path.join(self.workspace_root, 'subdir'),
-            'none@example.org', 'User Name')
+            'none@example.org', 'User Name', '0', '0')
         self.assertTrue(os.path.exists(
             os.path.join(self.workspace_root, 'subdir', '.git')),
             msg='Cloned over the submodule placeholder')
@@ -74,3 +75,28 @@
             os.path.join(self.upstream_root, 'org/project2'),
             sub_repo.createRepoObject().remotes[0].url,
             message="Sub repository points to upstream project2")
+
+    def test_clone_timeout(self):
+        parent_path = os.path.join(self.upstream_root, 'org/project1')
+        self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+                   os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+        work_repo = Repo(parent_path, self.workspace_root,
+                         'none@example.org', 'User Name', '0', '0',
+                         git_timeout=0.001)
+        # TODO: have the merger and repo classes catch fewer
+        # exceptions, including this one on initialization.  For the
+        # test, we try cloning again.
+        with testtools.ExpectedException(git.exc.GitCommandError,
+                                         '.*exit code\(-9\)'):
+            work_repo._ensure_cloned()
+
+    def test_fetch_timeout(self):
+        parent_path = os.path.join(self.upstream_root, 'org/project1')
+        work_repo = Repo(parent_path, self.workspace_root,
+                         'none@example.org', 'User Name', '0', '0')
+        work_repo.git_timeout = 0.001
+        self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+                   os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+        with testtools.ExpectedException(git.exc.GitCommandError,
+                                         '.*exit code\(-9\)'):
+            work_repo.update()
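
Both timeout tests force the slow path the same way: GIT_PYTHON_GIT_EXECUTABLE is patched to a fixture stand-in for git and the timeout is shrunk to a millisecond, so GitPython kills the child process and the command fails with exit code -9 (SIGKILL). A standalone sketch of the same mechanism, using GitPython's kill_after_timeout with an illustrative repository path and URL:

    import git

    repo = git.Repo.init('/tmp/example-repo')
    try:
        # GitPython SIGKILLs the subprocess once the timeout elapses, so
        # a slow fetch surfaces as GitCommandError with "exit code(-9)".
        repo.git.fetch('https://example.com/some/repo',
                       kill_after_timeout=0.001)
    except git.exc.GitCommandError as e:
        print(e)
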
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index c457ff0..628a45c 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -246,7 +246,11 @@
         })
         layout.addJob(python27essex)
 
-        project_config = configloader.ProjectParser.fromYaml(tenant, layout, [{
+        project_template_parser = configloader.ProjectTemplateParser(
+            tenant, layout)
+        project_parser = configloader.ProjectParser(
+            tenant, layout, project_template_parser)
+        project_config = project_parser.fromYaml([{
             '_source_context': self.context,
             '_start_mark': self.start_mark,
             'name': 'project',
@@ -262,7 +266,7 @@
         # Test master
         change.branch = 'master'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -291,7 +295,7 @@
         # Test diablo
         change.branch = 'stable/diablo'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -321,7 +325,7 @@
         # Test essex
         change.branch = 'stable/essex'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -505,6 +509,7 @@
     def test_job_inheritance_job_tree(self):
         tenant = model.Tenant('tenant')
         layout = model.Layout(tenant)
+
         tpc = model.TenantProjectConfig(self.project)
         tenant.addUntrustedProject(tpc)
 
@@ -539,7 +544,11 @@
         })
         layout.addJob(python27diablo)
 
-        project_config = configloader.ProjectParser.fromYaml(tenant, layout, [{
+        project_template_parser = configloader.ProjectTemplateParser(
+            tenant, layout)
+        project_parser = configloader.ProjectParser(
+            tenant, layout, project_template_parser)
+        project_config = project_parser.fromYaml([{
             '_source_context': self.context,
             '_start_mark': self.start_mark,
             'name': 'project',
@@ -554,7 +563,7 @@
         change = model.Change(self.project)
         change.branch = 'master'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -568,7 +577,7 @@
 
         change.branch = 'stable/diablo'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -609,7 +618,11 @@
         })
         layout.addJob(python27)
 
-        project_config = configloader.ProjectParser.fromYaml(tenant, layout, [{
+        project_template_parser = configloader.ProjectTemplateParser(
+            tenant, layout)
+        project_parser = configloader.ProjectParser(
+            tenant, layout, project_template_parser)
+        project_config = project_parser.fromYaml([{
             '_source_context': self.context,
             '_start_mark': self.start_mark,
             'name': 'project',
@@ -625,7 +638,7 @@
         change.branch = 'master'
         change.files = ['/COMMIT_MSG', 'ignored-file']
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertFalse(python27.changeMatches(change))
@@ -682,8 +695,12 @@
         context2 = model.SourceContext(project2, 'master',
                                        'test', True)
 
-        project2_config = configloader.ProjectParser.fromYaml(
-            self.tenant, self.layout, [{
+        project_template_parser = configloader.ProjectTemplateParser(
+            self.tenant, self.layout)
+        project_parser = configloader.ProjectParser(
+            self.tenant, self.layout, project_template_parser)
+        project2_config = project_parser.fromYaml(
+            [{
                 '_source_context': context2,
                 '_start_mark': self.start_mark,
                 'name': 'project2',
@@ -700,7 +717,7 @@
         # Test master
         change.branch = 'master'
         item = self.queue.enqueueChange(change)
-        item.current_build_set.layout = self.layout
+        item.layout = self.layout
         with testtools.ExpectedException(
                 Exception,
                 "Project project2 is not allowed to run job job"):
@@ -718,8 +735,12 @@
 
         self.layout.addJob(job)
 
-        project_config = configloader.ProjectParser.fromYaml(
-            self.tenant, self.layout, [{
+        project_template_parser = configloader.ProjectTemplateParser(
+            self.tenant, self.layout)
+        project_parser = configloader.ProjectParser(
+            self.tenant, self.layout, project_template_parser)
+        project_config = project_parser.fromYaml(
+            [{
                 '_source_context': self.context,
                 '_start_mark': self.start_mark,
                 'name': 'project',
@@ -736,7 +757,7 @@
         # Test master
         change.branch = 'master'
         item = self.queue.enqueueChange(change)
-        item.current_build_set.layout = self.layout
+        item.layout = self.layout
         with testtools.ExpectedException(
                 Exception,
                 "Pre-review pipeline gate does not allow post-review job"):
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d3f9ddb 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -29,6 +29,7 @@
     def setUp(self):
         super(TestNodepool, self).setUp()
 
+        self.statsd = None
         self.zk_chroot_fixture = self.useFixture(
             ChrootedKazooFixture(self.id()))
         self.zk_config = '%s:%s%s' % (
@@ -76,7 +77,7 @@
         self.assertEqual(request.state, 'fulfilled')
 
         # Accept the nodes
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request.id)
         nodeset = request.nodeset
 
         for node in nodeset.getNodes():
@@ -125,3 +126,47 @@
 
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 0)
+
+    def test_accept_nodes_resubmitted(self):
+        # Test that a resubmitted request does not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        # Accept the nodes, passing a different ID
+        self.nodepool.acceptNodes(request, "invalid")
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
+    def test_accept_nodes_lost_request(self):
+        # Test that a lost request does not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        self.zk.deleteNodeRequest(request)
+
+        # Accept the nodes
+        self.nodepool.acceptNodes(request, request.id)
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
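
The two new tests pin down why acceptNodes() grew a second argument: the caller passes the request ID it originally submitted, and nodes must be left unlocked both when the request was resubmitted under a new ID and when it has vanished from ZooKeeper. A sketch of the guard the tests imply (shape and names hypothetical, not the actual implementation):

    def accept_nodes(request, submitted_id, log):
        """Sketch of the ID guard implied by the tests above."""
        if request.id != submitted_id:
            # The request was resubmitted with a new ID while we waited;
            # the nodes belong to the new incarnation, so leave them
            # unlocked.
            log.info("Request %s was resubmitted, not locking nodes",
                     request.id)
            return False
        return True  # safe to lock and use the nodes
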
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 2dcd9bf..3608ef0 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -89,25 +89,44 @@
         self.assertEqual(self.getJobFromHistory('project-test2').node,
                          'label1')
 
+        for stat in self.statsd.stats:
+            k, v = stat.decode('utf-8').split(':')
+            self.log.debug('stat %s:%s', k, v)
         # TODOv3(jeblair): we may want to report stats by tenant (also?).
         # Per-driver
         self.assertReportedStat('zuul.event.gerrit.comment-added', value='1|c')
         # Per-driver per-connection
         self.assertReportedStat('zuul.event.gerrit.gerrit.comment-added',
                                 value='1|c')
-        self.assertReportedStat('zuul.pipeline.gate.current_changes',
-                                value='1|g')
-        self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
-                                kind='ms')
-        self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
-                                value='1|c')
-        self.assertReportedStat('zuul.pipeline.gate.resident_time', kind='ms')
-        self.assertReportedStat('zuul.pipeline.gate.total_changes',
-                                value='1|c')
         self.assertReportedStat(
-            'zuul.pipeline.gate.org.project.resident_time', kind='ms')
+            'zuul.tenant.tenant-one.pipeline.gate.current_changes',
+            value='1|g')
         self.assertReportedStat(
-            'zuul.pipeline.gate.org.project.total_changes', value='1|c')
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.job.project-merge.SUCCESS', kind='ms')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.job.project-merge.SUCCESS', value='1|c')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.resident_time', kind='ms')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.total_changes', value='1|c')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.resident_time', kind='ms')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.total_changes', value='1|c')
+        exec_key = 'zuul.executor.%s' % self.executor_server.hostname
+        self.assertReportedStat(exec_key + '.builds', value='1|c')
+        self.assertReportedStat('zuul.nodepool.requested', value='1|c')
+        self.assertReportedStat('zuul.nodepool.requested.label.label1',
+                                value='1|c')
+        self.assertReportedStat('zuul.nodepool.fulfilled.label.label1',
+                                value='1|c')
+        self.assertReportedStat('zuul.nodepool.requested.size.1', value='1|c')
+        self.assertReportedStat('zuul.nodepool.fulfilled.size.1', value='1|c')
+        self.assertReportedStat('zuul.nodepool.current_requests', value='1|g')
 
         for build in self.history:
             self.assertTrue(build.parameters['zuul']['voting'])
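
The rewritten assertions encode the new statsd key scheme: metrics are scoped by tenant and, where applicable, by project, with the project's canonical hostname and name flattened into key-safe tokens. A helper inferred from the expected keys above (the name and exact escaping are assumptions drawn from the assertions, not quoted from the scheduler):

    def pipeline_project_key(tenant, pipeline, hostname, project, branch):
        # 'review.example.com' -> 'review_example_com';
        # 'org/project' -> 'org_project'
        safe_host = hostname.replace('.', '_')
        safe_project = project.replace('/', '_')
        return 'zuul.tenant.%s.pipeline.%s.project.%s.%s.%s' % (
            tenant, pipeline, safe_host, safe_project, branch)

    # pipeline_project_key('tenant-one', 'gate', 'review.example.com',
    #                      'org/project', 'master') + '.total_changes'
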
@@ -1494,6 +1513,31 @@
                 held_nodes += 1
         self.assertEqual(held_nodes, 1)
 
+    @simple_layout('layouts/autohold.yaml')
+    def test_autohold_list(self):
+        client = zuul.rpcclient.RPCClient('127.0.0.1',
+                                          self.gearman_server.port)
+        self.addCleanup(client.shutdown)
+
+        r = client.autohold('tenant-one', 'org/project', 'project-test2',
+                            "reason text", 1)
+        self.assertTrue(r)
+
+        autohold_requests = client.autohold_list()
+        self.assertNotEqual({}, autohold_requests)
+        self.assertEqual(1, len(autohold_requests.keys()))
+
+        # The single dict key should be a comma-separated (CSV) string
+        key = list(autohold_requests.keys())[0]
+        tenant, project, job = key.split(',')
+
+        self.assertEqual('tenant-one', tenant)
+        self.assertIn('org/project', project)
+        self.assertEqual('project-test2', job)
+
+        # Note: the value is converted from tuple to list by json.
+        self.assertEqual([1, "reason text"], autohold_requests[key])
+
     @simple_layout('layouts/three-projects.yaml')
     def test_dependent_behind_dequeue(self):
         # This particular test does a large amount of merges and needs a little
@@ -2143,8 +2187,7 @@
 
     def test_statsd(self):
         "Test each of the statsd methods used in the scheduler"
-        import extras
-        statsd = extras.try_import('statsd.statsd')
+        statsd = self.sched.statsd
         statsd.incr('test-incr')
         statsd.timing('test-timing', 3)
         statsd.gauge('test-gauge', 12)
@@ -2269,6 +2312,58 @@
         self.assertEqual(set(['project-test-nomatch-starts-empty',
                               'project-test-nomatch-starts-full']), run_jobs)
 
+    @simple_layout('layouts/job-vars.yaml')
+    def test_inherited_job_variables(self):
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='parentjob', result='SUCCESS'),
+            dict(name='child1', result='SUCCESS'),
+            dict(name='child2', result='SUCCESS'),
+            dict(name='child3', result='SUCCESS'),
+        ], ordered=False)
+        j = self.getJobFromHistory('parentjob')
+        rp = set([p['name'] for p in j.parameters['projects']])
+        self.assertEqual(j.parameters['vars']['override'], 0)
+        self.assertEqual(j.parameters['vars']['child1override'], 0)
+        self.assertEqual(j.parameters['vars']['parent'], 0)
+        self.assertFalse('child1' in j.parameters['vars'])
+        self.assertFalse('child2' in j.parameters['vars'])
+        self.assertFalse('child3' in j.parameters['vars'])
+        self.assertEqual(rp, set(['org/project',
+                                  'org/project0']))
+        j = self.getJobFromHistory('child1')
+        rp = set([p['name'] for p in j.parameters['projects']])
+        self.assertEqual(j.parameters['vars']['override'], 1)
+        self.assertEqual(j.parameters['vars']['child1override'], 1)
+        self.assertEqual(j.parameters['vars']['parent'], 0)
+        self.assertEqual(j.parameters['vars']['child1'], 1)
+        self.assertFalse('child2' in j.parameters['vars'])
+        self.assertFalse('child3' in j.parameters['vars'])
+        self.assertEqual(rp, set(['org/project', 'org/project0',
+                                  'org/project1']))
+        j = self.getJobFromHistory('child2')
+        rp = set([p['name'] for p in j.parameters['projects']])
+        self.assertEqual(j.parameters['vars']['override'], 2)
+        self.assertEqual(j.parameters['vars']['child1override'], 0)
+        self.assertEqual(j.parameters['vars']['parent'], 0)
+        self.assertFalse('child1' in j.parameters['vars'])
+        self.assertEqual(j.parameters['vars']['child2'], 2)
+        self.assertFalse('child3' in j.parameters['vars'])
+        self.assertEqual(rp, set(['org/project', 'org/project0',
+                                  'org/project2']))
+        j = self.getJobFromHistory('child3')
+        rp = set([p['name'] for p in j.parameters['projects']])
+        self.assertEqual(j.parameters['vars']['override'], 3)
+        self.assertEqual(j.parameters['vars']['child1override'], 0)
+        self.assertEqual(j.parameters['vars']['parent'], 0)
+        self.assertFalse('child1' in j.parameters['vars'])
+        self.assertFalse('child2' in j.parameters['vars'])
+        self.assertEqual(j.parameters['vars']['child3'], 3)
+        self.assertEqual(rp, set(['org/project', 'org/project0',
+                                  'org/project3']))
+
     def test_queue_names(self):
         "Test shared change queue names"
         tenant = self.sched.abide.tenants.get('tenant-one')
@@ -2411,6 +2506,28 @@
         self.assertIn('project-merge', status_jobs[1]['dependencies'])
         self.assertIn('project-merge', status_jobs[2]['dependencies'])
 
+    def test_reconfigure_merge(self):
+        """Test that two reconfigure events are merged"""
+
+        tenant = self.sched.abide.tenants['tenant-one']
+        (trusted, project) = tenant.getProject('org/project')
+
+        self.sched.run_handler_lock.acquire()
+        self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
+        self.sched.reconfigureTenant(tenant, project)
+        self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+        self.sched.reconfigureTenant(tenant, project)
+        # The second event should have been combined with the first
+        # so we should still only have one entry.
+        self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+        self.sched.run_handler_lock.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
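
Holding run_handler_lock keeps the scheduler from draining its queue, so the test can observe that the second reconfiguration folded into the first instead of enqueuing a new event. A minimal sketch of merge-on-enqueue, assuming events expose a merge() method that combines equivalent tenant reconfigurations (this is an illustration, not Zuul's queue class):

    import collections

    class ManagementEventQueue(object):
        def __init__(self):
            self._events = collections.deque()

        def put(self, event):
            for queued in self._events:
                if queued.merge(event):
                    return  # combined with an equivalent queued event
            self._events.append(event)

        def qsize(self):
            return len(self._events)
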
     def test_live_reconfiguration(self):
         "Test that live reconfiguration works"
         self.executor_server.hold_jobs_in_build = True
@@ -4544,6 +4661,50 @@
         self.assertEqual(B.data['status'], 'MERGED')
         self.assertEqual(B.reported, 2)
 
+    def test_job_aborted(self):
+        "Test that if a execute server aborts a job, it is run again"
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.executor_server.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 2)
+
+        # first abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        # second abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        # third abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        # fourth abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.history), 7)
+        self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 4)
+        self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 3)
+
     def test_rerun_on_abort(self):
         "Test that if a execute server fails to run a job, it is run again"
 
@@ -4613,6 +4774,78 @@
         self.assertIn('project-test1 : SKIPPED', A.messages[1])
         self.assertIn('project-test2 : SKIPPED', A.messages[1])
 
+    def test_nodepool_priority(self):
+        "Test that nodes are requested at the correct priority"
+
+        self.fake_nodepool.paused = True
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
+
+        B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        C.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
+
+        self.waitUntilSettled()
+
+        reqs = self.fake_nodepool.getNodeRequests()
+
+        # The requests come back sorted by oid.  We have three requests
+        # for the three changes, each with a different priority.  They
+        # also get a serial number based on arrival order, so the number
+        # at the end of the oid should map to the submission order.
+
+        # * gate first - high priority - change C
+        self.assertEqual(reqs[0]['_oid'], '100-0000000002')
+        self.assertEqual(reqs[0]['node_types'], ['label1'])
+        # * check second - normal priority - change B
+        self.assertEqual(reqs[1]['_oid'], '200-0000000001')
+        self.assertEqual(reqs[1]['node_types'], ['label1'])
+        # * post third - low priority - change A
+        # additionally, the post job is defined to use an ubuntu-xenial
+        # node, so we include that check as an extra verification
+        self.assertEqual(reqs[2]['_oid'], '300-0000000000')
+        self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial'])
+
+        self.fake_nodepool.paused = False
+        self.waitUntilSettled()
+
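
The oid assertions only make sense given how node request IDs are constructed: a numeric precedence prefix followed by a zero-padded serial, so that a plain string sort yields priority order (lower prefix sorts, and is served, first). The mapping below is inferred from the values checked above, not quoted from the scheduler:

    PRECEDENCE = {'high': 100, 'normal': 200, 'low': 300}  # inferred

    def request_oid(precedence, serial):
        # '100-0000000002' (gate, high) sorts ahead of '200-0000000001'
        # (check, normal), which sorts ahead of '300-0000000000'
        # (post, low).
        return '%d-%010d' % (PRECEDENCE[precedence], serial)

    assert request_oid('high', 2) == '100-0000000002'
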
+    @simple_layout('layouts/multiple-templates.yaml')
+    def test_multiple_project_templates(self):
+        # Test that applying multiple project templates to a project
+        # doesn't alter them when used for a second project.
+        A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        build = self.getJobFromHistory('py27')
+        self.assertEqual(build.parameters['zuul']['jobtags'], [])
+
+    def test_pending_merge_in_reconfig(self):
+        # Test that if we are waiting for an outstanding merge on
+        # reconfiguration, we continue to wait for it to complete.
+        self.gearman_server.hold_merge_jobs_in_queue = True
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        A.setMerged()
+        self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
+        self.waitUntilSettled()
+        # Reconfigure while we still have an outstanding merge job
+        self.sched.reconfigureTenant(self.sched.abide.tenants['tenant-one'],
+                                     None)
+        self.waitUntilSettled()
+        # Verify the merge job is still running and that the item is
+        # in the pipeline
+        self.assertEqual(len(self.sched.merger.jobs), 1)
+        tenant = self.sched.abide.tenants.get('tenant-one')
+        pipeline = tenant.layout.pipelines['post']
+        self.assertEqual(len(pipeline.getAllItems()), 1)
+        self.gearman_server.hold_merge_jobs_in_queue = False
+        self.gearman_server.release()
+        self.waitUntilSettled()
+
 
 class TestExecutor(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'
@@ -4830,6 +5063,42 @@
         self.assertEqual(self.getJobFromHistory('project-test6').result,
                          'SUCCESS')
 
+    def test_unimplied_branch_matchers(self):
+        # This tests that there are no implied branch matchers added
+        # by project templates.
+        self.create_branch('org/layered-project', 'stable')
+
+        A = self.fake_gerrit.addFakeChange(
+            'org/layered-project', 'stable', 'A')
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        print(self.getJobFromHistory('project-test1').
+              parameters['zuul']['_inheritance_path'])
+
+    def test_implied_branch_matchers(self):
+        # This tests that there is an implied branch matcher when a
+        # template is used on an in-repo project pipeline definition.
+        self.create_branch('untrusted-config', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'untrusted-config', 'stable'))
+        self.waitUntilSettled()
+
+        A = self.fake_gerrit.addFakeChange(
+            'untrusted-config', 'stable', 'A')
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        print(self.getJobFromHistory('project-test1').
+              parameters['zuul']['_inheritance_path'])
+
 
 class TestSchedulerSuccessURL(ZuulTestCase):
     tenant_config_file = 'config/success-url/main.yaml'
@@ -5503,7 +5772,7 @@
                 queue = queue_candidate
                 break
         queue_item = queue.queue[0]
-        item_dynamic_layout = queue_item.current_build_set.layout
+        item_dynamic_layout = queue_item.layout
         dynamic_test_semaphore = \
             item_dynamic_layout.semaphores.get('test-semaphore')
         self.assertEqual(dynamic_test_semaphore.max, 1)
@@ -5556,3 +5825,30 @@
         self.assertEqual(A.reported, 2)
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
+
+
+class TestSchedulerBranchMatcher(ZuulTestCase):
+
+    @simple_layout('layouts/matcher-test.yaml')
+    def test_job_branch_ignored(self):
+        '''
+        Test that branch matching logic works.
+
+        The 'ignore-branch' job has a branch matcher that is supposed to
+        match every branch except for the 'featureA' branch, so it should
+        not be run on a change to that branch.
+        '''
+        self.create_branch('org/project', 'featureA')
+        A = self.fake_gerrit.addFakeChange('org/project', 'featureA', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.printHistory()
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        self.assertJobNotInHistory('ignore-branch')
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2,
+                         "A should report start and success")
+        self.assertIn('gate', A.messages[1],
+                      "A should transit gate")
diff --git a/tests/unit/test_stack_dump.py b/tests/unit/test_stack_dump.py
index 824e04c..13e83c6 100644
--- a/tests/unit/test_stack_dump.py
+++ b/tests/unit/test_stack_dump.py
@@ -31,4 +31,5 @@
 
         zuul.cmd.stack_dump_handler(signal.SIGUSR2, None)
         self.assertIn("Thread", self.log_fixture.output)
+        self.assertIn("MainThread", self.log_fixture.output)
         self.assertIn("test_stack_dump_logs", self.log_fixture.output)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index cfb9885..b4702f9 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -14,9 +14,14 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import io
 import json
+import logging
 import os
 import textwrap
+import gc
+import time
+from unittest import skip
 
 import testtools
 
@@ -151,6 +156,29 @@
         self.assertIn('Unable to inherit from final job', A.messages[0])
 
 
+class TestBranchVariants(ZuulTestCase):
+    tenant_config_file = 'config/branch-variants/main.yaml'
+
+    def test_branch_variants(self):
+        # Test branch variants of jobs with inheritance
+        self.executor_server.hold_jobs_in_build = True
+        # This creates a new branch with a copy of the config in master
+        self.create_branch('puppet-integration', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'puppet-integration', 'stable'))
+        self.waitUntilSettled()
+
+        A = self.fake_gerrit.addFakeChange('puppet-integration', 'stable', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds[0].parameters['pre_playbooks']), 3)
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+
 class TestInRepoConfig(ZuulTestCase):
     # A temporary class to hold new tests while others are disabled
 
@@ -170,6 +198,39 @@
         self.assertIn('tenant-one-gate', A.messages[1],
                       "A should transit tenant-one gate")
 
+    @skip("This test is useful, but not reliable")
+    def test_full_and_dynamic_reconfig(self):
+        self.executor_server.hold_jobs_in_build = True
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+
+            - project:
+                name: org/project
+                tenant-one-gate:
+                  jobs:
+                    - project-test1
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.sched.reconfigure(self.config)
+        self.waitUntilSettled()
+
+        gc.collect()
+        pipelines = [obj for obj in gc.get_objects()
+                     if isinstance(obj, zuul.model.Pipeline)]
+        self.assertEqual(len(pipelines), 4)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
     def test_dynamic_config(self):
         in_repo_conf = textwrap.dedent(
             """
@@ -222,6 +283,31 @@
             dict(name='project-test2', result='SUCCESS', changes='1,1'),
             dict(name='project-test2', result='SUCCESS', changes='2,1')])
 
+    def test_dynamic_template(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+
+            - project-template:
+                name: common-config-template
+                check:
+                  jobs:
+                    - project-test1
+
+            - project:
+                name: org/project
+                templates: [common-config-template]
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='template-job', result='SUCCESS', changes='1,1')])
+
     def test_dynamic_config_non_existing_job(self):
         """Test that requesting a non existent job fails"""
         in_repo_conf = textwrap.dedent(
@@ -696,6 +782,47 @@
         self.assertIn('the only project definition permitted', A.messages[0],
                       "A should have a syntax error reported")
 
+    def test_untrusted_depends_on_trusted(self):
+        with open(os.path.join(FIXTURE_DIR,
+                               'config/in-repo/git/',
+                               'common-config/zuul.yaml')) as f:
+            common_config = f.read()
+
+        common_config += textwrap.dedent(
+            """
+            - job:
+                name: project-test9
+            """)
+
+        file_dict = {'zuul.yaml': common_config}
+        A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
+                                           files=file_dict)
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project:
+                name: org/project
+                check:
+                  jobs:
+                    - project-test9
+            """)
+
+        file_dict = {'zuul.yaml': in_repo_conf}
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+                                           files=file_dict)
+        B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+            B.subject, A.data['id'])
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertEqual(B.reported, 1,
+                         "B should report failure")
+        self.assertIn('depends on a change to a config project',
+                      B.messages[0],
+                      "A should have a syntax error reported")
+
     def test_duplicate_node_error(self):
         in_repo_conf = textwrap.dedent(
             """
@@ -816,6 +943,150 @@
                       A.messages[0],
                       "A should have a syntax error reported")
 
+    def test_job_list_in_project_template_not_dict_error(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project-template:
+                name: some-jobs
+                check:
+                  jobs:
+                    - project-test1:
+                        - required-projects:
+                            org/project2
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(A.reported, 1,
+                         "A should report failure")
+        self.assertIn('expected str for dictionary value',
+                      A.messages[0], "A should have a syntax error reported")
+
+    def test_job_list_in_project_not_dict_error(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project:
+                name: org/project1
+                check:
+                  jobs:
+                    - project-test1:
+                        - required-projects:
+                            org/project2
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(A.reported, 1,
+                         "A should report failure")
+        self.assertIn('expected str for dictionary value',
+                      A.messages[0], "A should have a syntax error reported")
+
+    def test_project_template(self):
+        # Tests that a project template is not modified when used, and
+        # can therefore be used in subsequent reconfigurations.
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project-template:
+                name: some-jobs
+                tenant-one-gate:
+                  jobs:
+                    - project-test1:
+                        required-projects:
+                          - org/project1
+            - project:
+                name: org/project
+                templates:
+                  - some-jobs
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+        self.waitUntilSettled()
+        in_repo_conf = textwrap.dedent(
+            """
+            - project:
+                name: org/project1
+                templates:
+                  - some-jobs
+            """)
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B',
+                                           files=file_dict)
+        B.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.assertEqual(B.data['status'], 'MERGED')
+
+    def test_job_remove_add(self):
+        # Tests that a job can be removed from one repo and added in another.
+        # First, remove the current config for project1 since it
+        # references the job we want to remove.
+        file_dict = {'.zuul.yaml': None}
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
+                                           files=file_dict)
+        A.setMerged()
+        self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+        self.waitUntilSettled()
+        # Then propose a change to delete the job from one repo...
+        file_dict = {'.zuul.yaml': None}
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+                                           files=file_dict)
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        # ...and a second that depends on it that adds it to another repo.
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+
+            - project:
+                name: org/project1
+                check:
+                  jobs:
+                    - project-test1
+            """)
+        in_repo_playbook = textwrap.dedent(
+            """
+            - hosts: all
+              tasks: []
+            """)
+        file_dict = {'.zuul.yaml': in_repo_conf,
+                     'playbooks/project-test1.yaml': in_repo_playbook}
+        C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C',
+                                           files=file_dict,
+                                           parent='refs/changes/1/1/1')
+        C.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+            C.subject, B.data['id'])
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='project-test1', result='SUCCESS', changes='2,1 3,1'),
+        ], ordered=False)
+
     def test_multi_repo(self):
         downstream_repo_conf = textwrap.dedent(
             """
@@ -1215,6 +1486,40 @@
                         "The file %s should exist" % post_flag_path)
 
 
+class TestPostPlaybooks(AnsibleZuulTestCase):
+    tenant_config_file = 'config/post-playbook/main.yaml'
+
+    def test_post_playbook_abort(self):
+        # Test that when we abort a job in the post playbook, we
+        # don't send back POST_FAILURE.
+        self.executor_server.verbose = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        while not self.builds:
+            time.sleep(0.1)
+        build = self.builds[0]
+
+        post_start = os.path.join(self.test_root, build.uuid +
+                                  '.post_start.flag')
+        start = time.time()
+        while time.time() < start + 90:
+            if os.path.exists(post_start):
+                break
+            time.sleep(0.1)
+        # The post playbook has started, abort the job
+        self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
+        self.waitUntilSettled()
+
+        build = self.getJobFromHistory('python27')
+        self.assertEqual('ABORTED', build.result)
+
+        post_end = os.path.join(self.test_root, build.uuid +
+                                '.post_end.flag')
+        self.assertTrue(os.path.exists(post_start))
+        self.assertFalse(os.path.exists(post_end))
+
+
 class TestBrokenConfig(ZuulTestCase):
     # Test that we get an appropriate syntax error if we start with a
     # broken config.
@@ -1664,7 +1969,8 @@
             return f.read()
 
     def test_job_output(self):
-        # Verify that command standard output appears in the job output
+        # Verify that command standard output appears in the job output,
+        # and that failures in the final playbook get logged.
 
         # This currently only verifies we receive output from
         # localhost.  Notably, it does not verify we receive output
@@ -1689,3 +1995,36 @@
         self.assertIn(token,
                       self._get_file(self.history[0],
                                      'work/logs/job-output.txt'))
+
+    def test_job_output_failure_log(self):
+        logger = logging.getLogger('zuul.AnsibleJob')
+        output = io.StringIO()
+        logger.addHandler(logging.StreamHandler(output))
+
+        # Verify that a failure in the last post playbook emits the contents
+        # of the json output to the log
+        self.executor_server.keep_jobdir = True
+        A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='job-output-failure',
+                 result='POST_FAILURE', changes='1,1'),
+        ], ordered=False)
+
+        token = 'Standard output test %s' % (self.history[0].jobdir.src_root)
+        j = json.loads(self._get_file(self.history[0],
+                                      'work/logs/job-output.json'))
+        self.assertEqual(token,
+                         j[0]['plays'][0]['tasks'][0]
+                         ['hosts']['localhost']['stdout'])
+
+        print(self._get_file(self.history[0],
+                             'work/logs/job-output.json'))
+        self.assertIn(token,
+                      self._get_file(self.history[0],
+                                     'work/logs/job-output.txt'))
+
+        log_output = output.getvalue()
+        self.assertIn('Final playbook failed', log_output)
+        self.assertIn('Failure test', log_output)
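
The failure-log test also demonstrates a reusable pattern: attach a StreamHandler backed by a StringIO to the zuul.AnsibleJob logger, run the job, then assert on the captured text. In isolation the capture looks like this:

    import io
    import logging

    logger = logging.getLogger('zuul.AnsibleJob')
    output = io.StringIO()
    logger.addHandler(logging.StreamHandler(output))

    logger.error('Final playbook failed')
    assert 'Final playbook failed' in output.getvalue()
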
diff --git a/tools/run-migration.sh b/tools/run-migration.sh
index be297f4..618fc56 100755
--- a/tools/run-migration.sh
+++ b/tools/run-migration.sh
@@ -47,12 +47,12 @@
 
 BASE_DIR=$(cd $(dirname $0)/../..; pwd)
 cd $BASE_DIR/project-config
-if [[ $FINAL ]] ; then
+if [[ $FINAL = 1 ]] ; then
     git reset --hard
 fi
 python3 $BASE_DIR/zuul/zuul/cmd/migrate.py  --mapping=zuul/mapping.yaml \
     zuul/layout.yaml jenkins/jobs nodepool/nodepool.yaml . $VERBOSE
-if [[ $FINAL ]] ; then
+if [[ $FINAL = 1 ]] ; then
     find ../openstack-zuul-jobs/playbooks/legacy -maxdepth 1 -mindepth 1 \
         -type d  | xargs rm -rf
     mv zuul.d/zuul-legacy-* ../openstack-zuul-jobs/zuul.d/
diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py
index 8b854c7..d258354 100755
--- a/tools/zuul-changes.py
+++ b/tools/zuul-changes.py
@@ -20,14 +20,15 @@
 
 parser = argparse.ArgumentParser()
 parser.add_argument('url', help='The URL of the running Zuul instance')
-parser.add_argument('pipeline_name', help='The name of the Zuul pipeline')
+parser.add_argument('tenant', help='The Zuul tenant')
+parser.add_argument('pipeline', help='The name of the Zuul pipeline')
 options = parser.parse_args()
 
 data = urllib2.urlopen('%s/status.json' % options.url).read()
 data = json.loads(data)
 
 for pipeline in data['pipelines']:
-    if pipeline['name'] != options.pipeline_name:
+    if pipeline['name'] != options.pipeline:
         continue
     for queue in pipeline['change_queues']:
         for head in queue['heads']:
@@ -36,9 +37,10 @@
                     continue
                 cid, cps = change['id'].split(',')
                 print(
-                    "zuul enqueue --trigger gerrit --pipeline %s "
-                    "--project %s --change %s,%s" % (
-                        options.pipeline_name,
+                    "zuul enqueue --tenant %s --trigger gerrit "
+                    "--pipeline %s --project %s --change %s,%s" % (
+                        options.tenant,
+                        options.pipeline,
                         change['project'],
                         cid, cps)
                 )
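
With the tenant argument in place, an invocation looks like (hostname illustrative):

    python zuul-changes.py http://zuul.example.com tenant-one gate

and each emitted line carries the tenant through to the enqueue command, e.g. zuul enqueue --tenant tenant-one --trigger gerrit --pipeline gate --project org/project --change 1234,1.
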
diff --git a/tox.ini b/tox.ini
index cc5ea58..7e84677 100644
--- a/tox.ini
+++ b/tox.ini
@@ -5,10 +5,7 @@
 
 [testenv]
 basepython = python3
-# Set STATSD env variables so that statsd code paths are tested.
-setenv = STATSD_HOST=127.0.0.1
-         STATSD_PORT=8125
-         VIRTUAL_ENV={envdir}
+setenv = VIRTUAL_ENV={envdir}
          OS_TEST_TIMEOUT=120
 passenv = ZUUL_TEST_ROOT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE OS_LOG_DEFAULTS
 usedevelop = True
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 0a266df..8ba3b86 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -168,32 +168,8 @@
                 host=host.name,
                 filename=included_file._filename))
 
-    def _emit_playbook_banner(self):
-        # Get the hostvars from just one host - the vars we're looking for will
-        # be identical on all of them
-        hostvars = next(iter(self._play._variable_manager._hostvars.values()))
-        self._playbook_name = None
-
-        phase = hostvars.get('zuul_execution_phase', '')
-        playbook = hostvars.get('zuul_execution_canonical_name_and_path')
-        trusted = hostvars.get('zuul_execution_trusted')
-        trusted = 'trusted' if trusted == "True" else 'untrusted'
-        branch = hostvars.get('zuul_execution_branch')
-
-        if phase and phase != 'run':
-            phase = '{phase}-run'.format(phase=phase)
-        phase = phase.upper()
-
-        self._log("{phase} [{trusted} : {playbook}@{branch}]".format(
-            trusted=trusted, phase=phase, playbook=playbook, branch=branch))
-
     def v2_playbook_on_play_start(self, play):
         self._play = play
-
-        # We can't fill in this information until the first play
-        if self._playbook_name:
-            self._emit_playbook_banner()
-
         # Log an extra blank line to get space before each play
         self._log("")
 
diff --git a/zuul/ansible/filter/zuul_filters.py b/zuul/ansible/filter/zuul_filters.py
index 17ef2bb..fa21f6b 100644
--- a/zuul/ansible/filter/zuul_filters.py
+++ b/zuul/ansible/filter/zuul_filters.py
@@ -14,10 +14,19 @@
 
 
 def zuul_legacy_vars(zuul):
-    # omitted:
-    # ZUUL_URL
-    # ZUUL_REF
+    # intentionally omitted:
+    # BASE_LOG_PATH
+    # JOB_TAGS
+    # LOG_PATH
     # ZUUL_COMMIT
+    # ZUUL_REF
+    # ZUUL_URL
+    #
+    # newly added to all builds:
+    # ZUUL_SHORT_PROJECT_NAME
+    #
+    # existing in most builds but newly added for periodic:
+    # ZUUL_BRANCH
 
     short_name = zuul['project']['name'].split('/')[-1]
     params = dict(ZUUL_UUID=zuul['build'],
@@ -26,6 +35,8 @@
                   ZUUL_PIPELINE=zuul['pipeline'],
                   ZUUL_VOTING=zuul['voting'],
                   WORKSPACE='/home/zuul/workspace')
+    if 'timeout' in zuul and zuul['timeout'] is not None:
+        params['BUILD_TIMEOUT'] = str(int(zuul['timeout']) * 1000)
     if 'branch' in zuul:
         params['ZUUL_BRANCH'] = zuul['branch']
 
@@ -34,7 +45,7 @@
             ['%s:%s:refs/changes/%s/%s/%s' % (
                 i['project']['name'],
                 i['branch'],
-                str(i['change'])[:-2:],
+                str(i['change'])[-2:],
                 i['change'],
                 i['patchset'])
              for i in zuul['items']])
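
The one-character slice fix is load-bearing: Gerrit change refs embed the last two digits of the change number as a sharding directory, and the old [:-2:] took everything but the last two digits. Checking the corrected slice:

    change, patchset = 98765, 3
    ref = 'refs/changes/%s/%s/%s' % (str(change)[-2:], change, patchset)
    assert ref == 'refs/changes/65/98765/3'
    # the old slice, str(change)[:-2:], would have yielded
    # 'refs/changes/987/98765/3'
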
diff --git a/zuul/ansible/paths.py b/zuul/ansible/paths.py
index 04daef4..05f9fdc 100644
--- a/zuul/ansible/paths.py
+++ b/zuul/ansible/paths.py
@@ -24,7 +24,7 @@
 
 def _is_safe_path(path):
     full_path = os.path.realpath(os.path.abspath(os.path.expanduser(path)))
-    if not full_path.startswith(os.path.abspath(os.path.curdir)):
+    if not full_path.startswith(os.path.abspath(os.path.expanduser('~'))):
         return False
     return True
 
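
Anchoring the check to the invoking user's home directory rather than the process's current directory makes the answer independent of wherever Ansible happens to have chdir'd. Condensed, with illustrative inputs assuming the executor runs as a user whose home is /home/zuul:

    import os

    def _is_safe_path(path):  # equivalent to the patched check above
        full = os.path.realpath(os.path.abspath(os.path.expanduser(path)))
        return full.startswith(os.path.abspath(os.path.expanduser('~')))

    # _is_safe_path('~/work/logs/job-output.txt') -> True
    # _is_safe_path('/etc/passwd')                -> False
    # _is_safe_path('~/../other-user/file')       -> False (realpath
    #     resolves the traversal back out of the home directory)
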
diff --git a/zuul/change_matcher.py b/zuul/change_matcher.py
index baea217..7f6673d 100644
--- a/zuul/change_matcher.py
+++ b/zuul/change_matcher.py
@@ -60,11 +60,13 @@
 class BranchMatcher(AbstractChangeMatcher):
 
     def matches(self, change):
-        return (
-            (hasattr(change, 'branch') and self.regex.match(change.branch)) or
-            (hasattr(change, 'ref') and
-             change.ref is not None and self.regex.match(change.ref))
-        )
+        if hasattr(change, 'branch'):
+            if self.regex.match(change.branch):
+                return True
+            return False
+        if self.regex.match(change.ref):
+            return True
+        return False
 
 
 class FileMatcher(AbstractChangeMatcher):
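
The rewrite changes semantics, not just shape: previously a change whose branch failed to match could still match on its ref, whereas now a change that has a branch attribute is judged on the branch alone, and ref is consulted only for ref-only items such as tags. That is also why the change_matcher test earlier in this patch switched from reassigning change.branch to deleting it. Illustrated:

    import re

    class FakeChange(object):
        pass

    regex = re.compile('foo')
    change = FakeChange()
    change.branch = 'bar'  # present but not matching
    change.ref = 'foo'     # matches the regex

    # Old logic: (branch matches) or (ref matches) -> True via ref.
    # New logic: branch is present, so only branch is consulted -> False.
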
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index a5db9a6..86f7f12 100755
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -23,8 +23,10 @@
 import signal
 import sys
 import traceback
+import threading
 
 yappi = extras.try_import('yappi')
+objgraph = extras.try_import('objgraph')
 
 from zuul.ansible import logconfig
 import zuul.lib.connections
@@ -37,23 +39,50 @@
 
 def stack_dump_handler(signum, frame):
     signal.signal(signal.SIGUSR2, signal.SIG_IGN)
-    log_str = ""
-    for thread_id, stack_frame in sys._current_frames().items():
-        log_str += "Thread: %s\n" % thread_id
-        log_str += "".join(traceback.format_stack(stack_frame))
     log = logging.getLogger("zuul.stack_dump")
-    log.debug(log_str)
-    if yappi:
-        if not yappi.is_running():
-            yappi.start()
-        else:
-            yappi.stop()
-            yappi_out = io.BytesIO()
-            yappi.get_func_stats().print_all(out=yappi_out)
-            yappi.get_thread_stats().print_all(out=yappi_out)
-            log.debug(yappi_out.getvalue())
-            yappi_out.close()
-            yappi.clear_stats()
+    log.debug("Beginning debug handler")
+    try:
+        threads = {}
+        for t in threading.enumerate():
+            threads[t.ident] = t
+        log_str = ""
+        for thread_id, stack_frame in sys._current_frames().items():
+            thread = threads.get(thread_id)
+            if thread:
+                thread_name = thread.name
+            else:
+                thread_name = '(Unknown)'
+            log_str += "Thread: %s %s\n" % (thread_id, thread_name)
+            log_str += "".join(traceback.format_stack(stack_frame))
+        log.debug(log_str)
+    except Exception:
+        log.exception("Thread dump error:")
+    try:
+        if yappi:
+            if not yappi.is_running():
+                log.debug("Starting Yappi")
+                yappi.start()
+            else:
+                log.debug("Stopping Yappi")
+                yappi.stop()
+                yappi_out = io.StringIO()
+                yappi.get_func_stats().print_all(out=yappi_out)
+                yappi.get_thread_stats().print_all(out=yappi_out)
+                log.debug(yappi_out.getvalue())
+                yappi_out.close()
+                yappi.clear_stats()
+    except Exception:
+        log.exception("Yappi error:")
+    try:
+        if objgraph:
+            log.debug("Most common types:")
+            objgraph_out = io.StringIO()
+            objgraph.show_growth(limit=100, file=objgraph_out)
+            log.debug(objgraph_out.getvalue())
+            objgraph_out.close()
+    except Exception:
+        log.exception("Objgraph error:")
+    log.debug("End debug handler")
     signal.signal(signal.SIGUSR2, stack_dump_handler)
 
 
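The handler now labels each stack with its thread name, isolates failures per stage, and adds optional objgraph reporting alongside yappi. The thread-naming technique on its own, as a runnable sketch:

    import sys
    import threading
    import traceback

    def dump_threads():
        # Map thread idents to Thread objects so the dump can show names.
        threads = {t.ident: t for t in threading.enumerate()}
        out = []
        for thread_id, stack_frame in sys._current_frames().items():
            thread = threads.get(thread_id)
            name = thread.name if thread else '(unknown)'
            out.append('Thread: %s %s\n' % (thread_id, name))
            out.append(''.join(traceback.format_stack(stack_frame)))
        return ''.join(out)

    print(dump_threads())

In a running daemon the dump is triggered externally, e.g. kill -USR2 <scheduler pid>.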
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 177283e..c9e399a 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -61,6 +61,10 @@
                                   required=False, type=int, default=1)
         cmd_autohold.set_defaults(func=self.autohold)
 
+        cmd_autohold_list = subparsers.add_parser(
+            'autohold-list', help='list autohold requests')
+        cmd_autohold_list.set_defaults(func=self.autohold_list)
+
         cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
         cmd_enqueue.add_argument('--tenant', help='tenant name',
                                  required=True)
@@ -162,6 +166,27 @@
                             count=self.args.count)
         return r
 
+    def autohold_list(self):
+        client = zuul.rpcclient.RPCClient(
+            self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
+        autohold_requests = client.autohold_list()
+
+        if not autohold_requests:
+            print("No autohold requests found")
+            return True
+
+        table = prettytable.PrettyTable(
+            field_names=['Tenant', 'Project', 'Job', 'Count', 'Reason'])
+
+        for key, value in autohold_requests.items():
+            # The key comes to us as a comma-joined string because JSON
+            # does not allow non-string keys.
+            tenant_name, project_name, job_name = key.split(',')
+            count, reason = value
+            table.add_row([tenant_name, project_name, job_name, count, reason])
+        print(table)
+        return True
+
     def enqueue(self):
         client = zuul.rpcclient.RPCClient(
             self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
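The RPC payload maps a comma-joined 'tenant,project,job' key to a (count, reason) pair, since JSON objects only permit string keys. The decoding step in isolation, with a hypothetical payload:

    import prettytable

    # Hypothetical payload shaped like the RPC response above.
    autohold_requests = {
        'openstack,openstack/nova,tox-py35': (1, 'debugging a gate failure'),
    }

    table = prettytable.PrettyTable(
        field_names=['Tenant', 'Project', 'Job', 'Count', 'Reason'])
    for key, value in autohold_requests.items():
        tenant, project, job = key.split(',')
        count, reason = value
        table.add_row([tenant, project, job, count, reason])
    print(table)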
diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py
index 63c621d..70c80c5 100755
--- a/zuul/cmd/executor.py
+++ b/zuul/cmd/executor.py
@@ -22,6 +22,7 @@
 # instead it depends on lockfile-0.9.1 which uses pidfile.
 pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
 
+import grp
 import logging
 import os
 import pwd
@@ -101,7 +102,10 @@
         if os.getuid() != 0:
             return
         pw = pwd.getpwnam(self.user)
-        os.setgroups([])
+        # Get the list of supplementary groups for the target user, and
+        # make sure we set them when dropping privileges.
+        groups = [g.gr_gid for g in grp.getgrall() if self.user in g.gr_mem]
+        os.setgroups(groups)
         os.setgid(pw.pw_gid)
         os.setuid(pw.pw_uid)
         os.umask(0o022)
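Previously os.setgroups([]) dropped every supplementary group, so the executor lost any access granted through group membership. A sketch of the lookup, alongside the stdlib alternative os.getgrouplist, which also includes the primary gid ('zuul' is a hypothetical account name):

    import grp
    import os
    import pwd

    user = 'zuul'
    pw = pwd.getpwnam(user)

    # Enumerate the group database, as the change above does; gr_mem
    # lists supplementary members only.
    groups = [g.gr_gid for g in grp.getgrall() if user in g.gr_mem]

    # Alternative: resolves via NSS and includes the primary gid too.
    groups_alt = os.getgrouplist(user, pw.pw_gid)

    os.setgroups(groups)  # requires root; run before setgid/setuid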
diff --git a/zuul/cmd/migrate.py b/zuul/cmd/migrate.py
index 3ab3902..efb2796 100644
--- a/zuul/cmd/migrate.py
+++ b/zuul/cmd/migrate.py
@@ -42,6 +42,9 @@
 import jenkins_jobs.parser
 import yaml
 
+JOB_MATCHERS = {}  # type: Dict[str, Dict[str, Dict]]
+TEMPLATES_TO_EXPAND = {}  # type: Dict[str, List]
+JOBS_FOR_EXPAND = collections.defaultdict(dict)  # type: ignore
 JOBS_BY_ORIG_TEMPLATE = {}  # type: ignore
 SUFFIXES = []  # type: ignore
 ENVIRONMENT = '{{ zuul | zuul_legacy_vars }}'
@@ -186,6 +189,37 @@
     return
 
 
+def normalize_project_expansions():
+    remove_from_job_matchers = []
+    template = None
+    # First, find the matchers that are identical across every project
+    # using a given job; those can be hoisted to the job level.
+    for job_name, project in copy.deepcopy(JOBS_FOR_EXPAND).items():
+        JOB_MATCHERS[job_name] = None
+        for project_name, expansion in project.items():
+            template = expansion['template']
+            if not JOB_MATCHERS[job_name]:
+                JOB_MATCHERS[job_name] = copy.deepcopy(expansion['info'])
+            else:
+                if JOB_MATCHERS[job_name] != expansion['info']:
+                    # We have different expansions for this job, it can't be
+                    # done at the job level
+                    remove_from_job_matchers.append(job_name)
+
+    for job_name in remove_from_job_matchers:
+        JOB_MATCHERS.pop(job_name, None)
+
+    # Second, find out which projects need to expand a given template
+    for job_name, project in copy.deepcopy(JOBS_FOR_EXPAND).items():
+        # There is a job-level expansion for this one
+        if job_name in JOB_MATCHERS.keys():
+            continue
+        for project_name, expansion in project.items():
+            TEMPLATES_TO_EXPAND.setdefault(project_name, [])
+            if expansion['info']:
+                # There is an expansion for this project
+                TEMPLATES_TO_EXPAND[project_name].append(expansion['template'])
+
+
 # from :
 # http://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data  flake8: noqa
 def should_use_block(value):
@@ -885,7 +919,7 @@
         elif 'openstack/puppet-openstack-integration' in expanded_projects:
             output['parent'] = 'legacy-puppet-openstack-integration'
         elif has_artifacts:
-            output['parent'] = 'publish-openstack-artifacts'
+            output['parent'] = 'legacy-publish-openstack-artifacts'
         elif has_draft:
             output['success-url'] = 'html/'
         output['run'] = os.path.join(self.job_path, 'run')
@@ -898,8 +932,6 @@
         timeout = self.getTimeout()
         if timeout:
             output['timeout'] = timeout
-            output.setdefault('vars', {})
-            output['vars']['BUILD_TIMEOUT'] = str(timeout * 1000)
 
         if self.nodes:
             if len(self.nodes) == 1:
@@ -910,6 +942,14 @@
         if expanded_projects:
             output['required-projects'] = sorted(list(set(expanded_projects)))
 
+        if self.name in JOB_MATCHERS:
+            for k, v in JOB_MATCHERS[self.name].items():
+                if k in output:
+                    self.log.error(
+                        'Job %s has attributes directly and from matchers',
+                        self.name)
+                output[k] = v
+
         return output
 
     def toPipelineDict(self):
@@ -1345,7 +1385,7 @@
         for pipeline, value in template.items():
             if pipeline == 'name':
                 continue
-            if pipeline not in project:
+            if pipeline not in project or 'jobs' not in project[pipeline]:
                 project[pipeline] = dict(jobs=[])
             project[pipeline]['jobs'].extend(value['jobs'])
 
@@ -1355,7 +1395,7 @@
                 return job.orig
         return None
 
-    def applyProjectMatchers(self, matchers, project):
+    def applyProjectMatchers(self, matchers, project, final=False):
         '''
         Apply per-project job matchers to the given project.
 
@@ -1373,7 +1413,8 @@
                         self.log.debug(
                             "Applied irrelevant-files to job %s in project %s",
                             job, project['name'])
-                        job = {job: {'irrelevant-files': list(set(files))}}
+                        job = {job: {'irrelevant-files':
+                                     sorted(list(set(files)))}}
                 elif isinstance(job, dict):
                     job = job.copy()
                     job_name = get_single_key(job)
@@ -1387,8 +1428,8 @@
                         if 'irrelevant-files' not in extras:
                             extras['irrelevant-files'] = []
                         extras['irrelevant-files'].extend(files)
-                        extras['irrelevant-files'] = list(
-                            set(extras['irrelevant-files']))
+                        extras['irrelevant-files'] = sorted(list(
+                            set(extras['irrelevant-files'])))
                     job[job_name] = extras
                 new_jobs.append(job)
             return new_jobs
@@ -1398,17 +1439,61 @@
                 if k in ('templates', 'name'):
                     continue
                 project[k]['jobs'] = processPipeline(
-                    project[k]['jobs'], job_name_regex, files)
+                    project[k].get('jobs', []), job_name_regex, files)
 
-        for matcher in matchers:
-            # find the project-specific section
-            for skipper in matcher.get('skip-if', []):
-                if skipper.get('project'):
-                    if re.search(skipper['project'], project['name']):
-                        if 'all-files-match-any' in skipper:
-                            applyIrrelevantFiles(
-                                matcher['name'],
-                                skipper['all-files-match-any'])
+        if matchers:
+            for matcher in matchers:
+                # find the project-specific section
+                for skipper in matcher.get('skip-if', []):
+                    if skipper.get('project'):
+                        if re.search(skipper['project'], project['name']):
+                            if 'all-files-match-any' in skipper:
+                                applyIrrelevantFiles(
+                                    matcher['name'],
+                                    skipper['all-files-match-any'])
+
+        if not final:
+            return
+
+        for k, v in project.items():
+            if k in ('templates', 'name'):
+                continue
+            jobs = []
+            for job in project[k].get('jobs', []):
+                if isinstance(job, dict):
+                    job_name = get_single_key(job)
+                else:
+                    job_name = job
+                if job_name in JOB_MATCHERS:
+                    jobs.append(job)
+                    continue
+                orig_name = self.getOldJobName(job_name)
+                if not orig_name:
+                    jobs.append(job)
+                    continue
+                orig_name = orig_name.format(
+                    name=project['name'].split('/')[1])
+                info = {}
+                for layout_job in self.mapping.layout.get('jobs', []):
+                    if 'parameter-function' in layout_job:
+                        continue
+                    if 'skip-if' in layout_job:
+                        continue
+                    if re.search(layout_job['name'], orig_name):
+                        if not layout_job.get('voting', True):
+                            info['voting'] = False
+                        if layout_job.get('branch'):
+                            info['branches'] = layout_job['branch']
+                        if layout_job.get('files'):
+                            info['files'] = layout_job['files']
+                        if not isinstance(job, dict):
+                            job = {job: info}
+                        else:
+                            job[job_name].update(info)
+
+                jobs.append(job)
+            if jobs:
+                project[k]['jobs'] = jobs
 
     def writeProject(self, project):
         '''
@@ -1423,12 +1508,7 @@
         if 'name' in project:
             new_project['name'] = project['name']
 
-        job_matchers = self.scanForProjectMatchers(project['name'])
-        if job_matchers:
-            exp_template_names = self.findReferencedTemplateNames(
-                job_matchers, project['name'])
-        else:
-            exp_template_names = []
+        exp_template_names = TEMPLATES_TO_EXPAND.get(project['name'], [])
 
         templates_to_expand = []
         if 'template' in project:
@@ -1447,6 +1527,51 @@
                 new_project[key] = collections.OrderedDict()
                 if key == 'gate':
                     for queue in self.change_queues:
+                        if (project['name'] not in queue.getProjects() or
+                                len(queue.getProjects()) == 1):
+                            continue
+                        new_project[key]['queue'] = queue.name
+                tmp = [job for job in self.makeNewJobs(value)]
+                # Don't insert into self.job_objects - that was done
+                # in the speculative pass
+                jobs = [job.toPipelineDict() for job in tmp]
+                if jobs:
+                    new_project[key]['jobs'] = jobs
+                if not new_project[key]:
+                    del new_project[key]
+
+        for name in templates_to_expand:
+            self.expandTemplateIntoProject(name, new_project)
+
+        job_matchers = self.scanForProjectMatchers(project['name'])
+
+        # Need a deep copy after expansion, else our templates end up
+        # also getting this change.
+        new_project = copy.deepcopy(new_project)
+        self.applyProjectMatchers(job_matchers, new_project, final=True)
+
+        return new_project
+
+    def checkSpeculativeProject(self, project):
+        '''
+        Create a new v3 project definition expanding all templates.
+        '''
+        new_project = collections.OrderedDict()
+        if 'name' in project:
+            new_project['name'] = project['name']
+
+        templates_to_expand = []
+        for template in project.get('template', []):
+            templates_to_expand.append(template['name'])
+
+        # We still have to process this section in order to populate
+        # self.job_objects.
+        for key, value in project.items():
+            if key in ('name', 'template'):
+                continue
+            else:
+                new_project[key] = collections.OrderedDict()
+                if key == 'gate':
+                    for queue in self.change_queues:
                         if project['name'] not in queue.getProjects():
                             continue
                         if len(queue.getProjects()) == 1:
@@ -1454,18 +1579,60 @@
                         new_project[key]['queue'] = queue.name
                 tmp = [job for job in self.makeNewJobs(value)]
                 self.job_objects.extend(tmp)
-                jobs = [job.toPipelineDict() for job in tmp]
-                new_project[key]['jobs'] = jobs
 
         for name in templates_to_expand:
-            self.expandTemplateIntoProject(name, new_project)
 
-        # Need a deep copy after expansion, else our templates end up
-        # also getting this change.
-        new_project = copy.deepcopy(new_project)
-        self.applyProjectMatchers(job_matchers, new_project)
+            expand_project = copy.deepcopy(new_project)
+            self.expandTemplateIntoProject(name, expand_project)
 
-        return new_project
+            # Need a deep copy after expansion, else our templates end up
+            # also getting this change.
+            expand_project = copy.deepcopy(expand_project)
+            job_matchers = self.scanForProjectMatchers(project['name'])
+            self.applyProjectMatchers(job_matchers, expand_project)
+
+            # We should now have a project-pipeline with only the
+            # jobs expanded from this one template
+            for project_part in expand_project.values():
+                # The pipelines are dicts - we only want pipelines
+                if isinstance(project_part, dict):
+                    if 'jobs' not in project_part:
+                        continue
+                    self.processProjectTemplateExpansion(
+                        project_part, project, name)
+
+    def processProjectTemplateExpansion(self, project_part, project, template):
+        # project_part should be {'jobs': []}
+        job_list = project_part['jobs']
+        for new_job in job_list:
+            if isinstance(new_job, dict):
+                new_job_name = get_single_key(new_job)
+                info = new_job[new_job_name]
+            else:
+                new_job_name = new_job
+                info = None
+            orig_name = self.getOldJobName(new_job_name)
+            if not orig_name:
+                self.log.error("Job without old name: %s", new_job_name)
+                continue
+            orig_name = orig_name.format(name=project['name'].split('/')[1])
+
+            for layout_job in self.mapping.layout.get('jobs', []):
+                if 'parameter-function' in layout_job:
+                    continue
+                if re.search(layout_job['name'], orig_name):
+                    if not info:
+                        info = {}
+                    if not layout_job.get('voting', True):
+                        info['voting'] = False
+                    if layout_job.get('branch'):
+                        info['branches'] = layout_job['branch']
+                    if layout_job.get('files'):
+                        info['files'] = layout_job['files']
+
+            if info:
+                expansion = dict(info=info, template=template)
+                JOBS_FOR_EXPAND[new_job_name][project['name']] = expansion
 
     def writeJobs(self):
         output_dir = self.setupDir()
@@ -1487,13 +1654,17 @@
             template_config,
             key=lambda template: template['project-template']['name'])
 
+        for project in self.layout.get('projects', []):
+            self.checkSpeculativeProject(project)
+        normalize_project_expansions()
+
         project_names = []
         for project in self.layout.get('projects', []):
             project_names.append(project['name'])
             project_dict = self.writeProject(project)
             merge_project_dict(
                 project_dicts, project['name'],
-                self.writeProject(project))
+                project_dict)
         project_config = project_dicts_to_list(project_dicts)
 
         seen_jobs = []
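normalize_project_expansions hoists a matcher to the job level only when every project expanding that job derived the same info; otherwise the expansion stays per project. The decision in isolation (data shapes mirror JOBS_FOR_EXPAND above; the sample values are hypothetical):

    def split_matchers(jobs_for_expand):
        job_level = {}
        per_project = set()
        for job_name, projects in jobs_for_expand.items():
            infos = [exp['info'] for exp in projects.values()]
            if all(info == infos[0] for info in infos):
                # Every project agrees, so the matcher can live on the job.
                job_level[job_name] = infos[0]
            else:
                per_project.add(job_name)
        return job_level, per_project

    jobs = {
        'tox-py35': {
            'p1': {'info': {'voting': False}, 'template': 't'},
            'p2': {'info': {'voting': False}, 'template': 't'},
        },
        'docs': {
            'p1': {'info': {'files': '^doc/.*'}, 'template': 't'},
            'p2': {'info': {}, 'template': 't'},
        },
    }
    job_level, per_project = split_matchers(jobs)
    print(sorted(job_level))    # ['tox-py35']
    print(sorted(per_project))  # ['docs']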
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index a9923c6..d920d8e 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -29,6 +29,7 @@
 
 import zuul.cmd
 from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd_config
 
 # No zuul imports here because they pull in paramiko which must not be
 # imported until after the daemonization.
@@ -97,8 +98,14 @@
             os.close(pipe_write)
             self.setup_logging('gearman_server', 'log_config')
             import zuul.lib.gearserver
-            statsd_host = os.environ.get('STATSD_HOST')
-            statsd_port = int(os.environ.get('STATSD_PORT', 8125))
+
+            (statsd_host, statsd_port, statsd_prefix) = get_statsd_config(
+                self.config)
+            if statsd_prefix:
+                statsd_prefix += '.zuul.geard'
+            else:
+                statsd_prefix = 'zuul.geard'
+
             host = get_default(self.config, 'gearman_server', 'listen_address')
             port = int(get_default(self.config, 'gearman_server', 'port',
                                    4730))
@@ -112,7 +119,7 @@
                                            host=host,
                                            statsd_host=statsd_host,
                                            statsd_port=statsd_port,
-                                           statsd_prefix='zuul.geard')
+                                           statsd_prefix=statsd_prefix)
 
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
@@ -154,8 +161,10 @@
         zookeeper = zuul.zk.ZooKeeper()
         zookeeper_hosts = get_default(self.config, 'zookeeper',
                                       'hosts', '127.0.0.1:2181')
+        zookeeper_timeout = float(get_default(self.config, 'zookeeper',
+                                              'session_timeout', 10.0))
 
-        zookeeper.connect(zookeeper_hosts)
+        zookeeper.connect(zookeeper_hosts, timeout=zookeeper_timeout)
 
         cache_expiry = get_default(self.config, 'webapp', 'status_expiry', 1)
         listen_address = get_default(self.config, 'webapp', 'listen_address',
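get_statsd_config replaces the old STATSD_HOST/STATSD_PORT environment lookups with configuration-file settings. Its real body lives in zuul.lib.statsd and is not shown in this diff; the following is only a plausible standalone reconstruction, assuming it reads server, port, and prefix from a [statsd] section:

    import configparser

    def get_statsd_config(config):
        # Assumed contract: returns (host, port, prefix); host is None
        # when statsd is unconfigured, which callers treat as disabled.
        if not config.has_section('statsd'):
            return (None, 8125, None)
        return (config.get('statsd', 'server', fallback=None),
                config.getint('statsd', 'port', fallback=8125),
                config.get('statsd', 'prefix', fallback=None))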
diff --git a/zuul/configloader.py b/zuul/configloader.py
index afdf329..426842b 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -11,6 +11,7 @@
 # under the License.
 
 import base64
+import collections
 from contextlib import contextmanager
 import copy
 import os
@@ -383,57 +384,58 @@
 class JobParser(object):
     ANSIBLE_ROLE_RE = re.compile(r'^(ansible[-_.+]*)*(role[-_.+]*)*')
 
-    @staticmethod
-    def getSchema():
-        zuul_role = {vs.Required('zuul'): str,
-                     'name': str}
+    zuul_role = {vs.Required('zuul'): str,
+                 'name': str}
 
-        galaxy_role = {vs.Required('galaxy'): str,
-                       'name': str}
+    galaxy_role = {vs.Required('galaxy'): str,
+                   'name': str}
 
-        role = vs.Any(zuul_role, galaxy_role)
+    role = vs.Any(zuul_role, galaxy_role)
 
-        job_project = {vs.Required('name'): str,
-                       'override-branch': str}
+    job_project = {vs.Required('name'): str,
+                   'override-branch': str}
 
-        secret = {vs.Required('name'): str,
-                  vs.Required('secret'): str}
+    secret = {vs.Required('name'): str,
+              vs.Required('secret'): str}
 
-        job = {vs.Required('name'): str,
-               'parent': vs.Any(str, None),
-               'final': bool,
-               'failure-message': str,
-               'success-message': str,
-               'failure-url': str,
-               'success-url': str,
-               'hold-following-changes': bool,
-               'voting': bool,
-               'semaphore': str,
-               'tags': to_list(str),
-               'branches': to_list(str),
-               'files': to_list(str),
-               'secrets': to_list(vs.Any(secret, str)),
-               'irrelevant-files': to_list(str),
-               # validation happens in NodeSetParser
-               'nodeset': vs.Any(dict, str),
-               'timeout': int,
-               'attempts': int,
-               'pre-run': to_list(str),
-               'post-run': to_list(str),
-               'run': str,
-               '_source_context': model.SourceContext,
-               '_start_mark': ZuulMark,
-               'roles': to_list(role),
-               'required-projects': to_list(vs.Any(job_project, str)),
-               'vars': dict,
-               'dependencies': to_list(str),
-               'allowed-projects': to_list(str),
-               'override-branch': str,
-               'description': str,
-               'post-review': bool
-               }
+    # Attributes of a job that can also be used in Project and ProjectTemplate
+    job_attributes = {'parent': vs.Any(str, None),
+                      'final': bool,
+                      'failure-message': str,
+                      'success-message': str,
+                      'failure-url': str,
+                      'success-url': str,
+                      'hold-following-changes': bool,
+                      'voting': bool,
+                      'semaphore': str,
+                      'tags': to_list(str),
+                      'branches': to_list(str),
+                      'files': to_list(str),
+                      'secrets': to_list(vs.Any(secret, str)),
+                      'irrelevant-files': to_list(str),
+                      # validation happens in NodeSetParser
+                      'nodeset': vs.Any(dict, str),
+                      'timeout': int,
+                      'attempts': int,
+                      'pre-run': to_list(str),
+                      'post-run': to_list(str),
+                      'run': str,
+                      '_source_context': model.SourceContext,
+                      '_start_mark': ZuulMark,
+                      'roles': to_list(role),
+                      'required-projects': to_list(vs.Any(job_project, str)),
+                      'vars': dict,
+                      'dependencies': to_list(str),
+                      'allowed-projects': to_list(str),
+                      'override-branch': str,
+                      'description': str,
+                      'post-review': bool}
 
-        return vs.Schema(job)
+    job_name = {vs.Required('name'): str}
+
+    job = dict(collections.ChainMap(job_name, job_attributes))
+
+    schema = vs.Schema(job)
 
     simple_attributes = [
         'final',
@@ -456,58 +458,69 @@
         # the reference definition of this job, and this is a project
         # repo, add an implicit branch matcher for this branch
         # (assuming there are no explicit branch matchers).  But only
-        # for top-level job definitions and variants.
-        # Project-pipeline job variants should more closely attach to
-        # their branch if they appear in a project-repo.
+        # for top-level job definitions and variants.  Never for
+        # project-templates.  They, and in-project project-pipeline
+        # job variants, should more closely attach to their branch if
+        # they appear in a project-repo.  That's handled in the
+        # ProjectParser.
         if (reference and
             reference.source_context and
             reference.source_context.branch != job.source_context.branch):
-            same_context = False
+            same_branch = False
         else:
-            same_context = True
+            same_branch = True
 
         if (job.source_context and
             (not job.source_context.trusted) and
-            ((not same_context) or project_pipeline)):
+            (not project_pipeline) and
+            (not same_branch)):
             return [job.source_context.branch]
         return None
 
     @staticmethod
-    def fromYaml(tenant, layout, conf, project_pipeline=False):
-        with configuration_exceptions('job', conf):
-            JobParser.getSchema()(conf)
+    def fromYaml(tenant, layout, conf, project_pipeline=False,
+                 name=None, validate=True):
+        if validate:
+            with configuration_exceptions('job', conf):
+                JobParser.schema(conf)
+
+        if name is None:
+            name = conf['name']
 
         # NB: The default detection system in the Job class requires
         # that we always assign values directly rather than modifying
         # them (e.g., "job.run = ..." rather than
         # "job.run.append(...)").
 
-        reference = layout.jobs.get(conf['name'], [None])[0]
+        reference = layout.jobs.get(name, [None])[0]
 
-        job = model.Job(conf['name'])
+        job = model.Job(name)
         job.source_context = conf.get('_source_context')
+        job.source_line = conf.get('_start_mark').line + 1
 
-        is_variant = layout.hasJob(conf['name'])
-        if 'parent' in conf:
-            if conf['parent'] is not None:
-                # Parent job is explicitly specified, so inherit from it.
-                parent = layout.getJob(conf['parent'])
-                job.inheritFrom(parent)
+        is_variant = layout.hasJob(name)
+        if not is_variant:
+            if 'parent' in conf:
+                if conf['parent'] is not None:
+                    # Parent job is explicitly specified, so inherit from it.
+                    parent = layout.getJob(conf['parent'])
+                    job.inheritFrom(parent)
+                else:
+                    # Parent is explicitly set as None, so user intends
+                    # this to be a base job.  That's only okay if we're in
+                    # a config project.
+                    if not conf['_source_context'].trusted:
+                        raise Exception(
+                            "Base jobs must be defined in config projects")
             else:
-                # Parent is explicitly set as None, so user intends
-                # this to be a base job.  That's only okay if we're in
-                # a config project.
-                if not conf['_source_context'].trusted:
-                    raise Exception(
-                        "Base jobs must be defined in config projects")
-        else:
-            # Parent is not explicitly set, so inherit from the
-            # default -- but only if this is the primary definition
-            # for the job (ie, not a variant -- variants don't need to
-            # have a parent as long as the main job does).
-            if not is_variant:
                 parent = layout.getJob(tenant.default_base_job)
                 job.inheritFrom(parent)
+        else:
+            if 'parent' in conf:
+                # TODO(jeblair): warn the user that we're ignoring the
+                # parent setting on this variant job definition.
+                pass
+
         # Secrets are part of the playbook context so we must establish
         # them earlier than playbooks.
         secrets = []
@@ -668,10 +681,7 @@
         if (not branches) and ('branches' in conf):
             branches = as_list(conf['branches'])
         if branches:
-            matchers = []
-            for branch in branches:
-                matchers.append(change_matcher.BranchMatcher(branch))
-            job.branch_matcher = change_matcher.MatchAny(matchers)
+            job.setBranchMatcher(branches)
         if 'files' in conf:
             matchers = []
             for fn in as_list(conf['files']):
@@ -709,10 +719,13 @@
 
 
 class ProjectTemplateParser(object):
-    log = logging.getLogger("zuul.ProjectTemplateParser")
+    def __init__(self, tenant, layout):
+        self.log = logging.getLogger("zuul.ProjectTemplateParser")
+        self.tenant = tenant
+        self.layout = layout
+        self.schema = self.getSchema()
 
-    @staticmethod
-    def getSchema(layout):
+    def getSchema(self):
         project_template = {
             vs.Required('name'): str,
             'description': str,
@@ -723,47 +736,41 @@
             '_start_mark': ZuulMark,
         }
 
-        for p in layout.pipelines.values():
-            project_template[p.name] = {'queue': str,
-                                        'jobs': [vs.Any(str, dict)]}
+        job = {str: vs.Any(str, JobParser.job_attributes)}
+        job_list = [vs.Any(str, job)]
+        pipeline_contents = {'queue': str, 'jobs': job_list}
+
+        for p in self.layout.pipelines.values():
+            project_template[p.name] = pipeline_contents
         return vs.Schema(project_template)
 
-    @staticmethod
-    def fromYaml(tenant, layout, conf):
-        with configuration_exceptions('project or project-template', conf):
-            ProjectTemplateParser.getSchema(layout)(conf)
-        # Make a copy since we modify this later via pop
-        conf = copy.deepcopy(conf)
+    def fromYaml(self, conf, validate=True):
+        if validate:
+            with configuration_exceptions('project-template', conf):
+                self.schema(conf)
         project_template = model.ProjectConfig(conf['name'])
         source_context = conf['_source_context']
         start_mark = conf['_start_mark']
-        for pipeline in layout.pipelines.values():
+        for pipeline in self.layout.pipelines.values():
             conf_pipeline = conf.get(pipeline.name)
             if not conf_pipeline:
                 continue
             project_pipeline = model.ProjectPipelineConfig()
             project_template.pipelines[pipeline.name] = project_pipeline
             project_pipeline.queue_name = conf_pipeline.get('queue')
-            ProjectTemplateParser._parseJobList(
-                tenant, layout, conf_pipeline.get('jobs', []),
+            self.parseJobList(
+                conf_pipeline.get('jobs', []),
                 source_context, start_mark, project_pipeline.job_list)
         return project_template
 
-    @staticmethod
-    def _parseJobList(tenant, layout, conf, source_context,
-                      start_mark, job_list):
+    def parseJobList(self, conf, source_context, start_mark, job_list):
         for conf_job in conf:
             if isinstance(conf_job, str):
-                attrs = dict(name=conf_job)
+                jobname = conf_job
+                attrs = {}
             elif isinstance(conf_job, dict):
                 # A dictionary in a job tree may override params
                 jobname, attrs = list(conf_job.items())[0]
-                if attrs:
-                    # We are overriding params, so make a new job def
-                    attrs['name'] = jobname
-                else:
-                    # Not overriding, so add a blank job
-                    attrs = dict(name=jobname)
             else:
                 raise Exception("Job must be a string or dictionary")
             attrs['_source_context'] = source_context
@@ -772,17 +779,22 @@
             # validate that the job is existing
             with configuration_exceptions('project or project-template',
                                           attrs):
-                layout.getJob(attrs['name'])
+                self.layout.getJob(jobname)
 
-            job_list.addJob(JobParser.fromYaml(tenant, layout, attrs,
-                                               project_pipeline=True))
+            job_list.addJob(JobParser.fromYaml(self.tenant, self.layout,
+                                               attrs, project_pipeline=True,
+                                               name=jobname, validate=False))
 
 
 class ProjectParser(object):
-    log = logging.getLogger("zuul.ProjectParser")
+    def __init__(self, tenant, layout, project_template_parser):
+        self.log = logging.getLogger("zuul.ProjectParser")
+        self.tenant = tenant
+        self.layout = layout
+        self.project_template_parser = project_template_parser
+        self.schema = self.getSchema()
 
-    @staticmethod
-    def getSchema(layout):
+    def getSchema(self):
         project = {
             vs.Required('name'): str,
             'description': str,
@@ -794,46 +806,52 @@
             '_start_mark': ZuulMark,
         }
 
-        for p in layout.pipelines.values():
-            project[p.name] = {'queue': str,
-                               'jobs': [vs.Any(str, dict)]}
+        job = {str: vs.Any(str, JobParser.job_attributes)}
+        job_list = [vs.Any(str, job)]
+        pipeline_contents = {'queue': str, 'jobs': job_list}
+
+        for p in self.layout.pipelines.values():
+            project[p.name] = pipeline_contents
         return vs.Schema(project)
 
-    @staticmethod
-    def fromYaml(tenant, layout, conf_list):
+    def fromYaml(self, conf_list):
         for conf in conf_list:
             with configuration_exceptions('project', conf):
-                ProjectParser.getSchema(layout)(conf)
+                self.schema(conf)
 
         with configuration_exceptions('project', conf_list[0]):
             project_name = conf_list[0]['name']
-            (trusted, project) = tenant.getProject(project_name)
+            (trusted, project) = self.tenant.getProject(project_name)
             if project is None:
                 raise ProjectNotFoundError(project_name)
             project_config = model.ProjectConfig(project.canonical_name)
 
         configs = []
         for conf in conf_list:
+            implied_branch = None
             with configuration_exceptions('project', conf):
                 if not conf['_source_context'].trusted:
                     if project != conf['_source_context'].project:
                         raise ProjectNotPermittedError()
 
-                # Make a copy since we modify this later via pop
-                conf = copy.deepcopy(conf)
-                conf_templates = conf.pop('templates', [])
+                conf_templates = conf.get('templates', [])
                 # The way we construct a project definition is by
                 # parsing the definition as a template, then applying
                 # all of the templates, including the newly parsed
                 # one, in order.
-                project_template = ProjectTemplateParser.fromYaml(
-                    tenant, layout, conf)
+                project_template = self.project_template_parser.fromYaml(
+                    conf, validate=False)
+                # If this project definition is in a place where it
+                # should get implied branch matchers, set it.
+                if (not conf['_source_context'].trusted):
+                    implied_branch = conf['_source_context'].branch
                 for name in conf_templates:
-                    if name not in layout.project_templates:
+                    if name not in self.layout.project_templates:
                         raise TemplateNotFoundError(name)
-                configs.extend([layout.project_templates[name]
+                configs.extend([(self.layout.project_templates[name],
+                                 implied_branch)
                                 for name in conf_templates])
-                configs.append(project_template)
+                configs.append((project_template, implied_branch))
                 # Set the following values to the first one that we
                 # find and ignore subsequent settings.
                 mode = conf.get('merge-mode')
@@ -848,21 +866,19 @@
             project_config.merge_mode = model.MERGER_MAP['merge-resolve']
         if project_config.default_branch is None:
             project_config.default_branch = 'master'
-        for pipeline in layout.pipelines.values():
+        for pipeline in self.layout.pipelines.values():
             project_pipeline = model.ProjectPipelineConfig()
             queue_name = None
             # For every template, iterate over the job tree and replace or
             # create the jobs in the final definition as needed.
             pipeline_defined = False
-            for template in configs:
+            for (template, implied_branch) in configs:
                 if pipeline.name in template.pipelines:
-                    ProjectParser.log.debug(
-                        "Applying template %s to pipeline %s" %
-                        (template.name, pipeline.name))
                     pipeline_defined = True
                     template_pipeline = template.pipelines[pipeline.name]
                     project_pipeline.job_list.inheritFrom(
-                        template_pipeline.job_list)
+                        template_pipeline.job_list,
+                        implied_branch)
                     if template_pipeline.queue_name:
                         queue_name = template_pipeline.queue_name
             if queue_name:
@@ -1358,7 +1374,12 @@
             # branch.  Remember the branch and then implicitly add a
             # branch selector to each job there.  This makes the
             # in-repo configuration apply only to that branch.
-            for branch in project.source.getProjectBranches(project, tenant):
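+            # Process 'master' first: when a job is defined on several
+            # branches, the first definition loaded becomes its reference
+            # definition, and that should come from master if it exists.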
+            branches = sorted(project.source.getProjectBranches(
+                project, tenant))
+            if 'master' in branches:
+                branches.remove('master')
+                branches = ['master'] + branches
+            for branch in branches:
                 new_project_unparsed_branch_config[project][branch] = \
                     model.UnparsedTenantConfig()
                 job = merger.getFiles(
@@ -1406,19 +1427,20 @@
                             (job.source_context,))
                         continue
                     loaded = conf_root
-                    job.source_context.path = fn
+                    source_context = job.source_context.copy()
+                    source_context.path = fn
                     TenantParser.log.info(
                         "Loading configuration from %s" %
-                        (job.source_context,))
-                    project = job.source_context.project
-                    branch = job.source_context.branch
-                    if job.source_context.trusted:
+                        (source_context,))
+                    project = source_context.project
+                    branch = source_context.branch
+                    if source_context.trusted:
                         incdata = TenantParser._parseConfigProjectLayout(
-                            job.files[fn], job.source_context)
+                            job.files[fn], source_context)
                         config_projects_config.extend(incdata)
                     else:
                         incdata = TenantParser._parseUntrustedProjectLayout(
-                            job.files[fn], job.source_context)
+                            job.files[fn], source_context)
                         untrusted_projects_config.extend(incdata)
                     new_project_unparsed_config[project].extend(incdata)
                     if branch in new_project_unparsed_branch_config.get(
@@ -1516,13 +1538,15 @@
                     continue
                 layout.addSemaphore(semaphore)
 
+        project_template_parser = ProjectTemplateParser(tenant, layout)
         for config_template in data.project_templates:
             classes = TenantParser._getLoadClasses(tenant, config_template)
             if 'project-template' not in classes:
                 continue
-            layout.addProjectTemplate(ProjectTemplateParser.fromYaml(
-                tenant, layout, config_template))
+            layout.addProjectTemplate(project_template_parser.fromYaml(
+                config_template))
 
+        project_parser = ProjectParser(tenant, layout, project_template_parser)
         for config_projects in data.projects.values():
             # Unlike other config classes, we expect multiple project
             # stanzas with the same name, so that a config repo can
@@ -1540,14 +1564,15 @@
             if not filtered_projects:
                 continue
 
-            layout.addProjectConfig(ProjectParser.fromYaml(
-                tenant, layout, filtered_projects))
+            layout.addProjectConfig(project_parser.fromYaml(
+                filtered_projects))
 
     @staticmethod
     def _parseLayout(base, tenant, data, scheduler, connections):
         # Don't call this method from dynamic reconfiguration because
         # it interacts with drivers and connections.
         layout = model.Layout(tenant)
+        TenantParser.log.debug("Created layout id %s", layout.uuid)
 
         TenantParser._parseLayoutItems(layout, tenant, data,
                                        scheduler, connections)
@@ -1615,9 +1640,22 @@
         for branch in branches:
             fns1 = []
             fns2 = []
-            files_list = files.connections.get(
+            files_entry = files.connections.get(
                 project.source.connection.connection_name, {}).get(
-                    project.name, {}).get(branch, {}).keys()
+                    project.name, {}).get(branch)
+            # If there is no files entry at all for this
+            # project-branch, then use the cached config.
+            if files_entry is None:
+                if trusted:
+                    incdata = project.unparsed_config
+                else:
+                    incdata = project.unparsed_branch_config.get(branch)
+                if incdata:
+                    config.extend(incdata)
+                continue
+            # Otherwise, do not use the cached config (even if the files
+            # are empty, as that likely means they were deleted).
+            files_list = files_entry.keys()
             for fn in files_list:
                 if fn.startswith("zuul.d/"):
                     fns1.append(fn)
@@ -1649,14 +1687,6 @@
 
                     config.extend(incdata)
 
-            if not loaded:
-                if trusted:
-                    incdata = project.unparsed_config
-                else:
-                    incdata = project.unparsed_branch_config.get(branch)
-                if incdata:
-                    config.extend(incdata)
-
     def createDynamicLayout(self, tenant, files,
                             include_config_projects=False,
                             scheduler=None, connections=None):
@@ -1671,6 +1701,7 @@
             self._loadDynamicProjectData(config, project, files, False, tenant)
 
         layout = model.Layout(tenant)
+        self.log.debug("Created layout id %s", layout.uuid)
         if not include_config_projects:
             # NOTE: the actual pipeline objects (complete with queues
             # and enqueued items) are copied by reference here.  This
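Splitting the job schema into job_name plus job_attributes lets ProjectParser and ProjectTemplateParser validate inline job variants (which carry no separate name key) against the same attribute set, while top-level jobs still require a name. The schema merge uses dict(collections.ChainMap(...)); a standalone illustration of that idiom:

    import collections

    job_name = {'name': 'required'}  # stand-in for vs.Required('name'): str
    job_attributes = {'voting': bool, 'timeout': int}

    # ChainMap presents several mappings as one view; dict() flattens it,
    # with earlier mappings taking precedence on duplicate keys.
    job = dict(collections.ChainMap(job_name, job_attributes))
    print(sorted(job))  # ['name', 'timeout', 'voting']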
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 3655115..b44fa46 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -14,8 +14,6 @@
 
 import abc
 
-import extras
-
 
 class BaseConnection(object, metaclass=abc.ABCMeta):
     """Base class for connections.
@@ -42,20 +40,18 @@
         self.driver = driver
         self.connection_name = connection_name
         self.connection_config = connection_config
-        self.statsd = extras.try_import('statsd.statsd')
 
     def logEvent(self, event):
         self.log.debug(
-            'Scheduling {driver} event from {connection}: {event}'.format(
-                driver=self.driver.name,
+            'Scheduling event from {connection}: {event}'.format(
                 connection=self.connection_name,
-                event=event.type))
+                event=event))
         try:
-            if self.statsd:
-                self.statsd.incr(
+            if self.sched.statsd:
+                self.sched.statsd.incr(
                     'zuul.event.{driver}.{event}'.format(
                         driver=self.driver.name, event=event.type))
-                self.statsd.incr(
+                self.sched.statsd.incr(
                     'zuul.event.{driver}.{connection}.{event}'.format(
                         driver=self.driver.name,
                         connection=self.connection_name,
diff --git a/zuul/driver/sql/alembic.ini b/zuul/driver/sql/alembic.ini
deleted file mode 100644
index 0c59505..0000000
--- a/zuul/driver/sql/alembic.ini
+++ /dev/null
@@ -1,69 +0,0 @@
-# A generic, single database configuration.
-
-[alembic]
-# path to migration scripts
-# NOTE(jhesketh): We may use alembic for other db components of zuul in the
-# future. Use a sub-folder for the reporters own versions.
-script_location = alembic_reporter
-
-# template used to generate migration files
-# file_template = %%(rev)s_%%(slug)s
-
-# max length of characters to apply to the
-# "slug" field
-#truncate_slug_length = 40
-
-# set to 'true' to run the environment during
-# the 'revision' command, regardless of autogenerate
-# revision_environment = false
-
-# set to 'true' to allow .pyc and .pyo files without
-# a source .py file to be detected as revisions in the
-# versions/ directory
-# sourceless = false
-
-# version location specification; this defaults
-# to alembic/versions.  When using multiple version
-# directories, initial revisions must be specified with --version-path
-# version_locations = %(here)s/bar %(here)s/bat alembic/versions
-
-# the output encoding used when revision files
-# are written from script.py.mako
-# output_encoding = utf-8
-
-sqlalchemy.url = mysql+pymysql://user@localhost/database
-
-# Logging configuration
-[loggers]
-keys = root,sqlalchemy,alembic
-
-[handlers]
-keys = console
-
-[formatters]
-keys = generic
-
-[logger_root]
-level = WARN
-handlers = console
-qualname =
-
-[logger_sqlalchemy]
-level = WARN
-handlers =
-qualname = sqlalchemy.engine
-
-[logger_alembic]
-level = INFO
-handlers =
-qualname = alembic
-
-[handler_console]
-class = StreamHandler
-args = (sys.stderr,)
-level = NOTSET
-formatter = generic
-
-[formatter_generic]
-format = %(levelname)-5.5s [%(name)s] %(message)s
-datefmt = %H:%M:%S
diff --git a/zuul/driver/sql/alembic_reporter/README b/zuul/driver/sql/alembic/README
similarity index 100%
rename from zuul/driver/sql/alembic_reporter/README
rename to zuul/driver/sql/alembic/README
diff --git a/zuul/driver/sql/alembic_reporter/env.py b/zuul/driver/sql/alembic/env.py
similarity index 100%
rename from zuul/driver/sql/alembic_reporter/env.py
rename to zuul/driver/sql/alembic/env.py
diff --git a/zuul/driver/sql/alembic_reporter/script.py.mako b/zuul/driver/sql/alembic/script.py.mako
similarity index 100%
rename from zuul/driver/sql/alembic_reporter/script.py.mako
rename to zuul/driver/sql/alembic/script.py.mako
diff --git a/zuul/driver/sql/alembic_reporter/versions/1dd914d4a482_allow_score_to_be_null.py b/zuul/driver/sql/alembic/versions/1dd914d4a482_allow_score_to_be_null.py
similarity index 100%
rename from zuul/driver/sql/alembic_reporter/versions/1dd914d4a482_allow_score_to_be_null.py
rename to zuul/driver/sql/alembic/versions/1dd914d4a482_allow_score_to_be_null.py
diff --git a/zuul/driver/sql/alembic_reporter/versions/20126015a87d_add_indexes.py b/zuul/driver/sql/alembic/versions/20126015a87d_add_indexes.py
similarity index 96%
rename from zuul/driver/sql/alembic_reporter/versions/20126015a87d_add_indexes.py
rename to zuul/driver/sql/alembic/versions/20126015a87d_add_indexes.py
index 3ac680d..12e7c09 100644
--- a/zuul/driver/sql/alembic_reporter/versions/20126015a87d_add_indexes.py
+++ b/zuul/driver/sql/alembic/versions/20126015a87d_add_indexes.py
@@ -53,4 +53,4 @@
 
 
 def downgrade():
-    pass
+    raise Exception("Downgrades not supported")
diff --git a/zuul/driver/sql/alembic_reporter/versions/4d3ebd7f06b9_set_up_initial_reporter_tables.py b/zuul/driver/sql/alembic/versions/4d3ebd7f06b9_set_up_initial_reporter_tables.py
similarity index 100%
rename from zuul/driver/sql/alembic_reporter/versions/4d3ebd7f06b9_set_up_initial_reporter_tables.py
rename to zuul/driver/sql/alembic/versions/4d3ebd7f06b9_set_up_initial_reporter_tables.py
diff --git a/zuul/driver/sql/alembic_reporter/versions/f86c9871ee67_add_tenant_column.py b/zuul/driver/sql/alembic/versions/5efb477fa963_add_ref_url_column.py
similarity index 71%
copy from zuul/driver/sql/alembic_reporter/versions/f86c9871ee67_add_tenant_column.py
copy to zuul/driver/sql/alembic/versions/5efb477fa963_add_ref_url_column.py
index 7728bd4..f9c3535 100644
--- a/zuul/driver/sql/alembic_reporter/versions/f86c9871ee67_add_tenant_column.py
+++ b/zuul/driver/sql/alembic/versions/5efb477fa963_add_ref_url_column.py
@@ -12,17 +12,17 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-"""Add tenant column
+"""Add ref_url column
 
-Revision ID: f86c9871ee67
-Revises: 20126015a87d
-Create Date: 2017-07-17 05:47:48.189767
+Revision ID: 5efb477fa963
+Revises: 60c119eb1e3f
+Create Date: 2017-09-12 22:50:29.307695
 
 """
 
 # revision identifiers, used by Alembic.
-revision = 'f86c9871ee67'
-down_revision = '20126015a87d'
+revision = '5efb477fa963'
+down_revision = '60c119eb1e3f'
 branch_labels = None
 depends_on = None
 
@@ -31,8 +31,8 @@
 
 
 def upgrade():
-    op.add_column('zuul_buildset', sa.Column('tenant', sa.String(255)))
+    op.add_column('zuul_buildset', sa.Column('ref_url', sa.String(255)))
 
 
 def downgrade():
-    op.drop_column('zuul_buildset', 'tenant')
+    raise Exception("Downgrades not supported")
diff --git a/zuul/driver/sql/alembic_reporter/versions/60c119eb1e3f_use_build_set_results.py b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
similarity index 66%
rename from zuul/driver/sql/alembic_reporter/versions/60c119eb1e3f_use_build_set_results.py
rename to zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
index 8e8142f..985eb0c 100644
--- a/zuul/driver/sql/alembic_reporter/versions/60c119eb1e3f_use_build_set_results.py
+++ b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
@@ -35,15 +35,4 @@
 
 
 def downgrade():
-    op.add_column(BUILDSET_TABLE, sa.Column('score', sa.Integer))
-
-    connection = op.get_bind()
-    connection.execute(
-        """
-        UPDATE {buildset_table}
-         SET score=(
-             SELECT CASE result
-                WHEN 'SUCCESS' THEN 1
-                ELSE -1 END)
-        """.format(buildset_table=BUILDSET_TABLE))
-    op.drop_column(BUILDSET_TABLE, 'result')
+    raise Exception("Downgrades not supported")
diff --git a/zuul/driver/sql/alembic/versions/ba4cdce9b18c_add_rev_columns.py b/zuul/driver/sql/alembic/versions/ba4cdce9b18c_add_rev_columns.py
new file mode 100644
index 0000000..dc75983
--- /dev/null
+++ b/zuul/driver/sql/alembic/versions/ba4cdce9b18c_add_rev_columns.py
@@ -0,0 +1,25 @@
+"""Add oldrev/newrev columns
+
+Revision ID: ba4cdce9b18c
+Revises: 5efb477fa963
+Create Date: 2017-09-27 19:33:21.800198
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'ba4cdce9b18c'
+down_revision = '5efb477fa963'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('zuul_buildset', sa.Column('oldrev', sa.String(255)))
+    op.add_column('zuul_buildset', sa.Column('newrev', sa.String(255)))
+
+
+def downgrade():
+    raise Exception("Downgrades not supported")
diff --git a/zuul/driver/sql/alembic_reporter/versions/f86c9871ee67_add_tenant_column.py b/zuul/driver/sql/alembic/versions/f86c9871ee67_add_tenant_column.py
similarity index 95%
rename from zuul/driver/sql/alembic_reporter/versions/f86c9871ee67_add_tenant_column.py
rename to zuul/driver/sql/alembic/versions/f86c9871ee67_add_tenant_column.py
index 7728bd4..4087af3 100644
--- a/zuul/driver/sql/alembic_reporter/versions/f86c9871ee67_add_tenant_column.py
+++ b/zuul/driver/sql/alembic/versions/f86c9871ee67_add_tenant_column.py
@@ -35,4 +35,4 @@
 
 
 def downgrade():
-    op.drop_column('zuul_buildset', 'tenant')
+    raise Exception("Downgrades not supported")
diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py
index afdc747..b964c0b 100644
--- a/zuul/driver/sql/sqlconnection.py
+++ b/zuul/driver/sql/sqlconnection.py
@@ -44,11 +44,10 @@
             # Recycle connections if they've been idle for more than 1 second.
             # MySQL connections are lightweight and thus keeping long-lived
             # connections around is not valuable.
-            # TODO(mordred) Add a config paramter
             self.engine = sa.create_engine(
                 self.dburi,
                 poolclass=sqlalchemy.pool.QueuePool,
-                pool_recycle=1)
+                pool_recycle=self.connection_config.get('pool_recycle', 1))
             self._migrate()
             self._setup_tables()
             self.zuul_buildset_table, self.zuul_build_table \
@@ -72,7 +71,7 @@
 
             config = alembic.config.Config()
             config.set_main_option("script_location",
-                                   "zuul:driver/sql/alembic_reporter")
+                                   "zuul:driver/sql/alembic")
             config.set_main_option("sqlalchemy.url",
                                    self.connection_config.get('dburi'))
 
@@ -91,6 +90,9 @@
             sa.Column('change', sa.Integer, nullable=True),
             sa.Column('patchset', sa.Integer, nullable=True),
             sa.Column('ref', sa.String(255)),
+            sa.Column('oldrev', sa.String(255)),
+            sa.Column('newrev', sa.String(255)),
+            sa.Column('ref_url', sa.String(255)),
             sa.Column('result', sa.String(255)),
             sa.Column('message', sa.TEXT()),
             sa.Column('tenant', sa.String(255)),
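pool_recycle becomes tunable per connection rather than hard-coded to one second. One caveat worth noting: values read from an ini file arrive as strings, so a deployment-supplied setting may need an explicit int() cast before SQLAlchemy compares it against elapsed seconds. A sketch of the guarded read (the connection_config contents are hypothetical):

    import sqlalchemy as sa
    import sqlalchemy.pool

    connection_config = {'dburi': 'sqlite://', 'pool_recycle': '300'}

    # Cast defensively: ini-sourced values are strings.
    pool_recycle = int(connection_config.get('pool_recycle', 1))
    engine = sa.create_engine(
        connection_config['dburi'],
        poolclass=sqlalchemy.pool.QueuePool,
        pool_recycle=pool_recycle)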
diff --git a/zuul/driver/sql/sqlreporter.py b/zuul/driver/sql/sqlreporter.py
index ca35577..7785c48 100644
--- a/zuul/driver/sql/sqlreporter.py
+++ b/zuul/driver/sql/sqlreporter.py
@@ -36,6 +36,8 @@
             change = getattr(item.change, 'number', '')
             patchset = getattr(item.change, 'patchset', '')
             ref = getattr(item.change, 'ref', '')
+            oldrev = getattr(item.change, 'oldrev', '')
+            newrev = getattr(item.change, 'newrev', '')
             buildset_ins = self.connection.zuul_buildset_table.insert().values(
                 zuul_ref=item.current_build_set.ref,
                 pipeline=item.pipeline.name,
@@ -43,6 +45,9 @@
                 change=change,
                 patchset=patchset,
                 ref=ref,
+                oldrev=oldrev,
+                newrev=newrev,
+                ref_url=item.change.url,
                 result=item.current_build_set.result,
                 message=self._formatItemReport(
                     item, with_jobs=False),
@@ -61,15 +66,19 @@
 
                 (result, url) = item.formatJobResult(job)
 
+                start = end = None
+                if build.start_time:
+                    start = datetime.datetime.fromtimestamp(build.start_time)
+                if build.end_time:
+                    end = datetime.datetime.fromtimestamp(build.end_time)
+
                 build_inserts.append({
                     'buildset_id': buildset_ins_result.inserted_primary_key,
                     'uuid': build.uuid,
                     'job_name': build.job.name,
                     'result': result,
-                    'start_time': datetime.datetime.fromtimestamp(
-                        build.start_time),
-                    'end_time': datetime.datetime.fromtimestamp(
-                        build.end_time),
+                    'start_time': start,
+                    'end_time': end,
                     'voting': build.job.voting,
                     'log_url': url,
                     'node_name': build.node_name,
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index eb2bfdf..58ad885 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -101,7 +101,7 @@
             job = super(ZuulGearmanClient, self).handleStatusRes(packet)
         except gear.UnknownJobError:
             handle = packet.getArgument(0)
-            for build in self.__zuul_gearman.builds:
+            for build in self.__zuul_gearman.builds.values():
                 if build.__gearman_job.handle == handle:
                     self.__zuul_gearman.onUnknownJob(job)
 
@@ -133,7 +133,7 @@
         self.gearman.shutdown()
         self.log.debug("Stopped")
 
-    def execute(self, job, item, pipeline, dependent_items=[],
+    def execute(self, job, item, pipeline, dependent_changes=[],
                 merger_items=[]):
         tenant = pipeline.layout.tenant
         uuid = str(uuid4().hex)
@@ -143,11 +143,7 @@
                 job, uuid,
                 item.current_build_set.getJobNodeSet(job.name),
                 item.change,
-                [x.change for x in dependent_items]))
-
-        dependent_items = dependent_items[:]
-        dependent_items.reverse()
-        all_items = dependent_items + [item]
+                dependent_changes))
 
         # TODOv3(jeblair): This ansible vars data structure will
         # replace the environment variables below.
@@ -168,7 +164,8 @@
                            project=project,
                            tenant=tenant.name,
                            timeout=job.timeout,
-                           jobtags=sorted(job.tags))
+                           jobtags=sorted(job.tags),
+                           _inheritance_path=list(job.inheritance_path))
         if hasattr(item.change, 'branch'):
             zuul_params['branch'] = item.change.branch
         if hasattr(item.change, 'tag'):
@@ -186,25 +183,8 @@
             and item.change.newrev != '0' * 40):
             zuul_params['newrev'] = item.change.newrev
         zuul_params['projects'] = []  # Set below
-        zuul_params['items'] = []
-        for i in all_items:
-            d = dict()
-            d['project'] = dict(
-                name=i.change.project.name,
-                short_name=i.change.project.name.split('/')[-1],
-                canonical_hostname=i.change.project.canonical_hostname,
-                canonical_name=i.change.project.canonical_name,
-                src_dir=os.path.join('src', i.change.project.canonical_name),
-            )
-            if hasattr(i.change, 'number'):
-                d['change'] = str(i.change.number)
-            if hasattr(i.change, 'url'):
-                d['change_url'] = i.change.url
-            if hasattr(i.change, 'patchset'):
-                d['patchset'] = str(i.change.patchset)
-            if hasattr(i.change, 'branch'):
-                d['branch'] = i.change.branch
-            zuul_params['items'].append(d)
+        zuul_params['_projects'] = {}  # transitional to convert to dict
+        zuul_params['items'] = dependent_changes
 
         params = dict()
         params['job'] = job.name
@@ -237,7 +217,7 @@
         required_projects = set()
 
         def make_project_dict(project, override_branch=None):
-            project_config = item.current_build_set.layout.project_configs.get(
+            project_config = item.layout.project_configs.get(
                 project.canonical_name, None)
             if project_config:
                 project_default_branch = project_config.default_branch
@@ -262,26 +242,40 @@
                                       job_project.override_branch))
                 projects.add(project)
                 required_projects.add(project)
-        for i in all_items:
-            if i.change.project not in projects:
-                project = i.change.project
+        for change in dependent_changes:
+            # We have to find the project this way because it may not
+            # be registered in the tenant (ie, a foreign project).
+            source = self.sched.connections.getSourceByHostname(
+                change['project']['canonical_hostname'])
+            project = source.getProject(change['project']['name'])
+            if project not in projects:
                 params['projects'].append(make_project_dict(project))
                 projects.add(project)
-
         for p in projects:
-            zuul_params['projects'].append(dict(
+            zuul_params['_projects'][p.canonical_name] = (dict(
                 name=p.name,
                 short_name=p.name.split('/')[-1],
-                canonical_hostname=p.canonical_hostname,
+                # Duplicate this into the dict too, so that iterating
+                # project.values() is easier for callers
                 canonical_name=p.canonical_name,
+                canonical_hostname=p.canonical_hostname,
                 src_dir=os.path.join('src', p.canonical_name),
                 required=(p in required_projects),
             ))
+        # We are transitioning "projects" from a list to a dict
+        # indexed by canonical name, as it is much easier to access
+        # values in ansible.  Existing callers are converted to
+        # "_projects", then once "projects" is unused we switch it,
+        # then convert callers back.  Finally when "_projects" is
+        # unused it will be removed.
+        for cn, p in zuul_params['_projects'].items():
+            zuul_params['projects'].append(p)
 
         build = Build(job, uuid)
         build.parameters = params
 
         if job.name == 'noop':
+            self.sched.onBuildStarted(build)
             self.sched.onBuildCompleted(build, 'SUCCESS', {})
             return build
 
@@ -371,9 +365,17 @@
                     result = 'RETRY_LIMIT'
                 else:
                     build.retry = True
+            if result in ('DISCONNECT', 'ABORTED'):
+                # Always retry if the executor just went away
+                build.retry = True
             result_data = data.get('data', {})
             self.log.info("Build %s complete, result %s" %
                           (job, result))
+            # If the build should be retried, don't supply the result
+            # so that elsewhere we don't have to deal with keeping
+            # track of which results are non-final.
+            if build.retry:
+                result = None
             self.sched.onBuildCompleted(build, result, result_data)
             # The test suite expects the build to be removed from the
             # internal dict after it's added to the report queue.
@@ -402,7 +404,7 @@
 
     def onDisconnect(self, job):
         self.log.info("Gearman job %s lost due to disconnect" % job)
-        self.onBuildCompleted(job)
+        self.onBuildCompleted(job, 'DISCONNECT')
 
     def onUnknownJob(self, job):
         self.log.info("Gearman job %s lost due to unknown handle" % job)
@@ -443,7 +445,9 @@
 
     def lookForLostBuilds(self):
         self.log.debug("Looking for lost builds")
-        for build in self.builds.values():
+        # Construct a list from the values iterator to protect from it changing
+        # out from underneath us.
+        for build in list(self.builds.values()):
             if build.result:
                 # The build has finished, it will be removed
                 continue
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index b37a82e..e7a6dbc 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -16,6 +16,7 @@
 import datetime
 import json
 import logging
+import multiprocessing
 import os
 import shutil
 import signal
@@ -28,6 +29,7 @@
 import traceback
 from zuul.lib.yamlutil import yaml
 from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd
 
 try:
     import ara.plugins.callbacks as ara_callbacks
@@ -45,6 +47,11 @@
 DEFAULT_FINGER_PORT = 79
 
 
+class StopException(Exception):
+    """An exception raised when an inner loop is asked to stop."""
+    pass
+
+
 class ExecutorError(Exception):
     """A non-transient run-time executor error
 
@@ -88,7 +95,7 @@
         if cache_dir == jobs_base:
             raise Exception("Cache dir and jobs dir cannot be the same")
         self.thread = threading.Thread(target=self._run,
-                                       name='executor-diskaccountant')
+                                       name='diskaccountant')
         self.thread.daemon = True
         self._running = False
         self.jobs_base = jobs_base
@@ -147,7 +154,7 @@
         self.function = function
         self.args = args
         self.thread = threading.Thread(target=self._run,
-                                       name='executor-watchdog')
+                                       name='watchdog')
         self.thread.daemon = True
         self.timed_out = None
 
@@ -284,6 +291,7 @@
         #     inventory.yaml
         #   .ansible (mounted in bwrap read-write)
         #     fact-cache/localhost
+        #     cp
         #   playbook_0 (mounted in bwrap for each playbook read-only)
         #     secrets.yaml
         #     project -> ../trusted/project_0/...
@@ -324,6 +332,8 @@
         self.ansible_cache_root = os.path.join(self.root, '.ansible')
         self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache')
         os.makedirs(self.fact_cache)
+        self.control_path = os.path.join(self.ansible_cache_root, 'cp')
+        os.makedirs(self.control_path)
         localhost_facts = os.path.join(self.fact_cache, 'localhost')
         # NOTE(pabelanger): We do not want to leak zuul-executor facts to other
         # playbooks now that smart fact gathering is enabled by default.  We
@@ -499,369 +509,6 @@
     return inventory
 
 
-class ExecutorMergeWorker(gear.TextWorker):
-    def __init__(self, executor_server, *args, **kw):
-        self.zuul_executor_server = executor_server
-        super(ExecutorMergeWorker, self).__init__(*args, **kw)
-
-    def handleNoop(self, packet):
-        # Wait until the update queue is empty before responding
-        while self.zuul_executor_server.update_queue.qsize():
-            time.sleep(1)
-
-        with self.zuul_executor_server.merger_lock:
-            super(ExecutorMergeWorker, self).handleNoop(packet)
-
-
-class ExecutorExecuteWorker(gear.TextWorker):
-    def __init__(self, executor_server, *args, **kw):
-        self.zuul_executor_server = executor_server
-        super(ExecutorExecuteWorker, self).__init__(*args, **kw)
-
-    def handleNoop(self, packet):
-        # Delay our response to running a new job based on the number
-        # of jobs we're currently running, in an attempt to spread
-        # load evenly among executors.
-        workers = len(self.zuul_executor_server.job_workers)
-        delay = (workers ** 2) / 1000.0
-        time.sleep(delay)
-        return super(ExecutorExecuteWorker, self).handleNoop(packet)
-
-
-class ExecutorServer(object):
-    log = logging.getLogger("zuul.ExecutorServer")
-
-    def __init__(self, config, connections={}, jobdir_root=None,
-                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
-        self.config = config
-        self.keep_jobdir = keep_jobdir
-        self.jobdir_root = jobdir_root
-        # TODOv3(mordred): make the executor name more unique --
-        # perhaps hostname+pid.
-        self.hostname = socket.gethostname()
-        self.log_streaming_port = log_streaming_port
-        self.merger_lock = threading.Lock()
-        self.verbose = False
-        self.command_map = dict(
-            stop=self.stop,
-            pause=self.pause,
-            unpause=self.unpause,
-            graceful=self.graceful,
-            verbose=self.verboseOn,
-            unverbose=self.verboseOff,
-            keep=self.keep,
-            nokeep=self.nokeep,
-        )
-
-        self.merge_root = get_default(self.config, 'executor', 'git_dir',
-                                      '/var/lib/zuul/executor-git')
-        self.default_username = get_default(self.config, 'executor',
-                                            'default_username', 'zuul')
-        self.disk_limit_per_job = int(get_default(self.config, 'executor',
-                                                  'disk_limit_per_job', 250))
-        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
-        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
-        execution_wrapper_name = get_default(self.config, 'executor',
-                                             'execution_wrapper', 'bubblewrap')
-        self.execution_wrapper = connections.drivers[execution_wrapper_name]
-
-        self.connections = connections
-        # This merger and its git repos are used to maintain
-        # up-to-date copies of all the repos that are used by jobs, as
-        # well as to support the merger:cat functon to supply
-        # configuration information to Zuul when it starts.
-        self.merger = self._getMerger(self.merge_root)
-        self.update_queue = DeduplicateQueue()
-
-        state_dir = get_default(self.config, 'executor', 'state_dir',
-                                '/var/lib/zuul', expand_user=True)
-        path = os.path.join(state_dir, 'executor.socket')
-        self.command_socket = commandsocket.CommandSocket(path)
-        ansible_dir = os.path.join(state_dir, 'ansible')
-        self.ansible_dir = ansible_dir
-        if os.path.exists(ansible_dir):
-            shutil.rmtree(ansible_dir)
-
-        zuul_dir = os.path.join(ansible_dir, 'zuul')
-        plugin_dir = os.path.join(zuul_dir, 'ansible')
-
-        os.makedirs(plugin_dir, mode=0o0755)
-
-        self.library_dir = os.path.join(plugin_dir, 'library')
-        self.action_dir = os.path.join(plugin_dir, 'action')
-        self.callback_dir = os.path.join(plugin_dir, 'callback')
-        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
-        self.filter_dir = os.path.join(plugin_dir, 'filter')
-
-        _copy_ansible_files(zuul.ansible, plugin_dir)
-
-        # We're copying zuul.ansible.* into a directory we are going
-        # to add to pythonpath, so our plugins can "import
-        # zuul.ansible".  But we're not installing all of zuul, so
-        # create a __init__.py file for the stub "zuul" module.
-        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
-            pass
-
-        self.job_workers = {}
-        self.disk_accountant = DiskAccountant(self.jobdir_root,
-                                              self.disk_limit_per_job,
-                                              self.stopJobByJobdir,
-                                              self.merge_root)
-
-    def _getMerger(self, root, logger=None):
-        if root != self.merge_root:
-            cache_root = self.merge_root
-        else:
-            cache_root = None
-        return zuul.merger.merger.Merger(root, self.connections,
-                                         self.merge_email, self.merge_name,
-                                         cache_root, logger)
-
-    def start(self):
-        self._running = True
-        self._command_running = True
-        server = self.config.get('gearman', 'server')
-        port = get_default(self.config, 'gearman', 'port', 4730)
-        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
-        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
-        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
-        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
-        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
-        self.executor_worker = ExecutorExecuteWorker(
-            self, 'Zuul Executor Server')
-        self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
-        self.log.debug("Waiting for server")
-        self.merger_worker.waitForServer()
-        self.executor_worker.waitForServer()
-        self.log.debug("Registering")
-        self.register()
-
-        self.log.debug("Starting command processor")
-        self.command_socket.start()
-        self.command_thread = threading.Thread(target=self.runCommand)
-        self.command_thread.daemon = True
-        self.command_thread.start()
-
-        self.log.debug("Starting worker")
-        self.update_thread = threading.Thread(target=self._updateLoop)
-        self.update_thread.daemon = True
-        self.update_thread.start()
-        self.merger_thread = threading.Thread(target=self.run_merger)
-        self.merger_thread.daemon = True
-        self.merger_thread.start()
-        self.executor_thread = threading.Thread(target=self.run_executor)
-        self.executor_thread.daemon = True
-        self.executor_thread.start()
-        self.disk_accountant.start()
-
-    def register(self):
-        self.executor_worker.registerFunction("executor:execute")
-        self.executor_worker.registerFunction("executor:stop:%s" %
-                                              self.hostname)
-        self.merger_worker.registerFunction("merger:merge")
-        self.merger_worker.registerFunction("merger:cat")
-        self.merger_worker.registerFunction("merger:refstate")
-
-    def stop(self):
-        self.log.debug("Stopping")
-        self.disk_accountant.stop()
-        self._running = False
-        self._command_running = False
-        self.command_socket.stop()
-        self.update_queue.put(None)
-
-        for job_worker in list(self.job_workers.values()):
-            try:
-                job_worker.stop()
-            except Exception:
-                self.log.exception("Exception sending stop command "
-                                   "to worker:")
-        self.merger_worker.shutdown()
-        self.executor_worker.shutdown()
-        self.log.debug("Stopped")
-
-    def pause(self):
-        # TODOv3: implement
-        pass
-
-    def unpause(self):
-        # TODOv3: implement
-        pass
-
-    def graceful(self):
-        # TODOv3: implement
-        pass
-
-    def verboseOn(self):
-        self.verbose = True
-
-    def verboseOff(self):
-        self.verbose = False
-
-    def keep(self):
-        self.keep_jobdir = True
-
-    def nokeep(self):
-        self.keep_jobdir = False
-
-    def join(self):
-        self.update_thread.join()
-        self.merger_thread.join()
-        self.executor_thread.join()
-
-    def runCommand(self):
-        while self._command_running:
-            try:
-                command = self.command_socket.get().decode('utf8')
-                if command != '_stop':
-                    self.command_map[command]()
-            except Exception:
-                self.log.exception("Exception while processing command")
-
-    def _updateLoop(self):
-        while self._running:
-            try:
-                self._innerUpdateLoop()
-            except:
-                self.log.exception("Exception in update thread:")
-
-    def _innerUpdateLoop(self):
-        # Inside of a loop that keeps the main repositories up to date
-        task = self.update_queue.get()
-        if task is None:
-            # We are asked to stop
-            return
-        with self.merger_lock:
-            self.log.info("Updating repo %s/%s" % (
-                task.connection_name, task.project_name))
-            self.merger.updateRepo(task.connection_name, task.project_name)
-            self.log.debug("Finished updating repo %s/%s" %
-                           (task.connection_name, task.project_name))
-        task.setComplete()
-
-    def update(self, connection_name, project_name):
-        # Update a repository in the main merger
-        task = UpdateTask(connection_name, project_name)
-        task = self.update_queue.put(task)
-        return task
-
-    def run_merger(self):
-        self.log.debug("Starting merger listener")
-        while self._running:
-            try:
-                job = self.merger_worker.getJob()
-                try:
-                    if job.name == 'merger:cat':
-                        self.log.debug("Got cat job: %s" % job.unique)
-                        self.cat(job)
-                    elif job.name == 'merger:merge':
-                        self.log.debug("Got merge job: %s" % job.unique)
-                        self.merge(job)
-                    elif job.name == 'merger:refstate':
-                        self.log.debug("Got refstate job: %s" % job.unique)
-                        self.refstate(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
-                except Exception:
-                    self.log.exception("Exception while running job")
-                    job.sendWorkException(
-                        traceback.format_exc().encode('utf8'))
-            except gear.InterruptedError:
-                pass
-            except Exception:
-                self.log.exception("Exception while getting job")
-
-    def run_executor(self):
-        self.log.debug("Starting executor listener")
-        while self._running:
-            try:
-                job = self.executor_worker.getJob()
-                try:
-                    if job.name == 'executor:execute':
-                        self.log.debug("Got execute job: %s" % job.unique)
-                        self.executeJob(job)
-                    elif job.name.startswith('executor:stop'):
-                        self.log.debug("Got stop job: %s" % job.unique)
-                        self.stopJob(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
-                except Exception:
-                    self.log.exception("Exception while running job")
-                    job.sendWorkException(
-                        traceback.format_exc().encode('utf8'))
-            except gear.InterruptedError:
-                pass
-            except Exception:
-                self.log.exception("Exception while getting job")
-
-    def executeJob(self, job):
-        self.job_workers[job.unique] = AnsibleJob(self, job)
-        self.job_workers[job.unique].run()
-
-    def finishJob(self, unique):
-        del(self.job_workers[unique])
-
-    def stopJobByJobdir(self, jobdir):
-        unique = os.path.basename(jobdir)
-        self.stopJobByUnique(unique)
-
-    def stopJob(self, job):
-        try:
-            args = json.loads(job.arguments)
-            self.log.debug("Stop job with arguments: %s" % (args,))
-            unique = args['uuid']
-            self.stopJobByUnique(unique)
-        finally:
-            job.sendWorkComplete()
-
-    def stopJobByUnique(self, unique):
-        job_worker = self.job_workers.get(unique)
-        if not job_worker:
-            self.log.debug("Unable to find worker for job %s" % (unique,))
-            return
-        try:
-            job_worker.stop()
-        except Exception:
-            self.log.exception("Exception sending stop command "
-                               "to worker:")
-
-    def cat(self, job):
-        args = json.loads(job.arguments)
-        task = self.update(args['connection'], args['project'])
-        task.wait()
-        with self.merger_lock:
-            files = self.merger.getFiles(args['connection'], args['project'],
-                                         args['branch'], args['files'],
-                                         args.get('dirs', []))
-        result = dict(updated=True,
-                      files=files)
-        job.sendWorkComplete(json.dumps(result))
-
-    def refstate(self, job):
-        args = json.loads(job.arguments)
-        with self.merger_lock:
-            success, repo_state = self.merger.getRepoState(args['items'])
-        result = dict(updated=success,
-                      repo_state=repo_state)
-        job.sendWorkComplete(json.dumps(result))
-
-    def merge(self, job):
-        args = json.loads(job.arguments)
-        with self.merger_lock:
-            ret = self.merger.mergeChanges(args['items'], args.get('files'),
-                                           args.get('dirs', []),
-                                           args.get('repo_state'))
-        result = dict(merged=(ret is not None))
-        if ret is None:
-            result['commit'] = result['files'] = result['repo_state'] = None
-        else:
-            (result['commit'], result['files'], result['repo_state'],
-             recent) = ret
-        job.sendWorkComplete(json.dumps(result))
-
-
 class AnsibleJobLogAdapter(logging.LoggerAdapter):
     def process(self, msg, kwargs):
         msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
@@ -874,12 +521,14 @@
     RESULT_TIMED_OUT = 2
     RESULT_UNREACHABLE = 3
     RESULT_ABORTED = 4
+    RESULT_DISK_FULL = 5
 
     RESULT_MAP = {
         RESULT_NORMAL: 'RESULT_NORMAL',
         RESULT_TIMED_OUT: 'RESULT_TIMED_OUT',
         RESULT_UNREACHABLE: 'RESULT_UNREACHABLE',
         RESULT_ABORTED: 'RESULT_ABORTED',
+        RESULT_DISK_FULL: 'RESULT_DISK_FULL',
     }
 
     def __init__(self, executor_server, job):
@@ -892,6 +541,7 @@
         self.proc_lock = threading.Lock()
         self.running = False
         self.aborted = False
+        self.aborted_reason = None
         self.thread = None
         self.private_key_file = get_default(self.executor_server.config,
                                             'executor', 'private_key_file',
@@ -906,12 +556,16 @@
 
     def run(self):
         self.running = True
-        self.thread = threading.Thread(target=self.execute)
+        self.thread = threading.Thread(target=self.execute,
+                                       name='build-%s' % self.job.unique)
         self.thread.start()
 
-    def stop(self):
+    def stop(self, reason=None):
         self.aborted = True
+        self.aborted_reason = reason
         self.abortRunningProc()
+
+    def wait(self):
         if self.thread:
             self.thread.join()
 
@@ -1058,6 +712,13 @@
         self.job.sendWorkStatus(0, 100)
 
         result = self.runPlaybooks(args)
+
+        # Stop the persistent SSH connections.
+        setup_status, setup_code = self.runAnsibleCleanup(
+            self.jobdir.setup_playbook)
+
+        if self.aborted_reason == self.RESULT_DISK_FULL:
+            result = 'DISK_FULL'
         data = self.getResultData()
         result_data = json.dumps(dict(result=result,
                                       data=data))
@@ -1174,13 +835,44 @@
             # TODOv3(pabelanger): Implement post-run timeout setting.
             post_status, post_code = self.runAnsiblePlaybook(
                 playbook, args['timeout'], success, phase='post', index=index)
+            if post_status == self.RESULT_ABORTED:
+                return 'ABORTED'
             if post_status != self.RESULT_NORMAL or post_code != 0:
+                success = False
                 # If we encountered a pre-failure, that takes
                 # precedence over the post result.
                 if not pre_failed:
                     result = 'POST_FAILURE'
+                if (index + 1) == len(self.jobdir.post_playbooks):
+                    self._logFinalPlaybookError()
+
         return result
 
+    def _logFinalPlaybookError(self):
+        # Failures in the final post playbook can include failures
+        # uploading logs, which makes diagnosing issues difficult.
+        # Grab the output from the last playbook from the json
+        # file and log it.
+        json_output = self.jobdir.job_output_file.replace('txt', 'json')
+        self.log.debug("Final playbook failed")
+        if not os.path.exists(json_output):
+            self.log.debug("JSON logfile {logfile} is missing".format(
+                logfile=json_output))
+            return
+        try:
+            output = json.load(open(json_output, 'r'))
+            last_playbook = output[-1]
+            # Transform json to yaml - because it's easier to read and given
+            # the size of the data it'll be extra-hard to read this as an
+            # all on one line stringified nested dict.
+            yaml_out = yaml.safe_dump(last_playbook, default_flow_style=False)
+            for line in yaml_out.split('\n'):
+                self.log.debug(line)
+        except Exception:
+            self.log.exception(
+                "Could not decode json from {logfile}".format(
+                    logfile=json_output))
+
     def getHostList(self, args):
         hosts = []
         for node in args['nodes']:
@@ -1507,6 +1199,13 @@
             config.write('display_args_to_stdout = %s\n' %
                          str(not jobdir_playbook.secrets_content))
 
+            # Increase the internal poll interval of ansible.
+            # The default interval of 0.001s is optimized for interactive
+            # ui at the expense of CPU load. As we have a non-interactive
+            # automation use case a longer poll interval is more suitable
+            # and reduces CPU load of the ansible process.
+            config.write('internal_poll_interval = 0.01\n')
+
             config.write('[ssh_connection]\n')
             # NB: when setting pipelining = True, keep_remote_files
             # must be False (the default).  Otherwise it apparently
@@ -1517,6 +1216,7 @@
             # command which expects interactive input on a tty (such
             # as sudo) it does not hang.
             config.write('pipelining = True\n')
+            config.write('control_path_dir = %s\n' % self.jobdir.control_path)
             ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
                 "-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
             config.write('ssh_args = %s\n' % ssh_args)
@@ -1538,7 +1238,7 @@
             except Exception:
                 self.log.exception("Exception while killing ansible process:")
 
-    def runAnsible(self, cmd, timeout, playbook):
+    def runAnsible(self, cmd, timeout, playbook, wrapped=True):
         config_file = playbook.ansible_config
         env_copy = os.environ.copy()
         env_copy.update(self.ssh_agent.env)
@@ -1579,8 +1279,12 @@
         if playbook.secrets_content:
             secrets[playbook.secrets] = playbook.secrets_content
 
-        context = self.executor_server.execution_wrapper.getExecutionContext(
-            ro_paths, rw_paths, secrets)
+        if wrapped:
+            wrapper = self.executor_server.execution_wrapper
+        else:
+            wrapper = self.executor_server.connections.drivers['nullwrap']
+
+        context = wrapper.getExecutionContext(ro_paths, rw_paths, secrets)
 
         popen = context.getPopen(
             work_dir=self.jobdir.work_root,
@@ -1599,6 +1303,7 @@
             self.proc = popen(
                 cmd,
                 cwd=self.jobdir.work_root,
+                stdin=subprocess.DEVNULL,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT,
                 preexec_fn=os.setsid,
@@ -1646,6 +1351,13 @@
         elif ret == -9:
             # Received abort request.
             return (self.RESULT_ABORTED, None)
+        elif ret == 1:
+            if syntax_buffer[0].startswith(b'ERROR!'):
+                with open(self.jobdir.job_output_file, 'a') as job_output:
+                    for line in syntax_buffer:
+                        job_output.write("{now} | {line}\n".format(
+                            now=datetime.datetime.now(),
+                            line=line.decode('utf-8').rstrip()))
         elif ret == 4:
             # Ansible could not parse the yaml.
             self.log.debug("Ansible parse error")
@@ -1668,7 +1380,7 @@
                     now=datetime.datetime.now()))
                 found_marker = False
                 for line in syntax_buffer:
-                    if line.startswith('ERROR! Unexpected Exception'):
+                    if line.startswith(b'ERROR! Unexpected Exception'):
                         found_marker = True
                     if not found_marker:
                         continue
@@ -1688,11 +1400,63 @@
                '-a', 'gather_subset=!all']
 
         result, code = self.runAnsible(
-            cmd=cmd, timeout=60, playbook=playbook)
+            cmd=cmd, timeout=60, playbook=playbook,
+            wrapped=False)
         self.log.debug("Ansible complete, result %s code %s" % (
             self.RESULT_MAP[result], code))
         return result, code
 
+    def runAnsibleCleanup(self, playbook):
+        # TODO(jeblair): This requires a bugfix in Ansible 2.4
+        # Once this is used, increase the controlpersist timeout.
+        return (self.RESULT_NORMAL, 0)
+
+        if self.executor_server.verbose:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+
+        cmd = ['ansible', '*', verbose, '-m', 'meta',
+               '-a', 'reset_connection']
+
+        result, code = self.runAnsible(
+            cmd=cmd, timeout=60, playbook=playbook,
+            wrapped=False)
+        self.log.debug("Ansible complete, result %s code %s" % (
+            self.RESULT_MAP[result], code))
+        return result, code
+
+    def emitPlaybookBanner(self, playbook, step, phase, result=None):
+        # This is used to print a header and a footer, respectively at the
+        # beginning and the end of each playbook execution.
+        # We are doing it from the executor rather than from a callback because
+        # the parameters are not made available to the callback until it's too
+        # late.
+        phase = phase or ''
+        trusted = playbook.trusted
+        trusted = 'trusted' if trusted else 'untrusted'
+        branch = playbook.branch
+        playbook = playbook.canonical_name_and_path
+
+        if phase and phase != 'run':
+            phase = '{phase}-run'.format(phase=phase)
+        phase = phase.upper()
+
+        if result is not None:
+            result = self.RESULT_MAP[result]
+            msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
+            msg = msg.format(phase=phase, step=step, result=result,
+                             trusted=trusted, playbook=playbook, branch=branch)
+        else:
+            msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
+            msg = msg.format(phase=phase, step=step, trusted=trusted,
+                             playbook=playbook, branch=branch)
+
+        with open(self.jobdir.job_output_file, 'a') as job_output:
+            job_output.write("{now} | {msg}\n".format(
+                now=datetime.datetime.now(),
+                msg=msg))
+
     def runAnsiblePlaybook(self, playbook, timeout, success=None,
                            phase=None, index=None):
         if self.executor_server.verbose:
@@ -1723,8 +1487,482 @@
         if self.executor_variables_file is not None:
             cmd.extend(['-e@%s' % self.executor_variables_file])
 
+        self.emitPlaybookBanner(playbook, 'START', phase)
+
         result, code = self.runAnsible(
             cmd=cmd, timeout=timeout, playbook=playbook)
         self.log.debug("Ansible complete, result %s code %s" % (
             self.RESULT_MAP[result], code))
+
+        self.emitPlaybookBanner(playbook, 'END', phase, result=result)
         return result, code
+
+
+class ExecutorMergeWorker(gear.TextWorker):
+    def __init__(self, executor_server, *args, **kw):
+        self.zuul_executor_server = executor_server
+        super(ExecutorMergeWorker, self).__init__(*args, **kw)
+
+    def handleNoop(self, packet):
+        # Wait until the update queue is empty before responding
+        while self.zuul_executor_server.update_queue.qsize():
+            time.sleep(1)
+
+        with self.zuul_executor_server.merger_lock:
+            super(ExecutorMergeWorker, self).handleNoop(packet)
+
+
+class ExecutorExecuteWorker(gear.TextWorker):
+    def __init__(self, executor_server, *args, **kw):
+        self.zuul_executor_server = executor_server
+        super(ExecutorExecuteWorker, self).__init__(*args, **kw)
+
+    def handleNoop(self, packet):
+        # Delay our response to running a new job based on the number
+        # of jobs we're currently running, in an attempt to spread
+        # load evenly among executors.
+        workers = len(self.zuul_executor_server.job_workers)
+        delay = (workers ** 2) / 1000.0
+        time.sleep(delay)
+        return super(ExecutorExecuteWorker, self).handleNoop(packet)
+
+
+class ExecutorServer(object):
+    log = logging.getLogger("zuul.ExecutorServer")
+    _job_class = AnsibleJob
+
+    def __init__(self, config, connections={}, jobdir_root=None,
+                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
+        self.config = config
+        self.keep_jobdir = keep_jobdir
+        self.jobdir_root = jobdir_root
+        # TODOv3(mordred): make the executor name more unique --
+        # perhaps hostname+pid.
+        self.hostname = get_default(self.config, 'executor', 'hostname',
+                                    socket.gethostname())
+        self.log_streaming_port = log_streaming_port
+        self.merger_lock = threading.Lock()
+        self.run_lock = threading.Lock()
+        self.verbose = False
+        self.command_map = dict(
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            graceful=self.graceful,
+            verbose=self.verboseOn,
+            unverbose=self.verboseOff,
+            keep=self.keep,
+            nokeep=self.nokeep,
+        )
+
+        self.statsd = get_statsd(config)
+        self.merge_root = get_default(self.config, 'executor', 'git_dir',
+                                      '/var/lib/zuul/executor-git')
+        self.default_username = get_default(self.config, 'executor',
+                                            'default_username', 'zuul')
+        self.disk_limit_per_job = int(get_default(self.config, 'executor',
+                                                  'disk_limit_per_job', 250))
+        self.merge_email = get_default(self.config, 'merger', 'git_user_email')
+        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+        self.merge_speed_limit = get_default(
+            config, 'merger', 'git_http_low_speed_limit', '1000')
+        self.merge_speed_time = get_default(
+            config, 'merger', 'git_http_low_speed_time', '30')
+        execution_wrapper_name = get_default(self.config, 'executor',
+                                             'execution_wrapper', 'bubblewrap')
+        load_multiplier = float(get_default(self.config, 'executor',
+                                            'load_multiplier', '2.5'))
+        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.accepting_work = False
+        self.execution_wrapper = connections.drivers[execution_wrapper_name]
+
+        self.connections = connections
+        # This merger and its git repos are used to maintain
+        # up-to-date copies of all the repos that are used by jobs, as
+        # well as to support the merger:cat functon to supply
+        # configuration information to Zuul when it starts.
+        self.merger = self._getMerger(self.merge_root)
+        self.update_queue = DeduplicateQueue()
+
+        state_dir = get_default(self.config, 'executor', 'state_dir',
+                                '/var/lib/zuul', expand_user=True)
+        path = os.path.join(state_dir, 'executor.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
+        ansible_dir = os.path.join(state_dir, 'ansible')
+        self.ansible_dir = ansible_dir
+        if os.path.exists(ansible_dir):
+            shutil.rmtree(ansible_dir)
+
+        zuul_dir = os.path.join(ansible_dir, 'zuul')
+        plugin_dir = os.path.join(zuul_dir, 'ansible')
+
+        os.makedirs(plugin_dir, mode=0o0755)
+
+        self.library_dir = os.path.join(plugin_dir, 'library')
+        self.action_dir = os.path.join(plugin_dir, 'action')
+        self.callback_dir = os.path.join(plugin_dir, 'callback')
+        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
+        self.filter_dir = os.path.join(plugin_dir, 'filter')
+
+        _copy_ansible_files(zuul.ansible, plugin_dir)
+
+        # We're copying zuul.ansible.* into a directory we are going
+        # to add to pythonpath, so our plugins can "import
+        # zuul.ansible".  But we're not installing all of zuul, so
+        # create a __init__.py file for the stub "zuul" module.
+        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
+            pass
+
+        self.job_workers = {}
+        self.disk_accountant = DiskAccountant(self.jobdir_root,
+                                              self.disk_limit_per_job,
+                                              self.stopJobDiskFull,
+                                              self.merge_root)
+
+    def _getMerger(self, root, logger=None):
+        if root != self.merge_root:
+            cache_root = self.merge_root
+        else:
+            cache_root = None
+        return zuul.merger.merger.Merger(
+            root, self.connections, self.merge_email, self.merge_name,
+            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
+
+    def start(self):
+        self._running = True
+        self._command_running = True
+        server = self.config.get('gearman', 'server')
+        port = get_default(self.config, 'gearman', 'port', 4730)
+        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
+        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+        self.executor_worker = ExecutorExecuteWorker(
+            self, 'Zuul Executor Server')
+        self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
+        self.log.debug("Waiting for server")
+        self.merger_worker.waitForServer()
+        self.executor_worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand,
+                                               name='command')
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
+        self.log.debug("Starting worker")
+        self.update_thread = threading.Thread(target=self._updateLoop,
+                                              name='update')
+        self.update_thread.daemon = True
+        self.update_thread.start()
+        self.merger_thread = threading.Thread(target=self.run_merger,
+                                              name='merger')
+        self.merger_thread.daemon = True
+        self.merger_thread.start()
+        self.executor_thread = threading.Thread(target=self.run_executor,
+                                                name='executor')
+        self.executor_thread.daemon = True
+        self.executor_thread.start()
+        self.governor_stop_event = threading.Event()
+        self.governor_thread = threading.Thread(target=self.run_governor,
+                                                name='governor')
+        self.governor_thread.daemon = True
+        self.governor_thread.start()
+        self.disk_accountant.start()
+
+    def register(self):
+        self.register_work()
+        self.executor_worker.registerFunction("executor:stop:%s" %
+                                              self.hostname)
+        self.merger_worker.registerFunction("merger:merge")
+        self.merger_worker.registerFunction("merger:cat")
+        self.merger_worker.registerFunction("merger:refstate")
+
+    def register_work(self):
+        if self._running:
+            self.accepting_work = True
+            self.executor_worker.registerFunction("executor:execute")
+
+    def unregister_work(self):
+        self.accepting_work = False
+        self.executor_worker.unregisterFunction("executor:execute")
+
+    def stop(self):
+        self.log.debug("Stopping")
+        self.disk_accountant.stop()
+        # The governor can change function registration, so make sure
+        # it has stopped.
+        self.governor_stop_event.set()
+        self.governor_thread.join()
+        # Stop accepting new jobs
+        self.merger_worker.setFunctions([])
+        self.executor_worker.setFunctions([])
+        # Tell the executor worker to abort any jobs it just accepted,
+        # and grab the list of currently running job workers.
+        with self.run_lock:
+            self._running = False
+            self._command_running = False
+            workers = list(self.job_workers.values())
+        self.command_socket.stop()
+
+        for job_worker in workers:
+            try:
+                job_worker.stop()
+            except Exception:
+                self.log.exception("Exception sending stop command "
+                                   "to worker:")
+        for job_worker in workers:
+            try:
+                job_worker.wait()
+            except Exception:
+                self.log.exception("Exception waiting for worker "
+                                   "to stop:")
+
+        # Now that we aren't accepting any new jobs, and all of the
+        # running jobs have stopped, tell the update processor to
+        # stop.
+        self.update_queue.put(None)
+
+        # All job results should have been sent by now, shutdown the
+        # gearman workers.
+        self.merger_worker.shutdown()
+        self.executor_worker.shutdown()
+
+        if self.statsd:
+            base_key = 'zuul.executor.%s' % self.hostname
+            self.statsd.gauge(base_key + '.load_average', 0)
+            self.statsd.gauge(base_key + '.running_builds', 0)
+
+        self.log.debug("Stopped")
+
+    def join(self):
+        self.governor_thread.join()
+        self.update_thread.join()
+        self.merger_thread.join()
+        self.executor_thread.join()
+
+    def pause(self):
+        # TODOv3: implement
+        pass
+
+    def unpause(self):
+        # TODOv3: implement
+        pass
+
+    def graceful(self):
+        # TODOv3: implement
+        pass
+
+    def verboseOn(self):
+        self.verbose = True
+
+    def verboseOff(self):
+        self.verbose = False
+
+    def keep(self):
+        self.keep_jobdir = True
+
+    def nokeep(self):
+        self.keep_jobdir = False
+
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':
+                    self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
+
+    def _updateLoop(self):
+        while True:
+            try:
+                self._innerUpdateLoop()
+            except StopException:
+                return
+            except Exception:
+                self.log.exception("Exception in update thread:")
+
+    def _innerUpdateLoop(self):
+        # Inside of a loop that keeps the main repositories up to date
+        task = self.update_queue.get()
+        if task is None:
+            # We are asked to stop
+            raise StopException()
+        with self.merger_lock:
+            self.log.info("Updating repo %s/%s" % (
+                task.connection_name, task.project_name))
+            self.merger.updateRepo(task.connection_name, task.project_name)
+            self.log.debug("Finished updating repo %s/%s" %
+                           (task.connection_name, task.project_name))
+        task.setComplete()
+
+    def update(self, connection_name, project_name):
+        # Update a repository in the main merger
+        task = UpdateTask(connection_name, project_name)
+        task = self.update_queue.put(task)
+        return task
+
+    def run_merger(self):
+        self.log.debug("Starting merger listener")
+        while self._running:
+            try:
+                job = self.merger_worker.getJob()
+                try:
+                    self.mergerJobDispatch(job)
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(
+                        traceback.format_exc().encode('utf8'))
+            except gear.InterruptedError:
+                pass
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def mergerJobDispatch(self, job):
+        with self.run_lock:
+            if job.name == 'merger:cat':
+                self.log.debug("Got cat job: %s" % job.unique)
+                self.cat(job)
+            elif job.name == 'merger:merge':
+                self.log.debug("Got merge job: %s" % job.unique)
+                self.merge(job)
+            elif job.name == 'merger:refstate':
+                self.log.debug("Got refstate job: %s" % job.unique)
+                self.refstate(job)
+            else:
+                self.log.error("Unable to handle job %s" % job.name)
+                job.sendWorkFail()
+
+    def run_executor(self):
+        self.log.debug("Starting executor listener")
+        while self._running:
+            try:
+                job = self.executor_worker.getJob()
+                try:
+                    self.executorJobDispatch(job)
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(
+                        traceback.format_exc().encode('utf8'))
+            except gear.InterruptedError:
+                pass
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def executorJobDispatch(self, job):
+        with self.run_lock:
+            if not self._running:
+                job.sendWorkFail()
+                return
+            if job.name == 'executor:execute':
+                self.log.debug("Got execute job: %s" % job.unique)
+                self.executeJob(job)
+            elif job.name.startswith('executor:stop'):
+                self.log.debug("Got stop job: %s" % job.unique)
+                self.stopJob(job)
+            else:
+                self.log.error("Unable to handle job %s" % job.name)
+                job.sendWorkFail()
+
+    def executeJob(self, job):
+        if self.statsd:
+            base_key = 'zuul.executor.%s' % self.hostname
+            self.statsd.incr(base_key + '.builds')
+        self.job_workers[job.unique] = self._job_class(self, job)
+        self.job_workers[job.unique].run()
+
+    def run_governor(self):
+        while not self.governor_stop_event.wait(30):
+            try:
+                self.manageLoad()
+            except Exception:
+                self.log.exception("Exception in governor thread:")
+
+    def manageLoad(self):
+        ''' Apply some heuristics to decide whether or not we should
+            be askign for more jobs '''
+        load_avg = os.getloadavg()[0]
+        if self.accepting_work:
+            # Don't unregister if we don't have any active jobs.
+            if load_avg > self.max_load_avg and self.job_workers:
+                self.log.info(
+                    "Unregistering due to high system load {} > {}".format(
+                        load_avg, self.max_load_avg))
+                self.unregister_work()
+        elif load_avg <= self.max_load_avg:
+            self.log.info(
+                "Re-registering as load is within limits {} <= {}".format(
+                    load_avg, self.max_load_avg))
+            self.register_work()
+        if self.statsd:
+            base_key = 'zuul.executor.%s' % self.hostname
+            self.statsd.gauge(base_key + '.load_average',
+                              int(load_avg * 100))
+            self.statsd.gauge(base_key + '.running_builds',
+                              len(self.job_workers))
+
+    def finishJob(self, unique):
+        del(self.job_workers[unique])
+
+    def stopJobDiskFull(self, jobdir):
+        unique = os.path.basename(jobdir)
+        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
+
+    def stopJob(self, job):
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Stop job with arguments: %s" % (args,))
+            unique = args['uuid']
+            self.stopJobByUnique(unique)
+        finally:
+            job.sendWorkComplete()
+
+    def stopJobByUnique(self, unique, reason=None):
+        job_worker = self.job_workers.get(unique)
+        if not job_worker:
+            self.log.debug("Unable to find worker for job %s" % (unique,))
+            return
+        try:
+            job_worker.stop(reason)
+        except Exception:
+            self.log.exception("Exception sending stop command "
+                               "to worker:")
+
+    def cat(self, job):
+        args = json.loads(job.arguments)
+        task = self.update(args['connection'], args['project'])
+        task.wait()
+        with self.merger_lock:
+            files = self.merger.getFiles(args['connection'], args['project'],
+                                         args['branch'], args['files'],
+                                         args.get('dirs', []))
+        result = dict(updated=True,
+                      files=files)
+        job.sendWorkComplete(json.dumps(result))
+
+    def refstate(self, job):
+        args = json.loads(job.arguments)
+        with self.merger_lock:
+            success, repo_state = self.merger.getRepoState(args['items'])
+        result = dict(updated=success,
+                      repo_state=repo_state)
+        job.sendWorkComplete(json.dumps(result))
+
+    def merge(self, job):
+        args = json.loads(job.arguments)
+        with self.merger_lock:
+            ret = self.merger.mergeChanges(args['items'], args.get('files'),
+                                           args.get('dirs', []),
+                                           args.get('repo_state'))
+        result = dict(merged=(ret is not None))
+        if ret is None:
+            result['commit'] = result['files'] = result['repo_state'] = None
+        else:
+            (result['commit'], result['files'], result['repo_state'],
+             recent) = ret
+        job.sendWorkComplete(json.dumps(result))
diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py
index 79d78f4..262490a 100644
--- a/zuul/lib/connections.py
+++ b/zuul/lib/connections.py
@@ -161,3 +161,10 @@
     def getTrigger(self, connection_name, config=None):
         connection = self.connections[connection_name]
         return connection.driver.getTrigger(connection, config)
+
+    def getSourceByHostname(self, canonical_hostname):
+        for connection in self.connections.values():
+            if hasattr(connection, 'canonical_hostname'):
+                if connection.canonical_hostname == canonical_hostname:
+                    return self.getSource(connection.connection_name)
+        return None
diff --git a/zuul/lib/queue.py b/zuul/lib/queue.py
new file mode 100644
index 0000000..db8af47
--- /dev/null
+++ b/zuul/lib/queue.py
@@ -0,0 +1,78 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import threading
+
+
+class MergedQueue(object):
+    def __init__(self):
+        self.queue = collections.deque()
+        self.lock = threading.RLock()
+        self.condition = threading.Condition(self.lock)
+        self.join_condition = threading.Condition(self.lock)
+        self.tasks = 0
+
+    def qsize(self):
+        return len(self.queue)
+
+    def empty(self):
+        return self.qsize() == 0
+
+    def put(self, item):
+        # Return the enqueued item: if an equal item is already in the
+        # queue, merge this one into it and return it instead.
+        self.condition.acquire()
+        ret = None
+        try:
+            for x in self.queue:
+                if item == x:
+                    ret = x
+                    if hasattr(ret, 'merge'):
+                        ret.merge(item)
+            if ret is None:
+                ret = item
+                self.queue.append(item)
+                self.condition.notify()
+        finally:
+            self.condition.release()
+        return ret
+
+    def get(self):
+        self.condition.acquire()
+        try:
+            while True:
+                try:
+                    ret = self.queue.popleft()
+                    self.join_condition.acquire()
+                    self.tasks += 1
+                    self.join_condition.release()
+                    return ret
+                except IndexError:
+                    self.condition.wait()
+        finally:
+            self.condition.release()
+
+    def task_done(self):
+        self.join_condition.acquire()
+        self.tasks -= 1
+        self.join_condition.notify()
+        self.join_condition.release()
+
+    def join(self):
+        self.join_condition.acquire()
+        while self.tasks:
+            self.join_condition.wait()
+        self.join_condition.release()
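
MergedQueue behaves like a queue.Queue, but put() deduplicates equal items
and returns the instance that is actually enqueued.  A minimal sketch of the
deduplication and join() semantics:

    from zuul.lib.queue import MergedQueue

    q = MergedQueue()
    q.put('reconfigure')
    q.put('reconfigure')  # equal item: not enqueued a second time
    assert q.qsize() == 1

    item = q.get()        # blocks until an item is available
    q.task_done()
    q.join()              # returns once every fetched task is done
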
diff --git a/zuul/lib/statsd.py b/zuul/lib/statsd.py
new file mode 100644
index 0000000..0ccacf9
--- /dev/null
+++ b/zuul/lib/statsd.py
@@ -0,0 +1,30 @@
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from zuul.lib.config import get_default
+
+
+def get_statsd_config(config):
+    statsd_host = get_default(config, 'statsd', 'server')
+    statsd_port = int(get_default(config, 'statsd', 'port', 8125))
+    statsd_prefix = get_default(config, 'statsd', 'prefix')
+    return (statsd_host, statsd_port, statsd_prefix)
+
+
+def get_statsd(config):
+    (statsd_host, statsd_port, statsd_prefix) = get_statsd_config(config)
+    if statsd_host is None:
+        return None
+    import statsd
+    return statsd.StatsClient(statsd_host, statsd_port, statsd_prefix)
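
A sketch of wiring get_statsd() to a parsed config.  It returns None unless a
[statsd] server is configured, and the statsd python module must be
installed; the values below are placeholders:

    import configparser
    import textwrap

    from zuul.lib.statsd import get_statsd

    config = configparser.ConfigParser()
    config.read_string(textwrap.dedent("""
        [statsd]
        server = statsd.example.com
        prefix = zuul
    """))

    statsd = get_statsd(config)  # port defaults to 8125
    if statsd:
        statsd.incr('test.counter')
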
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index b94b8a5..c17d1e7 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -11,6 +11,7 @@
 # under the License.
 
 import logging
+import textwrap
 
 from zuul import exceptions
 from zuul import model
@@ -42,9 +43,10 @@
 class PipelineManager(object):
     """Abstract Base Class for enqueing and processing Changes in a Pipeline"""
 
-    log = logging.getLogger("zuul.PipelineManager")
-
     def __init__(self, sched, pipeline):
+        self.log = logging.getLogger("zuul.Pipeline.%s.%s" %
+                                     (pipeline.layout.tenant.name,
+                                      pipeline.name,))
         self.sched = sched
         self.pipeline = pipeline
         self.event_filters = []
@@ -228,20 +230,17 @@
                                (item.change, change_queue))
                 change_queue.enqueueItem(item)
 
-                # Get an updated copy of the layout if necessary.
-                # This will return one of the following:
-                # 1) An existing layout from the item ahead or pipeline.
-                # 2) A newly created layout from the cached pipeline
-                #    layout config plus the previously returned
-                #    in-repo files stored in the buildset.
-                # 3) None in the case that a fetch of the files from
-                #    the merger is still pending.
-                item.current_build_set.layout = self.getLayout(item)
-
-                # Rebuild the frozen job tree from the new layout, if
-                # we have one.  If not, it will be built later.
-                if item.current_build_set.layout:
-                    item.freezeJobGraph()
+                # Get an updated copy of the layout and update the job
+                # graph if necessary.  This resumes the buildset merge
+                # state machine.  If we have an up-to-date layout, it
+                # will go ahead and refresh the job graph if needed;
+                # or it will send a new merge job if necessary, or it
+                # will do nothing if we're waiting on a merge job.
+                item.job_graph = None
+                item.layout = None
+                if item.active:
+                    if self.prepareItem(item):
+                        self.prepareJobs(item)
 
                 # Re-set build results in case any new jobs have been
                 # added to the tree.
@@ -359,10 +358,10 @@
             try:
                 nodeset = item.current_build_set.getJobNodeSet(job.name)
                 self.sched.nodepool.useNodeSet(nodeset)
-                build = self.sched.executor.execute(job, item,
-                                                    self.pipeline,
-                                                    build_set.dependent_items,
-                                                    build_set.merger_items)
+                build = self.sched.executor.execute(
+                    job, item, self.pipeline,
+                    build_set.dependent_changes,
+                    build_set.merger_items)
                 self.log.debug("Adding build %s of job %s to item %s" %
                                (build, job, item))
                 item.addBuild(build)
@@ -373,7 +372,7 @@
     def executeJobs(self, item):
         # TODO(jeblair): This should return a value indicating a job
         # was executed.  Appears to be a longstanding bug.
-        if not item.current_build_set.layout:
+        if not item.layout:
             return False
 
         jobs = item.findJobsToRun(
@@ -407,13 +406,8 @@
                     old_build_set.item, build.job)
 
             if not was_running:
-                try:
-                    nodeset = build.build_set.getJobNodeSet(build.job.name)
-                    self.sched.nodepool.returnNodeSet(nodeset)
-                except Exception:
-                    self.log.exception("Unable to return nodeset %s for "
-                                       "canceled build request %s" %
-                                       (nodeset, build))
+                nodeset = build.build_set.getJobNodeSet(build.job.name)
+                self.sched.nodepool.returnNodeSet(nodeset)
             build.result = 'CANCELED'
             canceled = True
             canceled_jobs.add(build.job.name)
@@ -434,30 +428,60 @@
         import zuul.configloader
         loader = zuul.configloader.ConfigLoader()
 
-        build_set = item.current_build_set
         self.log.debug("Loading dynamic layout")
+        (trusted_updates, untrusted_updates) = item.includesConfigUpdates()
+        build_set = item.current_build_set
+        trusted_layout_verified = False
         try:
             # First parse the config as it will land with the
             # full set of config and project repos.  This lets us
             # catch syntax errors in config repos even though we won't
             # actually run with that config.
-            loader.createDynamicLayout(
-                item.pipeline.layout.tenant,
-                build_set.files,
-                include_config_projects=True,
-                scheduler=self.sched,
-                connections=self.sched.connections)
+            if trusted_updates:
+                self.log.debug("Loading dynamic layout (phase 1)")
+                loader.createDynamicLayout(
+                    item.pipeline.layout.tenant,
+                    build_set.files,
+                    include_config_projects=True,
+                    scheduler=self.sched,
+                    connections=self.sched.connections)
+                trusted_layout_verified = True
 
             # Then create the config a second time but without changes
             # to config repos so that we actually use this config.
-            layout = loader.createDynamicLayout(
-                item.pipeline.layout.tenant,
-                build_set.files,
-                include_config_projects=False)
+            if untrusted_updates:
+                self.log.debug("Loading dynamic layout (phase 2)")
+                layout = loader.createDynamicLayout(
+                    item.pipeline.layout.tenant,
+                    build_set.files,
+                    include_config_projects=False)
+            else:
+                # We're a change to a config repo (with no untrusted
+                # items ahead), so just use the most recently
+                # generated layout.
+                if item.item_ahead:
+                    return item.item_ahead.layout
+                else:
+                    return item.queue.pipeline.layout
+            self.log.debug("Loading dynamic layout complete")
         except zuul.configloader.ConfigurationSyntaxError as e:
-            self.log.info("Configuration syntax error "
-                          "in dynamic layout")
-            item.setConfigError(str(e))
+            self.log.info("Configuration syntax error in dynamic layout")
+            if trusted_layout_verified:
+                # The config is good if we include config-projects,
+                # but is currently invalid if we omit them.  Instead
+                # of returning the whole error message, just leave a
+                # note that the config will work once the dependent
+                # changes land.
+                msg = "This change depends on a change "\
+                      "to a config project.\n\n"
+                msg += textwrap.fill(textwrap.dedent("""\
+                The syntax of the configuration in this change has
+                been verified to be correct once the config project
+                change upon which it depends is merged, but it can not
+                be used until that occurs."""))
+                item.setConfigError(msg)
+            else:
+                item.setConfigError(str(e))
             return None
         except Exception:
             self.log.exception("Error in dynamic layout")
@@ -468,7 +492,7 @@
     def getLayout(self, item):
         if not item.change.updatesConfig():
             if item.item_ahead:
-                return item.item_ahead.current_build_set.layout
+                return item.item_ahead.layout
             else:
                 return item.queue.pipeline.layout
         # This item updates the config, ask the merger for the result.
@@ -519,10 +543,9 @@
     def prepareJobs(self, item):
         # This only runs once the item is in the pipeline's action window.
         # Returns True if the item is ready, False otherwise.
-        build_set = item.current_build_set
-        if not build_set.layout:
-            build_set.layout = self.getLayout(item)
-        if not build_set.layout:
+        if not item.layout:
+            item.layout = self.getLayout(item)
+        if not item.layout:
             return False
 
         if not item.job_graph:
@@ -748,8 +771,7 @@
         # pipeline, use the dynamic layout if available, otherwise,
         # fall back to the current static layout as a best
         # approximation.
-        layout = (item.current_build_set.layout or
-                  self.pipeline.layout)
+        layout = (item.layout or self.pipeline.layout)
 
         project_in_pipeline = True
         if not layout.getProjectPipelineConfig(item.change.project,
@@ -815,19 +837,27 @@
                 dt = None
             items = len(self.pipeline.getAllItems())
 
-            # stats.timers.zuul.pipeline.NAME.resident_time
-            # stats_counts.zuul.pipeline.NAME.total_changes
-            # stats.gauges.zuul.pipeline.NAME.current_changes
-            key = 'zuul.pipeline.%s' % self.pipeline.name
+            tenant = self.pipeline.layout.tenant
+            basekey = 'zuul.tenant.%s' % tenant.name
+            key = '%s.pipeline.%s' % (basekey, self.pipeline.name)
+            # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.resident_time
+            # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.total_changes
+            # stats.gauges.zuul.tenant.<tenant>.pipeline.<pipeline>.current_changes
             self.sched.statsd.gauge(key + '.current_changes', items)
             if dt:
                 self.sched.statsd.timing(key + '.resident_time', dt)
                 self.sched.statsd.incr(key + '.total_changes')
 
-            # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
-            # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
-            project_name = item.change.project.name.replace('/', '.')
-            key += '.%s' % project_name
+            hostname = (item.change.project.canonical_hostname.
+                        replace('.', '_'))
+            projectname = (item.change.project.name.
+                           replace('.', '_').replace('/', '.'))
+            branchname = item.change.branch.replace('.', '_').replace('/', '.')
+            # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.
+            #   project.<host>.<project>.<branch>.resident_time
+            # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.
+            #   project.<host>.<project>.<branch>.total_changes
+            key += '.project.%s.%s.%s' % (hostname, projectname, branchname)
             if dt:
                 self.sched.statsd.timing(key + '.resident_time', dt)
                 self.sched.statsd.incr(key + '.total_changes')
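
A worked example of the per-project key scheme above, for tenant "openstack",
pipeline "gate", and a change to org/project on stable/pike hosted at
git.example.com (all names illustrative):

    hostname = 'git.example.com'.replace('.', '_')
    projectname = 'org/project'.replace('.', '_').replace('/', '.')
    branchname = 'stable/pike'.replace('.', '_').replace('/', '.')
    key = 'zuul.tenant.openstack.pipeline.gate.project.%s.%s.%s' % (
        hostname, projectname, branchname)
    # zuul.tenant.openstack.pipeline.gate.project.
    #   git_example_com.org.project.stable.pike
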
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
index 411894e..5aef453 100644
--- a/zuul/manager/dependent.py
+++ b/zuul/manager/dependent.py
@@ -10,8 +10,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import logging
-
 from zuul import model
 from zuul.manager import PipelineManager, StaticChangeQueueContextManager
 from zuul.manager import DynamicChangeQueueContextManager
@@ -25,7 +23,6 @@
     using the Optimistic Branch Prediction logic with Nearest Non-Failing Item
     reparenting algorithm for handling errors.
     """
-    log = logging.getLogger("zuul.DependentPipelineManager")
     changes_merge = True
 
     def __init__(self, *args, **kwargs):
diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py
index 7b0a9f5..65f5ca0 100644
--- a/zuul/manager/independent.py
+++ b/zuul/manager/independent.py
@@ -10,8 +10,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import logging
-
 from zuul import model
 from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
 
@@ -19,7 +17,6 @@
 class IndependentPipelineManager(PipelineManager):
     """PipelineManager that puts every Change into its own ChangeQueue."""
 
-    log = logging.getLogger("zuul.IndependentPipelineManager")
     changes_merge = False
 
     def _postConfig(self, layout):
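
With the class-level loggers removed, both managers inherit the per-instance
logger created in PipelineManager.__init__, which is named after the tenant
and pipeline.  For example (names illustrative):

    import logging

    log = logging.getLogger("zuul.Pipeline.openstack.gate")
    log.debug("pipeline-scoped log message")
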
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 8b98bfb..035d1d0 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -13,10 +13,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from contextlib import contextmanager
+import logging
+import os
+import shutil
+
 import git
 import gitdb
-import os
-import logging
 
 import zuul.model
 
@@ -38,22 +41,37 @@
             raise
 
 
+@contextmanager
+def timeout_handler(path):
+    try:
+        yield
+    except git.exc.GitCommandError as e:
+        if e.status == -9:
+            # Timeout.  The repo could be in a bad state, so delete it.
+            shutil.rmtree(path)
+        raise
+
+
 class ZuulReference(git.Reference):
     _common_path_default = "refs/zuul"
     _points_to_commits_only = True
 
 
 class Repo(object):
-    def __init__(self, remote, local, email, username, sshkey=None,
-                 cache_path=None, logger=None):
+    def __init__(self, remote, local, email, username, speed_limit, speed_time,
+                 sshkey=None, cache_path=None, logger=None, git_timeout=300):
         if logger is None:
             self.log = logging.getLogger("zuul.Repo")
         else:
             self.log = logger
+        self.env = {
+            'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
+            'GIT_HTTP_LOW_SPEED_TIME': speed_time,
+        }
+        self.git_timeout = git_timeout
         if sshkey:
-            self.env = {'GIT_SSH_COMMAND': 'ssh -i %s' % (sshkey,)}
-        else:
-            self.env = {}
+            self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
+
         self.remote_url = remote
         self.local_path = local
         self.email = email
@@ -62,7 +80,7 @@
         self._initialized = False
         try:
             self._ensure_cloned()
-        except:
+        except Exception:
             self.log.exception("Unable to initialize repo for %s" % remote)
 
     def _ensure_cloned(self):
@@ -75,12 +93,10 @@
             self.log.debug("Cloning from %s to %s" % (self.remote_url,
                                                       self.local_path))
             if self.cache_path:
-                git.Repo.clone_from(self.cache_path, self.local_path,
-                                    env=self.env)
+                self._git_clone(self.cache_path)
                 rewrite_url = True
             else:
-                git.Repo.clone_from(self.remote_url, self.local_path,
-                                    env=self.env)
+                self._git_clone(self.remote_url)
         repo = git.Repo(self.local_path)
         repo.git.update_environment(**self.env)
         # Create local branches corresponding to all the remote branches
@@ -104,6 +120,18 @@
     def isInitialized(self):
         return self._initialized
 
+    def _git_clone(self, url):
+        mygit = git.cmd.Git(os.getcwd())
+        mygit.update_environment(**self.env)
+        with timeout_handler(self.local_path):
+            mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
+                        kill_after_timeout=self.git_timeout)
+
+    def _git_fetch(self, repo, remote, ref=None, **kwargs):
+        with timeout_handler(self.local_path):
+            repo.git.fetch(remote, ref, kill_after_timeout=self.git_timeout,
+                           **kwargs)
+
     def createRepoObject(self):
         self._ensure_cloned()
         repo = git.Repo(self.local_path)
@@ -225,19 +253,18 @@
 
     def fetch(self, ref):
         repo = self.createRepoObject()
-        # The git.remote.fetch method may read in git progress info and
-        # interpret it improperly causing an AssertionError. Because the
-        # data was fetched properly subsequent fetches don't seem to fail.
-        # So try again if an AssertionError is caught.
-        origin = repo.remotes.origin
-        try:
-            origin.fetch(ref)
-        except AssertionError:
-            origin.fetch(ref)
+        # NOTE: The following is currently not applicable, but if we
+        # switch back to fetch methods from GitPython, we need to
+        # consider it:
+        #   The git.remote.fetch method may read in git progress info and
+        #   interpret it improperly causing an AssertionError. Because the
+        #   data was fetched properly subsequent fetches don't seem to fail.
+        #   So try again if an AssertionError is caught.
+        self._git_fetch(repo, 'origin', ref)
 
     def fetchFrom(self, repository, ref):
         repo = self.createRepoObject()
-        repo.git.fetch(repository, ref)
+        self._git_fetch(repo, repository, ref)
 
     def createZuulRef(self, ref, commit='HEAD'):
         repo = self.createRepoObject()
@@ -254,15 +281,14 @@
     def update(self):
         repo = self.createRepoObject()
         self.log.debug("Updating repository %s" % self.local_path)
-        origin = repo.remotes.origin
         if repo.git.version_info[:2] < (1, 9):
             # Before 1.9, 'git fetch --tags' did not include the
             # behavior covered by 'git fetch', so we run both
             # commands in that case.  Starting with 1.9, 'git fetch
             # --tags' is all that is necessary.  See
             # https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
-            origin.fetch()
-        origin.fetch(tags=True)
+            self._git_fetch(repo, 'origin')
+        self._git_fetch(repo, 'origin', tags=True)
 
     def getFiles(self, files, dirs=[], branch=None, commit=None):
         ret = {}
@@ -293,7 +319,7 @@
 
 class Merger(object):
     def __init__(self, working_root, connections, email, username,
-                 cache_root=None, logger=None):
+                 speed_limit, speed_time, cache_root=None, logger=None):
         self.logger = logger
         if logger is None:
             self.log = logging.getLogger("zuul.Merger")
@@ -306,6 +332,8 @@
         self.connections = connections
         self.email = email
         self.username = username
+        self.speed_limit = speed_limit
+        self.speed_time = speed_time
         self.cache_root = cache_root
 
     def _addProject(self, hostname, project_name, url, sshkey):
@@ -318,8 +346,9 @@
                                           project_name)
             else:
                 cache_path = None
-            repo = Repo(url, path, self.email, self.username,
-                        sshkey, cache_path, self.logger)
+            repo = Repo(
+                url, path, self.email, self.username, self.speed_limit,
+                self.speed_time, sshkey, cache_path, self.logger)
 
             self.repos[key] = repo
         except Exception:
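
A sketch of constructing a Repo with the new transfer guards (URL, paths, and
identity are placeholders).  speed_limit and speed_time are exported as the
GIT_HTTP_LOW_SPEED_LIMIT/TIME environment variables, so they are passed as
strings, and git_timeout caps the wall time of clone and fetch operations:

    from zuul.merger.merger import Repo

    repo = Repo('https://git.example.com/org/project',
                '/var/lib/zuul/merger-git/org/project',
                'zuul@example.com', 'Zuul',
                speed_limit='1000', speed_time='30',
                git_timeout=300)
    repo.update()  # fetches run with kill_after_timeout=300
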
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 881209d..765d9e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -33,9 +33,13 @@
                                  '/var/lib/zuul/merger-git')
         merge_email = get_default(self.config, 'merger', 'git_user_email')
         merge_name = get_default(self.config, 'merger', 'git_user_name')
-
-        self.merger = merger.Merger(merge_root, connections, merge_email,
-                                    merge_name)
+        speed_limit = get_default(
+            config, 'merger', 'git_http_low_speed_limit', '1000')
+        speed_time = get_default(
+            config, 'merger', 'git_http_low_speed_time', '30')
+        self.merger = merger.Merger(
+            merge_root, connections, merge_email, merge_name, speed_limit,
+            speed_time)
 
     def start(self):
         self._running = True
diff --git a/zuul/model.py b/zuul/model.py
index eab43bb..e4b6eb5 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -23,6 +23,8 @@
 import urllib.parse
 import textwrap
 
+from zuul import change_matcher
+
 MERGER_MERGE = 1          # "git merge"
 MERGER_MERGE_RESOLVE = 2  # "git merge -s resolve"
 MERGER_CHERRY_PICK = 3    # "git cherry-pick"
@@ -44,6 +46,12 @@
     'high': PRECEDENCE_HIGH,
 }
 
+PRIORITY_MAP = {
+    PRECEDENCE_NORMAL: 200,
+    PRECEDENCE_LOW: 300,
+    PRECEDENCE_HIGH: 100,
+}
+
 # Request states
 STATE_REQUESTED = 'requested'
 STATE_PENDING = 'pending'
@@ -515,6 +523,7 @@
         self.job = job
         self.nodeset = nodeset
         self._state = STATE_REQUESTED
+        self.requested_time = time.time()
         self.state_time = time.time()
         self.stat = None
         self.uid = uuid4().hex
@@ -525,6 +534,14 @@
         self.canceled = False
 
     @property
+    def priority(self):
+        if self.build_set:
+            precedence = self.build_set.item.pipeline.precedence
+        else:
+            precedence = PRECEDENCE_NORMAL
+        return PRIORITY_MAP[precedence]
+
+    @property
     def fulfilled(self):
         return (self._state == STATE_FULFILLED) and not self.failed
 
@@ -817,6 +834,7 @@
         self.other_attributes = dict(
             name=None,
             source_context=None,
+            source_line=None,
             inheritance_path=(),
             parent_data=None,
         )
@@ -852,14 +870,16 @@
         return self.name
 
     def __repr__(self):
-        return '<Job %s branches: %s source: %s>' % (self.name,
-                                                     self.branch_matcher,
-                                                     self.source_context)
+        return '<Job %s branches: %s source: %s#%s>' % (
+            self.name,
+            self.branch_matcher,
+            self.source_context,
+            self.source_line)
 
     def __getattr__(self, name):
         v = self.__dict__.get(name)
         if v is None:
-            return copy.deepcopy(self.attributes[name])
+            return self.attributes[name]
         return v
 
     def _get(self, name):
@@ -869,6 +889,8 @@
         return Attributes(name=self.name)
 
     def setRun(self):
+        msg = 'self %s' % (repr(self),)
+        self.inheritance_path = self.inheritance_path + (msg,)
         if not self.run:
             self.run = self.implied_run
 
@@ -899,8 +921,15 @@
         if changed:
             self.roles = tuple(newroles)
 
+    def setBranchMatcher(self, branches):
+        # Set the branch matcher to match any of the supplied branches
+        matchers = []
+        for branch in branches:
+            matchers.append(change_matcher.BranchMatcher(branch))
+        self.branch_matcher = change_matcher.MatchAny(matchers)
+
     def updateVariables(self, other_vars):
-        v = self.variables
+        v = copy.deepcopy(self.variables)
         Job._deepUpdate(v, other_vars)
         self.variables = v
 
@@ -920,8 +949,8 @@
         self.variables = v
 
     def updateProjects(self, other_projects):
-        required_projects = self.required_projects
-        Job._deepUpdate(required_projects, other_projects)
+        required_projects = self.required_projects.copy()
+        required_projects.update(other_projects)
         self.required_projects = required_projects
 
     @staticmethod
@@ -948,7 +977,7 @@
         # copy all attributes
         for k in self.inheritable_attributes:
             if (other._get(k) is not None):
-                setattr(self, k, copy.deepcopy(getattr(other, k)))
+                setattr(self, k, getattr(other, k))
 
         msg = 'inherit from %s' % (repr(other),)
         self.inheritance_path = other.inheritance_path + (msg,)
@@ -1041,12 +1070,14 @@
         else:
             self.jobs[job.name] = [job]
 
-    def inheritFrom(self, other):
+    def inheritFrom(self, other, implied_branch):
         for jobname, jobs in other.jobs.items():
-            if jobname in self.jobs:
-                self.jobs[jobname].extend(jobs)
-            else:
-                self.jobs[jobname] = jobs
+            joblist = self.jobs.setdefault(jobname, [])
+            for job in jobs:
+                if not job.branch_matcher and implied_branch:
+                    job = job.copy()
+                    job.setBranchMatcher([implied_branch])
+                joblist.append(job)
 
 
 class JobGraph(object):
@@ -1145,7 +1176,6 @@
         self.start_time = None
         self.end_time = None
         self.estimated_time = None
-        self.pipeline = None
         self.canceled = False
         self.retry = False
         self.parameters = {}
@@ -1157,6 +1187,10 @@
         return ('<Build %s of %s on %s>' %
                 (self.uuid, self.job.name, self.worker))
 
+    @property
+    def pipeline(self):
+        return self.build_set.item.pipeline
+
     def getSafeAttributes(self):
         return Attributes(uuid=self.uuid,
                           result=self.result,
@@ -1245,11 +1279,9 @@
         self.item = item
         self.builds = {}
         self.result = None
-        self.next_build_set = None
-        self.previous_build_set = None
         self.uuid = None
         self.commit = None
-        self.dependent_items = None
+        self.dependent_changes = None
         self.merger_items = None
         self.unable_to_merge = False
         self.config_error = None  # None or an error message string.
@@ -1259,7 +1291,6 @@
         self.node_requests = {}  # job -> reqs
         self.files = RepoFiles()
         self.repo_state = {}
-        self.layout = None
         self.tries = {}
 
     @property
@@ -1279,18 +1310,16 @@
         # The change isn't enqueued until after it's created
         # so we don't know what the other changes ahead will be
         # until jobs start.
-        if self.dependent_items is None:
-            items = []
+        if not self.uuid:
+            self.uuid = uuid4().hex
+        if self.dependent_changes is None:
+            items = [self.item]
             next_item = self.item.item_ahead
             while next_item:
                 items.append(next_item)
                 next_item = next_item.item_ahead
-            self.dependent_items = items
-        if not self.uuid:
-            self.uuid = uuid4().hex
-        if self.merger_items is None:
-            items = [self.item] + self.dependent_items
             items.reverse()
+            self.dependent_changes = [i.change.toDict() for i in items]
             self.merger_items = [i.makeMergerItem() for i in items]
 
     def getStateName(self, state_num):
@@ -1354,7 +1383,7 @@
         item = self.item
         layout = None
         while item:
-            layout = item.current_build_set.layout
+            layout = item.layout
             if layout:
                 break
             item = item.item_ahead
@@ -1385,10 +1414,8 @@
         self.pipeline = queue.pipeline
         self.queue = queue
         self.change = change  # a ref
-        self.build_sets = []
         self.dequeued_needing_change = False
         self.current_build_set = BuildSet(self)
-        self.build_sets.append(self.current_build_set)
         self.item_ahead = None
         self.items_behind = []
         self.enqueue_time = None
@@ -1398,7 +1425,7 @@
         self.quiet = False
         self.active = False  # Whether an item is within an active window
         self.live = True  # Whether an item is intended to be processed at all
-        # TODO(jeblair): move job_graph to buildset
+        self.layout = None
         self.job_graph = None
 
     def __repr__(self):
@@ -1410,17 +1437,12 @@
             id(self), self.change, pipeline)
 
     def resetAllBuilds(self):
-        old = self.current_build_set
-        self.current_build_set.result = 'CANCELED'
         self.current_build_set = BuildSet(self)
-        old.next_build_set = self.current_build_set
-        self.current_build_set.previous_build_set = old
-        self.build_sets.append(self.current_build_set)
+        self.layout = None
         self.job_graph = None
 
     def addBuild(self, build):
         self.current_build_set.addBuild(build)
-        build.pipeline = self.pipeline
 
     def removeBuild(self, build):
         self.current_build_set.removeBuild(build)
@@ -1431,8 +1453,7 @@
     def freezeJobGraph(self):
         """Find or create actual matching jobs for this item's change and
         store the resulting job tree."""
-        layout = self.current_build_set.layout
-        job_graph = layout.createJobGraph(self)
+        job_graph = self.layout.createJobGraph(self)
         for job in job_graph.getJobs():
             # Ensure that each jobs's dependencies are fully
             # accessible.  This will raise an exception if not.
@@ -1507,6 +1528,25 @@
     def wasDequeuedNeedingChange(self):
         return self.dequeued_needing_change
 
+    def includesConfigUpdates(self):
+        includes_trusted = False
+        includes_untrusted = False
+        tenant = self.pipeline.layout.tenant
+        item = self
+        while item:
+            if item.change.updatesConfig():
+                (trusted, project) = tenant.getProject(
+                    item.change.project.canonical_name)
+                if trusted:
+                    includes_trusted = True
+                else:
+                    includes_untrusted = True
+            if includes_trusted and includes_untrusted:
+                # We're done early
+                return (includes_trusted, includes_untrusted)
+            item = item.item_ahead
+        return (includes_trusted, includes_untrusted)
+
     def isHoldingFollowingChanges(self):
         if not self.live:
             return False
@@ -1947,6 +1987,18 @@
                           oldrev=self.oldrev,
                           newrev=self.newrev)
 
+    def toDict(self):
+        # Render to a dict to use in passing json to the executor
+        d = dict()
+        d['project'] = dict(
+            name=self.project.name,
+            short_name=self.project.name.split('/')[-1],
+            canonical_hostname=self.project.canonical_hostname,
+            canonical_name=self.project.canonical_name,
+            src_dir=os.path.join('src', self.project.canonical_name),
+        )
+        return d
+
 
 class Branch(Ref):
     """An existing branch state for a Project."""
@@ -1954,6 +2006,12 @@
         super(Branch, self).__init__(project)
         self.branch = None
 
+    def toDict(self):
+        # Render to a dict to use in passing json to the executor
+        d = super(Branch, self).toDict()
+        d['branch'] = self.branch
+        return d
+
 
 class Tag(Ref):
     """An existing tag state for a Project."""
@@ -2016,6 +2074,14 @@
                           number=self.number,
                           patchset=self.patchset)
 
+    def toDict(self):
+        # Render to a dict to use in passing json to the executor
+        d = super(Change, self).toDict()
+        d['change'] = str(self.number)
+        d['change_url'] = self.url
+        d['patchset'] = str(self.patchset)
+        return d
+
 
 class TriggerEvent(object):
     """Incoming event from an external system."""
@@ -2117,6 +2183,7 @@
     def __init__(self, name):
         self.name = name
         self.merge_mode = None
+        # The default branch for the project (usually master).
         self.default_branch = None
         self.pipelines = {}
         self.private_key_file = None
@@ -2296,6 +2363,7 @@
     """Holds all of the Pipelines."""
 
     def __init__(self, tenant):
+        self.uuid = uuid4().hex
         self.tenant = tenant
         self.project_configs = {}
         self.project_templates = {}
@@ -2374,6 +2442,11 @@
         self.pipelines[pipeline.name] = pipeline
 
     def addProjectTemplate(self, project_template):
+        if project_template.name in self.project_templates:
+            # TODO(jeblair): issue a warning to the logs on loading
+            # the config, and an error when this hits in a proposed
+            # change.
+            return
         self.project_templates[project_template.name] = project_template
 
     def addProjectConfig(self, project_config):
@@ -2517,14 +2590,14 @@
 
     @staticmethod
     def _max_count(item, semaphore_name):
-        if not item.current_build_set.layout:
+        if not item.layout:
             # This should not occur as the layout of the item must already be
             # built when acquiring or releasing a semaphore for a job.
             raise Exception("Item {} has no layout".format(item))
 
         # find the right semaphore
         default_semaphore = Semaphore(semaphore_name, 1)
-        semaphores = item.current_build_set.layout.semaphores
+        semaphores = item.layout.semaphores
         return semaphores.get(semaphore_name, default_semaphore).max
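
For reference, the executor-facing dict produced by the new toDict() chain
for a Gerrit-style change (values illustrative): Ref supplies the project
block, Branch adds the branch, and Change adds the review fields.

    expected = {
        'project': {
            'name': 'org/project',
            'short_name': 'project',
            'canonical_hostname': 'git.example.com',
            'canonical_name': 'git.example.com/org/project',
            'src_dir': 'src/git.example.com/org/project',
        },
        'branch': 'master',
        'change': '1234',
        'change_url': 'https://review.example.com/1234',
        'patchset': '1',
    }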
 
 
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index dc855cd..7dafca0 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -22,6 +22,35 @@
         self.requests = {}
         self.sched = scheduler
 
+    def emitStats(self, request):
+        if not self.sched.statsd:
+            return
+        statsd = self.sched.statsd
+        # counter zuul.nodepool.requested
+        # counter zuul.nodepool.requested.label.<label>
+        # counter zuul.nodepool.requested.size.<size>
+        # gauge zuul.nodepool.current_requests
+        state = request.state
+        if request.canceled:
+            state = 'canceled'
+            dt = None
+        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+            dt = int((request.state_time - request.requested_time) * 1000)
+        else:
+            dt = None
+        key = 'zuul.nodepool.%s' % state
+        statsd.incr(key)
+        if dt:
+            statsd.timing(key, dt)
+        for node in request.nodeset.getNodes():
+            statsd.incr(key + '.label.%s' % node.label)
+            if dt:
+                statsd.timing(key + '.label.%s' % node.label, dt)
+        statsd.incr(key + '.size.%s' % len(request.nodeset.nodes))
+        if dt:
+            statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
+        statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
+
     def requestNodes(self, build_set, job):
         # Create a copy of the nodeset to represent the actual nodes
         # returned by nodepool.
@@ -33,6 +62,7 @@
             self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
             # Logged after submission so that we have the request id
             self.log.info("Submited node request %s" % (req,))
+            self.emitStats(req)
         else:
             self.log.info("Fulfilling empty node request %s" % (req,))
             req.state = model.STATE_FULFILLED
@@ -90,10 +120,15 @@
         self.log.info("Returning nodeset %s" % (nodeset,))
         for node in nodeset.getNodes():
             if node.lock is None:
-                raise Exception("Node %s is not locked" % (node,))
-            if node.state == model.STATE_IN_USE:
-                node.state = model.STATE_USED
-                self.sched.zk.storeNode(node)
+                self.log.error("Node %s is not locked" % (node,))
+            else:
+                try:
+                    if node.state == model.STATE_IN_USE:
+                        node.state = model.STATE_USED
+                        self.sched.zk.storeNode(node)
+                except Exception:
+                    self.log.exception("Exception storing node %s "
+                                       "while unlocking:" % (node,))
         self._unlockNodes(nodeset.getNodes())
 
     def unlockNodeSet(self, nodeset):
@@ -106,18 +141,21 @@
             except Exception:
                 self.log.exception("Error unlocking node:")
 
-    def lockNodeSet(self, nodeset):
-        self._lockNodes(nodeset.getNodes())
+    def lockNodeSet(self, nodeset, request_id):
+        self._lockNodes(nodeset.getNodes(), request_id)
 
-    def _lockNodes(self, nodes):
+    def _lockNodes(self, nodes, request_id):
         # Try to lock all of the supplied nodes.  If any lock fails,
         # try to unlock any which have already been locked before
         # re-raising the error.
         locked_nodes = []
         try:
             for node in nodes:
+                if node.allocated_to != request_id:
+                    raise Exception("Node %s allocated to %s, not %s" %
+                                    (node.id, node.allocated_to, request_id))
                 self.log.debug("Locking node %s" % (node,))
-                self.sched.zk.lockNode(node)
+                self.sched.zk.lockNode(node, timeout=30)
                 locked_nodes.append(node)
         except Exception:
             self.log.exception("Error locking nodes:")
@@ -134,29 +172,48 @@
 
         if request.canceled:
             del self.requests[request.uid]
+            self.emitStats(request)
             return False
 
-        if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+        # TODOv3(jeblair): handle allocation failure
+        if deleted:
+            self.log.debug("Resubmitting lost node request %s" % (request,))
+            request.id = None
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
             self.log.info("Node request %s %s" % (request, request.state))
 
             # Give our results to the scheduler.
             self.sched.onNodesProvisioned(request)
             del self.requests[request.uid]
 
+            self.emitStats(request)
+
             # Stop watching this request node.
             return False
-        # TODOv3(jeblair): handle allocation failure
-        elif deleted:
-            self.log.debug("Resubmitting lost node request %s" % (request,))
-            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
         return True
 
-    def acceptNodes(self, request):
+    def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.
 
         self.log.info("Accepting node request %s" % (request,))
 
+        if request_id != request.id:
+            self.log.info("Skipping node accept for %s (resubmitted as %s)",
+                          request_id, request.id)
+            return
+
+        # Make sure the request still exists. It's possible it could have
+        # disappeared if we lost the ZK session between when the fulfillment
+        # response was added to our queue, and when we actually get around to
+        # processing it. Nodepool will automatically reallocate the assigned
+        # nodes in that situation.
+        if not self.sched.zk.nodeRequestExists(request):
+            self.log.info("Request %s no longer exists", request.id)
+            return
+
         if request.canceled:
             self.log.info("Ignoring canceled node request %s" % (request,))
             # The request was already deleted when it was canceled
@@ -166,7 +223,7 @@
         if request.fulfilled:
             # If the request succeeded, try to lock the nodes.
             try:
-                self.lockNodeSet(request.nodeset)
+                self.lockNodeSet(request.nodeset, request.id)
                 locked = True
             except Exception:
                 self.log.exception("Error locking nodes:")
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 1a0a084..8f2e5dc 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -56,6 +56,14 @@
                 'count': count}
         return not self.submitJob('zuul:autohold', data).failure
 
+    def autohold_list(self):
+        data = {}
+        job = self.submitJob('zuul:autohold_list', data)
+        if job.failure:
+            return False
+        else:
+            return json.loads(job.data[0])
+
     def enqueue(self, tenant, pipeline, project, trigger, change):
         data = {'tenant': tenant,
                 'pipeline': pipeline,
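
A usage sketch of the new autohold_list RPC (server address and constructor
arguments assumed); the CSV-encoded keys are split back into their components
on the client side:

    from zuul.rpcclient import RPCClient

    client = RPCClient('127.0.0.1', 4730)
    autoholds = client.autohold_list()  # False on RPC failure
    if autoholds:
        for key, value in autoholds.items():
            # The scheduler joins its tuple key with commas (see
            # handle_autohold_list below); split it back apart.
            parts = key.split(',')
            print(parts, value)
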
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 52a7e51..11d6684 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -50,6 +50,7 @@
 
     def register(self):
         self.worker.registerFunction("zuul:autohold")
+        self.worker.registerFunction("zuul:autohold_list")
         self.worker.registerFunction("zuul:enqueue")
         self.worker.registerFunction("zuul:enqueue_ref")
         self.worker.registerFunction("zuul:promote")
@@ -90,6 +91,17 @@
             except Exception:
                 self.log.exception("Exception while getting job")
 
+    def handle_autohold_list(self, job):
+        req = {}
+
+        # The json.dumps() call cannot handle dict keys that are not strings
+        # so we convert our key to a CSV string that the caller can parse.
+        for key, value in self.sched.autohold_requests.items():
+            new_key = ','.join(key)
+            req[new_key] = value
+
+        job.sendWorkComplete(json.dumps(req))
+
     def handle_autohold(self, job):
         args = json.loads(job.arguments)
         params = {}
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 5432661..33b6723 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -15,7 +15,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import extras
 import json
 import logging
 import os
@@ -31,6 +30,8 @@
 from zuul import exceptions
 from zuul import version as zuul_version
 from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd
+import zuul.lib.queue
 
 
 class ManagementEvent(object):
@@ -71,10 +72,28 @@
     the path specified in the configuration.
 
     :arg Tenant tenant: the tenant to reconfigure
+    :arg Project project: if supplied, clear the cached configuration
+         from this project first
     """
-    def __init__(self, tenant):
+    def __init__(self, tenant, project):
         super(TenantReconfigureEvent, self).__init__()
-        self.tenant = tenant
+        self.tenant_name = tenant.name
+        self.projects = set([project])
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __eq__(self, other):
+        if not isinstance(other, TenantReconfigureEvent):
+            return False
+        # We don't check projects because they will get combined when
+        # merged.
+        return (self.tenant_name == other.tenant_name)
+
+    def merge(self, other):
+        if self.tenant_name != other.tenant_name:
+            raise Exception("Can not merge events from different tenants")
+        self.projects |= other.projects
 
 
 class PromoteEvent(ManagementEvent):
@@ -161,6 +180,7 @@
 
     def __init__(self, request):
         self.request = request
+        self.request_id = request.id
 
 
 def toList(item):
@@ -207,7 +227,7 @@
         self.executor = None
         self.merger = None
         self.connections = None
-        self.statsd = extras.try_import('statsd.statsd')
+        self.statsd = get_statsd(config)
         # TODO(jeblair): fix this
         # Despite triggers being part of the pipeline, there is one trigger set
         # per scheduler. The pipeline handles the trigger filters but since
@@ -219,7 +239,7 @@
 
         self.trigger_event_queue = queue.Queue()
         self.result_event_queue = queue.Queue()
-        self.management_event_queue = queue.Queue()
+        self.management_event_queue = zuul.lib.queue.MergedQueue()
         self.abide = model.Abide()
 
         if not testonly:
@@ -259,22 +279,16 @@
         self.zk = zk
 
     def addEvent(self, event):
-        self.log.debug("Adding trigger event: %s" % event)
         self.trigger_event_queue.put(event)
         self.wake_event.set()
-        self.log.debug("Done adding trigger event: %s" % event)
 
     def onBuildStarted(self, build):
-        self.log.debug("Adding start event for build: %s" % build)
         build.start_time = time.time()
         event = BuildStartedEvent(build)
         self.result_event_queue.put(event)
         self.wake_event.set()
-        self.log.debug("Done adding start event for build: %s" % build)
 
     def onBuildCompleted(self, build, result, result_data):
-        self.log.debug("Adding complete event for build: %s result: %s" % (
-            build, result))
         build.end_time = time.time()
         build.result_data = result_data
         # Note, as soon as the result is set, other threads may act
@@ -284,61 +298,60 @@
         build.result = result
         try:
             if self.statsd and build.pipeline:
-                jobname = build.job.name.replace('.', '_')
-                key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
+                tenant = build.pipeline.layout.tenant
+                jobname = build.job.name.replace('.', '_').replace('/', '_')
+                hostname = (build.build_set.item.change.project.
+                            canonical_hostname.replace('.', '_'))
+                projectname = (build.build_set.item.change.project.name.
+                               replace('.', '_').replace('/', '_'))
+                branchname = (build.build_set.item.change.branch.
+                              replace('.', '_').replace('/', '_'))
+                basekey = 'zuul.tenant.%s' % tenant.name
+                pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)
+                # zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
+                key = '%s.all_jobs' % pipekey
                 self.statsd.incr(key)
-                for label in build.node_labels:
-                    # Jenkins includes the node name in its list of labels, so
-                    # we filter it out here, since that is not statistically
-                    # interesting.
-                    if label == build.node_name:
-                        continue
-                    dt = int((build.start_time - build.execute_time) * 1000)
-                    key = 'zuul.pipeline.%s.label.%s.wait_time' % (
-                        build.pipeline.name, label)
-                    self.statsd.timing(key, dt)
-                key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
-                                                      jobname, build.result)
+                jobkey = '%s.project.%s.%s.%s.job.%s' % (
+                    pipekey, hostname, projectname, branchname, jobname)
+                # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+                #   <host>.<project>.<branch>.job.<job>.<result>
+                key = '%s.%s' % (jobkey, build.result)
                 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                     dt = int((build.end_time - build.start_time) * 1000)
                     self.statsd.timing(key, dt)
                 self.statsd.incr(key)
-
-                key = 'zuul.pipeline.%s.job.%s.wait_time' % (
-                    build.pipeline.name, jobname)
+                # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+                #  <host>.<project>.<branch>.job.<job>.wait_time
+                key = '%s.wait_time' % jobkey
                 dt = int((build.start_time - build.execute_time) * 1000)
                 self.statsd.timing(key, dt)
-        except:
+        except Exception:
             self.log.exception("Exception reporting runtime stats")
         event = BuildCompletedEvent(build)
         self.result_event_queue.put(event)
         self.wake_event.set()
-        self.log.debug("Done adding complete event for build: %s" % build)
 
     def onMergeCompleted(self, build_set, merged, updated,
                          commit, files, repo_state):
-        self.log.debug("Adding merge complete event for build set: %s" %
-                       build_set)
         event = MergeCompletedEvent(build_set, merged,
                                     updated, commit, files, repo_state)
         self.result_event_queue.put(event)
         self.wake_event.set()
 
     def onNodesProvisioned(self, req):
-        self.log.debug("Adding nodes provisioned event for build set: %s" %
-                       req.build_set)
         event = NodesProvisionedEvent(req)
         self.result_event_queue.put(event)
         self.wake_event.set()
 
-    def reconfigureTenant(self, tenant):
-        self.log.debug("Prepare to reconfigure")
-        event = TenantReconfigureEvent(tenant)
+    def reconfigureTenant(self, tenant, project):
+        self.log.debug("Submitting tenant reconfiguration event for "
+                       "%s due to project %s", tenant.name, project)
+        event = TenantReconfigureEvent(tenant, project)
         self.management_event_queue.put(event)
         self.wake_event.set()
 
     def reconfigure(self, config):
-        self.log.debug("Prepare to reconfigure")
+        self.log.debug("Submitting reconfiguration event")
         event = ReconfigureEvent(config)
         self.management_event_queue.put(event)
         self.wake_event.set()
@@ -457,7 +470,7 @@
         self.layout_lock.acquire()
         self.config = event.config
         try:
-            self.log.debug("Performing reconfiguration")
+            self.log.debug("Full reconfiguration beginning")
             loader = configloader.ConfigLoader()
             abide = loader.loadConfig(
                 self.config.get('scheduler', 'tenant_config'),
@@ -468,24 +481,31 @@
             self.abide = abide
         finally:
             self.layout_lock.release()
+        self.log.debug("Full reconfiguration complete")
 
     def _doTenantReconfigureEvent(self, event):
         # This is called in the scheduler loop after another thread submits
         # a request
         self.layout_lock.acquire()
         try:
-            self.log.debug("Performing tenant reconfiguration")
+            self.log.debug("Tenant reconfiguration beginning")
+            # If a change landed to a project, clear out the cached
+            # config before reconfiguring.
+            for project in event.projects:
+                project.unparsed_config = None
+            old_tenant = self.abide.tenants[event.tenant_name]
             loader = configloader.ConfigLoader()
             abide = loader.reloadTenant(
                 self.config.get('scheduler', 'tenant_config'),
                 self._get_project_key_dir(),
                 self, self.merger, self.connections,
-                self.abide, event.tenant)
-            tenant = abide.tenants[event.tenant.name]
+                self.abide, old_tenant)
+            tenant = abide.tenants[event.tenant_name]
             self._reconfigureTenant(tenant)
             self.abide = abide
         finally:
             self.layout_lock.release()
+        self.log.debug("Tenant reconfiguration complete")
 
     def _reenqueueGetProject(self, tenant, item):
         project = item.change.project
@@ -769,8 +789,7 @@
                     # or a branch was just created or deleted.  Clear
                     # out cached data for this project and perform a
                     # reconfiguration.
-                    change.project.unparsed_config = None
-                    self.reconfigureTenant(tenant)
+                    self.reconfigureTenant(tenant, change.project)
                 for pipeline in tenant.layout.pipelines.values():
                     if event.isPatchsetCreated():
                         pipeline.manager.removeOldVersionsOfChange(change)
@@ -853,8 +872,12 @@
             try:
                 self.nodepool.holdNodeSet(nodeset, autohold_key)
             except Exception:
-                self.log.exception("Unable to process autohold for %s",
+                self.log.exception("Unable to process autohold for %s:",
                                    autohold_key)
+                if autohold_key in self.autohold_requests:
+                    self.log.debug("Removing autohold %s due to exception",
+                                   autohold_key)
+                    del self.autohold_requests[autohold_key]
 
             self.nodepool.returnNodeSet(nodeset)
         except Exception:
@@ -891,9 +914,10 @@
 
     def _doNodesProvisionedEvent(self, event):
         request = event.request
+        request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request_id)
         if request.canceled:
             return
 
@@ -932,6 +956,9 @@
         data['result_event_queue'] = {}
         data['result_event_queue']['length'] = \
             self.result_event_queue.qsize()
+        data['management_event_queue'] = {}
+        data['management_event_queue']['length'] = \
+            self.management_event_queue.qsize()
 
         if self.last_reconfigured:
             data['last_reconfigured'] = self.last_reconfigured * 1000
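
How MergedQueue and TenantReconfigureEvent interact: events for the same
tenant compare equal, so they collapse into one event that accumulates the
projects, and a burst of config changes triggers a single reconfiguration.
A sketch with stand-ins for the tenant and project objects:

    import collections

    import zuul.lib.queue
    from zuul.scheduler import TenantReconfigureEvent

    Tenant = collections.namedtuple('Tenant', 'name')

    q = zuul.lib.queue.MergedQueue()
    q.put(TenantReconfigureEvent(Tenant('openstack'), 'project-a'))
    ret = q.put(TenantReconfigureEvent(Tenant('openstack'), 'project-b'))
    assert q.qsize() == 1
    assert ret.projects == {'project-a', 'project-b'}
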
diff --git a/zuul/zk.py b/zuul/zk.py
index 5ea4e56..ede78be 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -90,7 +90,7 @@
     def resetLostFlag(self):
         self._became_lost = False
 
-    def connect(self, hosts, read_only=False):
+    def connect(self, hosts, read_only=False, timeout=10.0):
         '''
         Establish a connection with ZooKeeper cluster.
 
@@ -100,10 +100,12 @@
         :param str hosts: Comma-separated list of hosts to connect to (e.g.
             127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
         :param bool read_only: If True, establishes a read-only connection.
-
+        :param float timeout: The ZooKeeper session timeout, in
+            seconds (default: 10.0).
         '''
         if self.client is None:
-            self.client = KazooClient(hosts=hosts, read_only=read_only)
+            self.client = KazooClient(hosts=hosts, read_only=read_only,
+                                      timeout=timeout)
             self.client.add_listener(self._connection_listener)
             self.client.start()
 
@@ -145,12 +147,10 @@
             from ZooKeeper).  The watcher should return False when
             further updates are no longer necessary.
         '''
-        priority = 100  # TODO(jeblair): integrate into nodereq
-
         data = node_request.toDict()
         data['created_time'] = time.time()
 
-        path = '%s/%s-' % (self.REQUEST_ROOT, priority)
+        path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority)
         path = self.client.create(path, self._dictToStr(data),
                                   makepath=True,
                                   sequence=True, ephemeral=True)
@@ -160,7 +160,6 @@
         def callback(data, stat):
             if data:
                 data = self._strToDict(data)
-                node_request.updateFromDict(data)
                 request_nodes = list(node_request.nodeset.getNodes())
                 for i, nodeid in enumerate(data.get('nodes', [])):
                     node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
@@ -168,6 +167,7 @@
                     node_data = self._strToDict(node_data)
                     request_nodes[i].id = nodeid
                     request_nodes[i].updateFromDict(node_data)
+                node_request.updateFromDict(data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)
 
@@ -187,6 +187,19 @@
         except kze.NoNodeError:
             pass
 
+    def nodeRequestExists(self, node_request):
+        '''
+        See if a NodeRequest exists in ZooKeeper.
+
+        :param NodeRequest node_request: A NodeRequest to verify.
+
+        :returns: True if the request exists, False otherwise.
+        '''
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        if self.client.exists(path):
+            return True
+        return False
+
     def storeNode(self, node):
         '''Store the node.
 
@@ -256,6 +269,9 @@
         for nodeid in nodes:
             node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
             node_data, node_stat = self.client.get(node_path)
+            if not node_data:
+                self.log.warning("Node ID %s has no data", nodeid)
+                continue
             node_data = self._strToDict(node_data)
             if (node_data['state'] == zuul.model.STATE_HOLD and
                     node_data.get('hold_job') == identifier):
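
A sketch of threading the new session timeout from configuration through to
connect() (config path and hosts are placeholders):

    import configparser

    import zuul.zk
    from zuul.lib.config import get_default

    config = configparser.ConfigParser()
    config.read('/etc/zuul/zuul.conf')  # assumed location

    zk = zuul.zk.ZooKeeper()
    zk.connect(get_default(config, 'zookeeper', 'hosts', '127.0.0.1:2181'),
               timeout=float(get_default(config, 'zookeeper',
                                         'session_timeout', 10.0)))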