Merge "Fix doc typo that missed important words" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index ff1a523..c081235 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -45,8 +45,7 @@
     name: openstack-infra/zuul
     infra-check:
       jobs:
-        - build-openstack-infra-sphinx-docs:
-            success-url: 'html/feature/zuulv3/'
+        - build-openstack-sphinx-docs:
             irrelevant-files:
               - zuul/cmd/migrate.py
               - playbooks/zuul-migrate/.*
@@ -67,8 +66,10 @@
               - playbooks/zuul-migrate/.*
     infra-gate:
       jobs:
-        - build-openstack-infra-sphinx-docs:
-            success-url: 'html/feature/zuulv3/'
+        - build-openstack-sphinx-docs:
+            irrelevant-files:
+              - zuul/cmd/migrate.py
+              - playbooks/zuul-migrate/.*
         - tox-pep8
         - tox-py35:
             irrelevant-files:
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 5e7e0e1..464cb60 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -265,6 +265,22 @@
 
       Directory in which Zuul should clone git repositories.
 
+   .. attr:: git_http_low_speed_limit
+      :default: 1000
+
+      If the HTTP transfer speed is less than git_http_low_speed_limit for
+      longer than git_http_low_speed_time, the transfer is aborted.
+
+      Value in bytes; setting this to 0 disables the check.
+
+   .. attr:: git_http_low_speed_time
+      :default: 30
+
+      If the HTTP transfer speed is less than git_http_low_speed_limit for
+      longer than git_http_low_speed_time, the transfer is aborted.
+
+      Value in seconds; setting this to 0 disables the check.
+
    .. attr:: git_user_email
 
       Value to pass to `git config user.email
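
Note: the merger reads these settings from the [merger] section of
zuul.conf (see the zuul/executor/server.py hunk below) and exports them
to git as environment variables. A minimal sketch, assuming only what
this patch shows:

    import configparser

    conf = configparser.ConfigParser()
    conf.read_string(
        "[merger]\n"
        "git_http_low_speed_limit = 1000\n"
        "git_http_low_speed_time = 30\n")

    # Mirrors Repo.__init__ in zuul/merger/merger.py below; git honors
    # these environment variables for HTTP(S) transfers.
    env = {
        'GIT_HTTP_LOW_SPEED_LIMIT': conf.get('merger',
                                             'git_http_low_speed_limit'),
        'GIT_HTTP_LOW_SPEED_TIME': conf.get('merger',
                                            'git_http_low_speed_time'),
    }
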
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 4fed1f9..a8b2324 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -33,17 +33,13 @@
 
 These metrics are emitted by the Zuul :ref:`scheduler`:
 
-.. stat:: gerrit.event.<type>
+.. stat:: zuul.event.<driver>.<type>
    :type: counter
 
-   Gerrit emits different kinds of messages over its `stream-events`
-   interface.  Zuul will report counters for each type of event it
-   receives from Gerrit.
+   Zuul will report counters for each type of event it receives from
+   each of its configured drivers.
 
-   Refer to your Gerrit installation documentation for a complete
-   list of Gerrit event types.
-
-.. stat:: zuul.pipeline
+.. stat:: zuul.tenant.<tenant>.pipeline
 
    Holds metrics specific to jobs. This hierarchy includes:
 
@@ -63,22 +59,60 @@
          The number of items currently being processed by this
          pipeline.
 
-      .. stat:: job
+      .. stat:: project
 
-         Subtree detailing per jobs statistics:
+         This hierarchy holds more specific metrics for each project
+         participating in the pipeline.
 
-         .. stat:: <jobname>
+         .. stat:: <canonical_hostname>
 
-            The triggered job name.
+            The canonical hostname for the triggering project.
+            Embedded ``.`` characters will be translated to ``_``.
 
-            .. stat:: <result>
-               :type: counter, timer
+            .. stat:: <project>
 
-               A counter for each type of result (e.g., ``SUCCESS`` or
-               ``FAILURE``, ``ERROR``, etc.) for the job.  If the
-               result is ``SUCCESS`` or ``FAILURE``, Zuul will
-               additionally report the duration of the build as a
-               timer.
+               The name of the triggering project.  Embedded ``/`` or
+               ``.`` characters will be translated to ``_``.
+
+               .. stat:: <branch>
+
+                  The name of the triggering branch.  Embedded ``/`` or
+                  ``.`` characters will be translated to ``_``.
+
+                  .. stat:: job
+
+                     Subtree detailing per-project job statistics:
+
+                     .. stat:: <jobname>
+
+                        The triggered job name.
+
+                        .. stat:: <result>
+                           :type: counter, timer
+
+                           A counter for each type of result (e.g., ``SUCCESS`` or
+                           ``FAILURE``, ``ERROR``, etc.) for the job.  If the
+                           result is ``SUCCESS`` or ``FAILURE``, Zuul will
+                           additionally report the duration of the build as a
+                           timer.
+
+                  .. stat:: current_changes
+                     :type: gauge
+
+                     The number of items of this project currently being
+                     processed by this pipeline.
+
+                  .. stat:: resident_time
+                     :type: timer
+
+                     A timer metric reporting how long each item for this
+                     project has been in the pipeline.
+
+                  .. stat:: total_changes
+                     :type: counter
+
+                     The number of changes for this project processed by the
+                     pipeline since Zuul started.
 
       .. stat:: resident_time
          :type: timer
@@ -98,34 +132,12 @@
          How long each item spent in the pipeline before its first job
          started.
 
-      .. stat:: <project>
 
-         This hierarchy holds more specific metrics for each project
-         participating in the pipeline.  If the project name contains
-         a ``/`` character, it will be replaced with a ``.``.
+As an example, given a job named `myjob` in tenant `mytenant` triggered
+by a change to `myproject` (canonical hostname `example.com`) on the
+`master` branch in the `gate` pipeline, where the build took 40 seconds,
+the Zuul scheduler will emit the following statsd events:
 
-         .. stat:: current_changes
-            :type: gauge
-
-            The number of items of this project currently being
-            processed by this pipeline.
-
-         .. stat:: resident_time
-            :type: timer
-
-            A timer metric reporting how long each item for this
-            project has been in the pipeline.
-
-         .. stat:: total_changes
-            :type: counter
-
-            The number of changes for this project processed by the
-            pipeline since Zuul started.
-
-As an example, given a job named `myjob` triggered by the `gate` pipeline
-which took 40 seconds to build, the Zuul scheduler will emit the following
-statsd events:
-
-  * ``zuul.pipeline.gate.job.myjob.SUCCESS`` +1
-  * ``zuul.pipeline.gate.job.myjob``  40 seconds
-  * ``zuul.pipeline.gate.all_jobs`` +1
+  * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS`` +1
+  * ``zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master.job.myjob.SUCCESS``  40 seconds
+  * ``zuul.tenant.mytenant.pipeline.gate.all_jobs`` +1
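
For reference, a minimal sketch of how the scheduler composes these
keys (mirroring the zuul/manager/__init__.py hunk below; the function
name is illustrative):

    def statsd_key(tenant, pipeline, hostname, project, branch):
        # Embedded '.' (and '/' in project and branch names) would
        # create extra levels in the statsd hierarchy, so they are
        # translated to '_'.
        hostname = hostname.replace('.', '_')
        project = project.replace('.', '_').replace('/', '_')
        branch = branch.replace('.', '_').replace('/', '_')
        return 'zuul.tenant.%s.pipeline.%s.project.%s.%s.%s' % (
            tenant, pipeline, hostname, project, branch)

    # statsd_key('mytenant', 'gate', 'example.com', 'myproject', 'master')
    # returns the prefix of the per-job keys shown above:
    # 'zuul.tenant.mytenant.pipeline.gate.project.example_com.myproject.master'
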
diff --git a/requirements.txt b/requirements.txt
index cdffda2..b718827 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,7 +7,11 @@
 Paste
 WebOb>=1.2.3
 paramiko>=1.8.0,<2.0.0
-GitPython>=0.3.3,<2.1.2
+# Using a local fork of gitpython until at least these changes are in a
+# release.
+# https://github.com/gitpython-developers/GitPython/pull/682
+# https://github.com/gitpython-developers/GitPython/pull/686
+-e git+https://github.com/jeblair/GitPython.git@zuul#egg=GitPython
 python-daemon>=2.0.4,<2.1.0
 extras
 statsd>=1.0.0,<3.0
diff --git a/tests/base.py b/tests/base.py
index 5841c08..dacb1ef 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1570,7 +1570,7 @@
                 parameters = json.loads(job.arguments.decode('utf8'))
                 if not regex or re.match(regex, parameters.get('job')):
                     match = True
-            if job.name == b'merger:merge':
+            if job.name.startswith(b'merger:'):
                 if not regex:
                     match = True
             if match:
diff --git a/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
new file mode 100644
index 0000000..bd46718
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output-failure-post.yaml
@@ -0,0 +1,4 @@
+- hosts: all
+  tasks:
+    - shell: echo "Failure test {{ zuul.executor.src_root }}"
+    - shell: exit 1
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
index a83f0bc..f182d8d 100644
--- a/tests/fixtures/config/job-output/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -20,8 +20,19 @@
     parent: base
     name: job-output
 
+- job:
+    name: job-output-failure
+    run: playbooks/job-output
+    post-run: playbooks/job-output-failure-post
+
 - project:
     name: org/project
     check:
       jobs:
         - job-output
+
+- project:
+    name: org/project2
+    check:
+      jobs:
+        - job-output-failure
diff --git a/tests/fixtures/config/job-output/git/org_project2/README b/tests/fixtures/config/job-output/git/org_project2/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/org_project2/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/job-output/main.yaml b/tests/fixtures/config/job-output/main.yaml
index 208e274..14b382f 100644
--- a/tests/fixtures/config/job-output/main.yaml
+++ b/tests/fixtures/config/job-output/main.yaml
@@ -6,3 +6,4 @@
           - common-config
         untrusted-projects:
           - org/project
+          - org/project2
diff --git a/tests/fixtures/fake_git.sh b/tests/fixtures/fake_git.sh
new file mode 100755
index 0000000..5b787b7
--- /dev/null
+++ b/tests/fixtures/fake_git.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+echo "$@"
+case "$1" in
+    clone)
+        dest="$3"
+        mkdir -p "$dest/.git"
+        ;;
+    version)
+        echo "git version 1.0.0"
+        exit 0
+        ;;
+esac
+sleep 30
diff --git a/tests/fixtures/layouts/delayed-repo-init.yaml b/tests/fixtures/layouts/delayed-repo-init.yaml
index e97d37a..c89e2fa 100644
--- a/tests/fixtures/layouts/delayed-repo-init.yaml
+++ b/tests/fixtures/layouts/delayed-repo-init.yaml
@@ -67,7 +67,7 @@
             dependencies: project-merge
     gate:
       jobs:
-        - project-merge:
+        - project-merge
         - project-test1:
             dependencies: project-merge
         - project-test2:
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 8aafabf..ec30a2b 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -19,9 +19,10 @@
 import os
 
 import git
+import testtools
 
 from zuul.merger.merger import Repo
-from tests.base import ZuulTestCase
+from tests.base import ZuulTestCase, FIXTURE_DIR
 
 
 class TestMergerRepo(ZuulTestCase):
@@ -49,7 +50,7 @@
             msg='.git file in submodule should be a file')
 
         work_repo = Repo(parent_path, self.workspace_root,
-                         'none@example.org', 'User Name')
+                         'none@example.org', 'User Name', '0', '0')
         self.assertTrue(
             os.path.isdir(os.path.join(self.workspace_root, 'subdir')),
             msg='Cloned repository has a submodule placeholder directory')
@@ -60,7 +61,7 @@
         sub_repo = Repo(
             os.path.join(self.upstream_root, 'org/project2'),
             os.path.join(self.workspace_root, 'subdir'),
-            'none@example.org', 'User Name')
+            'none@example.org', 'User Name', '0', '0')
         self.assertTrue(os.path.exists(
             os.path.join(self.workspace_root, 'subdir', '.git')),
             msg='Cloned over the submodule placeholder')
@@ -74,3 +75,28 @@
             os.path.join(self.upstream_root, 'org/project2'),
             sub_repo.createRepoObject().remotes[0].url,
             message="Sub repository points to upstream project2")
+
+    def test_clone_timeout(self):
+        parent_path = os.path.join(self.upstream_root, 'org/project1')
+        self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+                   os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+        work_repo = Repo(parent_path, self.workspace_root,
+                         'none@example.org', 'User Name', '0', '0',
+                         git_timeout=0.001)
+        # TODO: have the merger and repo classes catch fewer
+        # exceptions, including this one on initialization.  For the
+        # test, we try cloning again.
+        with testtools.ExpectedException(git.exc.GitCommandError,
+                                         '.*exit code\(-9\)'):
+            work_repo._ensure_cloned()
+
+    def test_fetch_timeout(self):
+        parent_path = os.path.join(self.upstream_root, 'org/project1')
+        work_repo = Repo(parent_path, self.workspace_root,
+                         'none@example.org', 'User Name', '0', '0')
+        work_repo.git_timeout = 0.001
+        self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
+                   os.path.join(FIXTURE_DIR, 'fake_git.sh'))
+        with testtools.ExpectedException(git.exc.GitCommandError,
+                                         '.*exit code\(-9\)'):
+            work_repo.update()
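
Background for the exit code(-9) assertions above: when a git
subprocess exceeds the timeout, GitPython kills it with SIGKILL, and
Python's subprocess convention reports a signal-terminated process as
the negative signal number. A self-contained illustration:

    import subprocess

    proc = subprocess.Popen(['sleep', '30'])
    proc.kill()          # sends SIGKILL (signal 9) on POSIX
    print(proc.wait())   # -9, which GitCommandError renders as
                         # "exit code(-9)"
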
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index ba7523c..d51898b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -76,7 +76,7 @@
         self.assertEqual(request.state, 'fulfilled')
 
         # Accept the nodes
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request.id)
         nodeset = request.nodeset
 
         for node in nodeset.getNodes():
@@ -125,3 +125,47 @@
 
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 0)
+
+    def test_accept_nodes_resubmitted(self):
+        # Test that a resubmitted request would not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        # Accept the nodes, passing a different ID
+        self.nodepool.acceptNodes(request, "invalid")
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
+    def test_accept_nodes_lost_request(self):
+        # Test that a lost request would not lock nodes
+
+        nodeset = model.NodeSet()
+        nodeset.addNode(model.Node('controller', 'ubuntu-xenial'))
+        nodeset.addNode(model.Node('compute', 'ubuntu-xenial'))
+        job = model.Job('testjob')
+        job.nodeset = nodeset
+        request = self.nodepool.requestNodes(None, job)
+        self.waitForRequests()
+        self.assertEqual(len(self.provisioned_requests), 1)
+        self.assertEqual(request.state, 'fulfilled')
+
+        self.zk.deleteNodeRequest(request)
+
+        # Accept the nodes
+        self.nodepool.acceptNodes(request, request.id)
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNone(node.lock)
+            self.assertEqual(node.state, 'ready')
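
A sketch of the guard these two tests exercise. This is a hypothetical
simplification: the real logic lives in zuul/nodepool.py (not part of
this diff) and the helper names here are invented:

    def acceptNodes(self, request, request_id):
        if request.id != request_id:
            # The request was resubmitted under a new ZooKeeper id while
            # this completion event was in flight; ignore the stale
            # event and leave the nodes unlocked.
            return
        if not self.zk.nodeRequestExists(request):
            # The request was lost (its ZooKeeper node was deleted), so
            # its nodes must not be locked either.
            return
        self.lockNodes(request)  # hypothetical locking helper
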
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 65a37ff..3203960 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -89,25 +89,34 @@
         self.assertEqual(self.getJobFromHistory('project-test2').node,
                          'label1')
 
+        for stat in self.statsd.stats:
+            k, v = stat.decode('utf-8').split(':')
+            self.log.debug('stat %s:%s', k, v)
         # TODOv3(jeblair): we may want to report stats by tenant (also?).
         # Per-driver
         self.assertReportedStat('zuul.event.gerrit.comment-added', value='1|c')
         # Per-driver per-connection
         self.assertReportedStat('zuul.event.gerrit.gerrit.comment-added',
                                 value='1|c')
-        self.assertReportedStat('zuul.pipeline.gate.current_changes',
-                                value='1|g')
-        self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
-                                kind='ms')
-        self.assertReportedStat('zuul.pipeline.gate.job.project-merge.SUCCESS',
-                                value='1|c')
-        self.assertReportedStat('zuul.pipeline.gate.resident_time', kind='ms')
-        self.assertReportedStat('zuul.pipeline.gate.total_changes',
-                                value='1|c')
         self.assertReportedStat(
-            'zuul.pipeline.gate.org.project.resident_time', kind='ms')
+            'zuul.tenant.tenant-one.pipeline.gate.current_changes',
+            value='1|g')
         self.assertReportedStat(
-            'zuul.pipeline.gate.org.project.total_changes', value='1|c')
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.job.project-merge.SUCCESS', kind='ms')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.job.project-merge.SUCCESS', value='1|c')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.resident_time', kind='ms')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.total_changes', value='1|c')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.resident_time', kind='ms')
+        self.assertReportedStat(
+            'zuul.tenant.tenant-one.pipeline.gate.project.review_example_com.'
+            'org_project.master.total_changes', value='1|c')
 
         for build in self.history:
             self.assertTrue(build.parameters['zuul']['voting'])
@@ -4596,6 +4605,50 @@
         self.assertEqual(B.data['status'], 'MERGED')
         self.assertEqual(B.reported, 2)
 
+    def test_job_aborted(self):
+        "Test that if a execute server aborts a job, it is run again"
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.executor_server.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 2)
+
+        # first abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        # second abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        # third abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        # fourth abort
+        self.builds[0].aborted = True
+        self.executor_server.release('.*-test*')
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 1)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.history), 7)
+        self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 4)
+        self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 3)
+
     def test_rerun_on_abort(self):
         "Test that if a execute server fails to run a job, it is run again"
 
@@ -4715,6 +4768,28 @@
         build = self.getJobFromHistory('py27')
         self.assertEqual(build.parameters['zuul']['jobtags'], [])
 
+    def test_pending_merge_in_reconfig(self):
+        # Test that if we are waiting for an outstanding merge when a
+        # reconfiguration occurs, we continue to wait for it afterwards.
+        self.gearman_server.hold_merge_jobs_in_queue = True
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        A.setMerged()
+        self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
+        self.waitUntilSettled()
+        # Reconfigure while we still have an outstanding merge job
+        self.sched.reconfigureTenant(self.sched.abide.tenants['tenant-one'],
+                                     None)
+        self.waitUntilSettled()
+        # Verify the merge job is still running and that the item is
+        # in the pipeline
+        self.assertEqual(len(self.sched.merger.jobs), 1)
+        tenant = self.sched.abide.tenants.get('tenant-one')
+        pipeline = tenant.layout.pipelines['post']
+        self.assertEqual(len(pipeline.getAllItems()), 1)
+        self.gearman_server.hold_merge_jobs_in_queue = False
+        self.gearman_server.release()
+        self.waitUntilSettled()
+
 
 class TestExecutor(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 1c633ba..0d081d5 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -14,9 +14,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import io
 import json
+import logging
 import os
 import textwrap
+import gc
+from unittest import skip
 
 import testtools
 
@@ -170,6 +174,39 @@
         self.assertIn('tenant-one-gate', A.messages[1],
                       "A should transit tenant-one gate")
 
+    @skip("This test is useful, but not reliable")
+    def test_full_and_dynamic_reconfig(self):
+        self.executor_server.hold_jobs_in_build = True
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+
+            - project:
+                name: org/project
+                tenant-one-gate:
+                  jobs:
+                    - project-test1
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.sched.reconfigure(self.config)
+        self.waitUntilSettled()
+
+        gc.collect()
+        pipelines = [obj for obj in gc.get_objects()
+                     if isinstance(obj, zuul.model.Pipeline)]
+        self.assertEqual(len(pipelines), 4)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
     def test_dynamic_config(self):
         in_repo_conf = textwrap.dedent(
             """
@@ -696,6 +733,47 @@
         self.assertIn('the only project definition permitted', A.messages[0],
                       "A should have a syntax error reported")
 
+    def test_untrusted_depends_on_trusted(self):
+        with open(os.path.join(FIXTURE_DIR,
+                               'config/in-repo/git/',
+                               'common-config/zuul.yaml')) as f:
+            common_config = f.read()
+
+        common_config += textwrap.dedent(
+            """
+            - job:
+                name: project-test9
+            """)
+
+        file_dict = {'zuul.yaml': common_config}
+        A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
+                                           files=file_dict)
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project:
+                name: org/project
+                check:
+                  jobs:
+                    - project-test9
+            """)
+
+        file_dict = {'zuul.yaml': in_repo_conf}
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+                                           files=file_dict)
+        B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+            B.subject, A.data['id'])
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        self.assertEqual(B.data['status'], 'NEW')
+        self.assertEqual(B.reported, 1,
+                         "B should report failure")
+        self.assertIn('depends on a change to a config project',
+                      B.messages[0],
+                      "A should have a syntax error reported")
+
     def test_duplicate_node_error(self):
         in_repo_conf = textwrap.dedent(
             """
@@ -816,6 +894,150 @@
                       A.messages[0],
                       "A should have a syntax error reported")
 
+    def test_job_list_in_project_template_not_dict_error(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project-template:
+                name: some-jobs
+                check:
+                  jobs:
+                    - project-test1:
+                        - required-projects:
+                            org/project2
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(A.reported, 1,
+                         "A should report failure")
+        self.assertIn('expected str for dictionary value',
+                      A.messages[0], "A should have a syntax error reported")
+
+    def test_job_list_in_project_not_dict_error(self):
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project:
+                name: org/project1
+                check:
+                  jobs:
+                    - project-test1:
+                        - required-projects:
+                            org/project2
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'NEW')
+        self.assertEqual(A.reported, 1,
+                         "A should report failure")
+        self.assertIn('expected str for dictionary value',
+                      A.messages[0], "A should have a syntax error reported")
+
+    def test_project_template(self):
+        # Tests that a project template is not modified when used, and
+        # can therefore be used in subsequent reconfigurations.
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+            - project-template:
+                name: some-jobs
+                tenant-one-gate:
+                  jobs:
+                    - project-test1:
+                        required-projects:
+                          - org/project1
+            - project:
+                name: org/project
+                templates:
+                  - some-jobs
+            """)
+
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+                                           files=file_dict)
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+        self.waitUntilSettled()
+        in_repo_conf = textwrap.dedent(
+            """
+            - project:
+                name: org/project1
+                templates:
+                  - some-jobs
+            """)
+        file_dict = {'.zuul.yaml': in_repo_conf}
+        B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B',
+                                           files=file_dict)
+        B.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+        self.waitUntilSettled()
+        self.assertEqual(B.data['status'], 'MERGED')
+
+    def test_job_remove_add(self):
+        # Tests that a job can be removed from one repo and added in another.
+        # First, remove the current config for project1 since it
+        # references the job we want to remove.
+        file_dict = {'.zuul.yaml': None}
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
+                                           files=file_dict)
+        A.setMerged()
+        self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+        self.waitUntilSettled()
+        # Then propose a change to delete the job from one repo...
+        file_dict = {'.zuul.yaml': None}
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+                                           files=file_dict)
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        # ...and a second that depends on it that adds it to another repo.
+        in_repo_conf = textwrap.dedent(
+            """
+            - job:
+                name: project-test1
+
+            - project:
+                name: org/project1
+                check:
+                  jobs:
+                    - project-test1
+            """)
+        in_repo_playbook = textwrap.dedent(
+            """
+            - hosts: all
+              tasks: []
+            """)
+        file_dict = {'.zuul.yaml': in_repo_conf,
+                     'playbooks/project-test1.yaml': in_repo_playbook}
+        C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C',
+                                           files=file_dict,
+                                           parent='refs/changes/1/1/1')
+        C.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+            C.subject, B.data['id'])
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='project-test1', result='SUCCESS', changes='2,1 3,1'),
+        ], ordered=False)
+
     def test_multi_repo(self):
         downstream_repo_conf = textwrap.dedent(
             """
@@ -1665,7 +1887,8 @@
             return f.read()
 
     def test_job_output(self):
-        # Verify that command standard output appears in the job output
+        # Verify that command standard output appears in the job output,
+        # and that failures in the final playbook get logged.
 
         # This currently only verifies we receive output from
         # localhost.  Notably, it does not verify we receive output
@@ -1690,3 +1913,36 @@
         self.assertIn(token,
                       self._get_file(self.history[0],
                                      'work/logs/job-output.txt'))
+
+    def test_job_output_failure_log(self):
+        logger = logging.getLogger('zuul.AnsibleJob')
+        output = io.StringIO()
+        logger.addHandler(logging.StreamHandler(output))
+
+        # Verify that a failure in the last post playbook emits the contents
+        # of the json output to the log
+        self.executor_server.keep_jobdir = True
+        A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='job-output-failure',
+                 result='POST_FAILURE', changes='1,1'),
+        ], ordered=False)
+
+        token = 'Standard output test %s' % (self.history[0].jobdir.src_root)
+        j = json.loads(self._get_file(self.history[0],
+                                      'work/logs/job-output.json'))
+        self.assertEqual(token,
+                         j[0]['plays'][0]['tasks'][0]
+                         ['hosts']['localhost']['stdout'])
+
+        print(self._get_file(self.history[0],
+                             'work/logs/job-output.json'))
+        self.assertIn(token,
+                      self._get_file(self.history[0],
+                                     'work/logs/job-output.txt'))
+
+        log_output = output.getvalue()
+        self.assertIn('Final playbook failed', log_output)
+        self.assertIn('Failure test', log_output)
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 6a9ba01..79c8b39 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -11,6 +11,7 @@
 # under the License.
 
 import base64
+import collections
 from contextlib import contextmanager
 import copy
 import os
@@ -397,39 +398,42 @@
     secret = {vs.Required('name'): str,
               vs.Required('secret'): str}
 
-    job = {vs.Required('name'): str,
-           'parent': vs.Any(str, None),
-           'final': bool,
-           'failure-message': str,
-           'success-message': str,
-           'failure-url': str,
-           'success-url': str,
-           'hold-following-changes': bool,
-           'voting': bool,
-           'semaphore': str,
-           'tags': to_list(str),
-           'branches': to_list(str),
-           'files': to_list(str),
-           'secrets': to_list(vs.Any(secret, str)),
-           'irrelevant-files': to_list(str),
-           # validation happens in NodeSetParser
-           'nodeset': vs.Any(dict, str),
-           'timeout': int,
-           'attempts': int,
-           'pre-run': to_list(str),
-           'post-run': to_list(str),
-           'run': str,
-           '_source_context': model.SourceContext,
-           '_start_mark': ZuulMark,
-           'roles': to_list(role),
-           'required-projects': to_list(vs.Any(job_project, str)),
-           'vars': dict,
-           'dependencies': to_list(str),
-           'allowed-projects': to_list(str),
-           'override-branch': str,
-           'description': str,
-           'post-review': bool
-           }
+    # Attributes of a job that can also be used in Project and ProjectTemplate
+    job_attributes = {'parent': vs.Any(str, None),
+                      'final': bool,
+                      'failure-message': str,
+                      'success-message': str,
+                      'failure-url': str,
+                      'success-url': str,
+                      'hold-following-changes': bool,
+                      'voting': bool,
+                      'semaphore': str,
+                      'tags': to_list(str),
+                      'branches': to_list(str),
+                      'files': to_list(str),
+                      'secrets': to_list(vs.Any(secret, str)),
+                      'irrelevant-files': to_list(str),
+                      # validation happens in NodeSetParser
+                      'nodeset': vs.Any(dict, str),
+                      'timeout': int,
+                      'attempts': int,
+                      'pre-run': to_list(str),
+                      'post-run': to_list(str),
+                      'run': str,
+                      '_source_context': model.SourceContext,
+                      '_start_mark': ZuulMark,
+                      'roles': to_list(role),
+                      'required-projects': to_list(vs.Any(job_project, str)),
+                      'vars': dict,
+                      'dependencies': to_list(str),
+                      'allowed-projects': to_list(str),
+                      'override-branch': str,
+                      'description': str,
+                      'post-review': bool}
+
+    job_name = {vs.Required('name'): str}
+
+    job = dict(collections.ChainMap(job_name, job_attributes))
 
     schema = vs.Schema(job)
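
For reference, dict(collections.ChainMap(a, b)) flattens both mappings
into a single plain dict, with the first mapping winning on key
collisions; this is how the required 'name' key is layered onto the
shared attribute set:

    import collections

    job_name = {'name': 'required'}
    job_attributes = {'parent': 'optional', 'voting': 'optional'}
    merged = dict(collections.ChainMap(job_name, job_attributes))
    # merged == {'name': 'required', 'parent': 'optional',
    #            'voting': 'optional'}
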
 
@@ -474,22 +478,27 @@
         return None
 
     @staticmethod
-    def fromYaml(tenant, layout, conf, project_pipeline=False):
-        with configuration_exceptions('job', conf):
-            JobParser.schema(conf)
+    def fromYaml(tenant, layout, conf, project_pipeline=False,
+                 name=None, validate=True):
+        if validate:
+            with configuration_exceptions('job', conf):
+                JobParser.schema(conf)
+
+        if name is None:
+            name = conf['name']
 
         # NB: The default detection system in the Job class requires
         # that we always assign values directly rather than modifying
         # them (e.g., "job.run = ..." rather than
         # "job.run.append(...)").
 
-        reference = layout.jobs.get(conf['name'], [None])[0]
+        reference = layout.jobs.get(name, [None])[0]
 
-        job = model.Job(conf['name'])
+        job = model.Job(name)
         job.source_context = conf.get('_source_context')
         job.source_line = conf.get('_start_mark').line + 1
 
-        is_variant = layout.hasJob(conf['name'])
+        is_variant = layout.hasJob(name)
         if 'parent' in conf:
             if conf['parent'] is not None:
                 # Parent job is explicitly specified, so inherit from it.
@@ -725,9 +734,12 @@
             '_start_mark': ZuulMark,
         }
 
+        job = {str: vs.Any(str, JobParser.job_attributes)}
+        job_list = [vs.Any(str, job)]
+        pipeline_contents = {'queue': str, 'jobs': job_list}
+
         for p in self.layout.pipelines.values():
-            project_template[p.name] = {'queue': str,
-                                        'jobs': [vs.Any(str, dict)]}
+            project_template[p.name] = pipeline_contents
         return vs.Schema(project_template)
 
     def fromYaml(self, conf, validate=True):
@@ -752,16 +764,11 @@
     def parseJobList(self, conf, source_context, start_mark, job_list):
         for conf_job in conf:
             if isinstance(conf_job, str):
-                attrs = dict(name=conf_job)
+                jobname = conf_job
+                attrs = {}
             elif isinstance(conf_job, dict):
                 # A dictionary in a job tree may override params
                 jobname, attrs = list(conf_job.items())[0]
-                if attrs:
-                    # We are overriding params, so make a new job def
-                    attrs['name'] = jobname
-                else:
-                    # Not overriding, so add a blank job
-                    attrs = dict(name=jobname)
             else:
                 raise Exception("Job must be a string or dictionary")
             attrs['_source_context'] = source_context
@@ -770,10 +777,11 @@
             # validate that the job is existing
             with configuration_exceptions('project or project-template',
                                           attrs):
-                self.layout.getJob(attrs['name'])
+                self.layout.getJob(jobname)
 
             job_list.addJob(JobParser.fromYaml(self.tenant, self.layout,
-                                               attrs, project_pipeline=True))
+                                               attrs, project_pipeline=True,
+                                               name=jobname, validate=False))
 
 
 class ProjectParser(object):
@@ -796,9 +804,12 @@
             '_start_mark': ZuulMark,
         }
 
+        job = {str: vs.Any(str, JobParser.job_attributes)}
+        job_list = [vs.Any(str, job)]
+        pipeline_contents = {'queue': str, 'jobs': job_list}
+
         for p in self.layout.pipelines.values():
-            project[p.name] = {'queue': str,
-                               'jobs': [vs.Any(str, dict)]}
+            project[p.name] = pipeline_contents
         return vs.Schema(project)
 
     def fromYaml(self, conf_list):
@@ -1621,9 +1632,22 @@
         for branch in branches:
             fns1 = []
             fns2 = []
-            files_list = files.connections.get(
+            files_entry = files.connections.get(
                 project.source.connection.connection_name, {}).get(
-                    project.name, {}).get(branch, {}).keys()
+                    project.name, {}).get(branch)
+            # If there is no files entry at all for this
+            # project-branch, then use the cached config.
+            if files_entry is None:
+                if trusted:
+                    incdata = project.unparsed_config
+                else:
+                    incdata = project.unparsed_branch_config.get(branch)
+                if incdata:
+                    config.extend(incdata)
+                continue
+            # Otherwise, do not use the cached config (even if the
+            # files are empty as that likely means they were deleted).
+            files_list = files_entry.keys()
             for fn in files_list:
                 if fn.startswith("zuul.d/"):
                     fns1.append(fn)
@@ -1655,14 +1679,6 @@
 
                     config.extend(incdata)
 
-            if not loaded:
-                if trusted:
-                    incdata = project.unparsed_config
-                else:
-                    incdata = project.unparsed_branch_config.get(branch)
-                if incdata:
-                    config.extend(incdata)
-
     def createDynamicLayout(self, tenant, files,
                             include_config_projects=False,
                             scheduler=None, connections=None):
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index cde773d..0f8d7d7 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -373,6 +373,9 @@
                     result = 'RETRY_LIMIT'
                 else:
                     build.retry = True
+            if result in ('DISCONNECT', 'ABORTED'):
+                # Always retry if the executor just went away
+                build.retry = True
             result_data = data.get('data', {})
             self.log.info("Build %s complete, result %s" %
                           (job, result))
@@ -404,7 +407,7 @@
 
     def onDisconnect(self, job):
         self.log.info("Gearman job %s lost due to disconnect" % job)
-        self.onBuildCompleted(job)
+        self.onBuildCompleted(job, 'DISCONNECT')
 
     def onUnknownJob(self, job):
         self.log.info("Gearman job %s lost due to unknown handle" % job)
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 82921fb..d739c18 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -562,6 +562,10 @@
                                                   'disk_limit_per_job', 250))
         self.merge_email = get_default(self.config, 'merger', 'git_user_email')
         self.merge_name = get_default(self.config, 'merger', 'git_user_name')
+        self.merge_speed_limit = get_default(
+            config, 'merger', 'git_http_low_speed_limit', '1000')
+        self.merge_speed_time = get_default(
+            config, 'merger', 'git_http_low_speed_time', '30')
         execution_wrapper_name = get_default(self.config, 'executor',
                                              'execution_wrapper', 'bubblewrap')
         load_multiplier = float(get_default(self.config, 'executor',
@@ -610,7 +614,7 @@
         self.job_workers = {}
         self.disk_accountant = DiskAccountant(self.jobdir_root,
                                               self.disk_limit_per_job,
-                                              self.stopJobByJobdir,
+                                              self.stopJobDiskFull,
                                               self.merge_root)
 
     def _getMerger(self, root, logger=None):
@@ -618,9 +622,9 @@
             cache_root = self.merge_root
         else:
             cache_root = None
-        return zuul.merger.merger.Merger(root, self.connections,
-                                         self.merge_email, self.merge_name,
-                                         cache_root, logger)
+        return zuul.merger.merger.Merger(
+            root, self.connections, self.merge_email, self.merge_name,
+            self.merge_speed_limit, self.merge_speed_time, cache_root, logger)
 
     def start(self):
         self._running = True
@@ -843,9 +847,9 @@
     def finishJob(self, unique):
         del(self.job_workers[unique])
 
-    def stopJobByJobdir(self, jobdir):
+    def stopJobDiskFull(self, jobdir):
         unique = os.path.basename(jobdir)
-        self.stopJobByUnique(unique)
+        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
 
     def stopJob(self, job):
         try:
@@ -856,13 +860,13 @@
         finally:
             job.sendWorkComplete()
 
-    def stopJobByUnique(self, unique):
+    def stopJobByUnique(self, unique, reason=None):
         job_worker = self.job_workers.get(unique)
         if not job_worker:
             self.log.debug("Unable to find worker for job %s" % (unique,))
             return
         try:
-            job_worker.stop()
+            job_worker.stop(reason)
         except Exception:
             self.log.exception("Exception sending stop command "
                                "to worker:")
@@ -914,12 +918,14 @@
     RESULT_TIMED_OUT = 2
     RESULT_UNREACHABLE = 3
     RESULT_ABORTED = 4
+    RESULT_DISK_FULL = 5
 
     RESULT_MAP = {
         RESULT_NORMAL: 'RESULT_NORMAL',
         RESULT_TIMED_OUT: 'RESULT_TIMED_OUT',
         RESULT_UNREACHABLE: 'RESULT_UNREACHABLE',
         RESULT_ABORTED: 'RESULT_ABORTED',
+        RESULT_DISK_FULL: 'RESULT_DISK_FULL',
     }
 
     def __init__(self, executor_server, job):
@@ -932,6 +938,7 @@
         self.proc_lock = threading.Lock()
         self.running = False
         self.aborted = False
+        self.aborted_reason = None
         self.thread = None
         self.private_key_file = get_default(self.executor_server.config,
                                             'executor', 'private_key_file',
@@ -949,8 +956,9 @@
         self.thread = threading.Thread(target=self.execute)
         self.thread.start()
 
-    def stop(self):
+    def stop(self, reason=None):
         self.aborted = True
+        self.aborted_reason = reason
         self.abortRunningProc()
         if self.thread:
             self.thread.join()
@@ -1098,6 +1106,8 @@
         self.job.sendWorkStatus(0, 100)
 
         result = self.runPlaybooks(args)
+        if self.aborted_reason == self.RESULT_DISK_FULL:
+            result = 'DISK_FULL'
         data = self.getResultData()
         result_data = json.dumps(dict(result=result,
                                       data=data))
@@ -1215,12 +1225,41 @@
             post_status, post_code = self.runAnsiblePlaybook(
                 playbook, args['timeout'], success, phase='post', index=index)
             if post_status != self.RESULT_NORMAL or post_code != 0:
+                success = False
                 # If we encountered a pre-failure, that takes
                 # precedence over the post result.
                 if not pre_failed:
                     result = 'POST_FAILURE'
+                if (index + 1) == len(self.jobdir.post_playbooks):
+                    self._logFinalPlaybookError()
+
         return result
 
+    def _logFinalPlaybookError(self):
+        # Failures in the final post playbook can include failures
+        # uploading logs, which makes diagnosing issues difficult.
+        # Grab the output from the last playbook from the json
+        # file and log it.
+        json_output = self.jobdir.job_output_file.replace('txt', 'json')
+        self.log.debug("Final playbook failed")
+        if not os.path.exists(json_output):
+            self.log.debug("JSON logfile {logfile} is missing".format(
+                logfile=json_output))
+            return
+        try:
+            with open(json_output, 'r') as f:
+                output = json.load(f)
+            last_playbook = output[-1]
+            # Transform the JSON to YAML because it is easier to read;
+            # given the size of the data, a nested dict stringified onto
+            # a single line would be extremely hard to read.
+            yaml_out = yaml.safe_dump(last_playbook, default_flow_style=False)
+            for line in yaml_out.split('\n'):
+                self.log.debug(line)
+        except Exception:
+            self.log.exception(
+                "Could not decode json from {logfile}".format(
+                    logfile=json_output))
+
     def getHostList(self, args):
         hosts = []
         for node in args['nodes']:
@@ -1693,6 +1732,13 @@
         elif ret == -9:
             # Received abort request.
             return (self.RESULT_ABORTED, None)
+        elif ret == 1:
+            if syntax_buffer[0].startswith(b'ERROR!'):
+                with open(self.jobdir.job_output_file, 'a') as job_output:
+                    for line in syntax_buffer:
+                        job_output.write("{now} | {line}\n".format(
+                            now=datetime.datetime.now(),
+                            line=line.decode('utf-8').rstrip()))
         elif ret == 4:
             # Ansible could not parse the yaml.
             self.log.debug("Ansible parse error")
@@ -1759,15 +1805,15 @@
         if result is not None:
             result = self.RESULT_MAP[result]
             msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
-            msg.format(phase=phase, step=step, result=result,
-                       trusted=trusted, playbook=playbook, branch=branch)
+            msg = msg.format(phase=phase, step=step, result=result,
+                             trusted=trusted, playbook=playbook, branch=branch)
         else:
             msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
-            msg.format(phase=phase, step=step, trusted=trusted,
-                       playbook=playbook, branch=branch)
+            msg = msg.format(phase=phase, step=step, trusted=trusted,
+                             playbook=playbook, branch=branch)
 
         with open(self.jobdir.job_output_file, 'a') as job_output:
-            job_output.write("{now} | {msg}".format(
+            job_output.write("{now} | {msg}\n".format(
                 now=datetime.datetime.now(),
                 msg=msg))
 
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0d53fc8..edea69c 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -11,6 +11,7 @@
 # under the License.
 
 import logging
+import textwrap
 
 from zuul import exceptions
 from zuul import model
@@ -229,20 +230,17 @@
                                (item.change, change_queue))
                 change_queue.enqueueItem(item)
 
-                # Get an updated copy of the layout if necessary.
-                # This will return one of the following:
-                # 1) An existing layout from the item ahead or pipeline.
-                # 2) A newly created layout from the cached pipeline
-                #    layout config plus the previously returned
-                #    in-repo files stored in the buildset.
-                # 3) None in the case that a fetch of the files from
-                #    the merger is still pending.
-                item.layout = self.getLayout(item)
-
-                # Rebuild the frozen job tree from the new layout, if
-                # we have one.  If not, it will be built later.
-                if item.layout:
-                    item.freezeJobGraph()
+                # Get an updated copy of the layout and update the job
+                # graph if necessary.  This resumes the buildset merge
+                # state machine.  If we have an up-to-date layout, it
+                # will go ahead and refresh the job graph if needed;
+                # or it will send a new merge job if necessary, or it
+                # will do nothing if we're waiting on a merge job.
+                item.job_graph = None
+                item.layout = None
+                if item.active:
+                    if self.prepareItem(item):
+                        self.prepareJobs(item)
 
                 # Re-set build results in case any new jobs have been
                 # added to the tree.
@@ -430,32 +428,60 @@
         import zuul.configloader
         loader = zuul.configloader.ConfigLoader()
 
+        self.log.debug("Loading dynamic layout")
+        (trusted_updates, untrusted_updates) = item.includesConfigUpdates()
         build_set = item.current_build_set
+        trusted_layout_verified = False
         try:
             # First parse the config as it will land with the
             # full set of config and project repos.  This lets us
             # catch syntax errors in config repos even though we won't
             # actually run with that config.
-            self.log.debug("Loading dynamic layout (phase 1)")
-            loader.createDynamicLayout(
-                item.pipeline.layout.tenant,
-                build_set.files,
-                include_config_projects=True,
-                scheduler=self.sched,
-                connections=self.sched.connections)
+            if trusted_updates:
+                self.log.debug("Loading dynamic layout (phase 1)")
+                loader.createDynamicLayout(
+                    item.pipeline.layout.tenant,
+                    build_set.files,
+                    include_config_projects=True,
+                    scheduler=self.sched,
+                    connections=self.sched.connections)
+                trusted_layout_verified = True
 
             # Then create the config a second time but without changes
             # to config repos so that we actually use this config.
-            self.log.debug("Loading dynamic layout (phase 2)")
-            layout = loader.createDynamicLayout(
-                item.pipeline.layout.tenant,
-                build_set.files,
-                include_config_projects=False)
+            if untrusted_updates:
+                self.log.debug("Loading dynamic layout (phase 2)")
+                layout = loader.createDynamicLayout(
+                    item.pipeline.layout.tenant,
+                    build_set.files,
+                    include_config_projects=False)
+            else:
+                # We're a change to a config repo (with no untrusted
+                # items ahead), so just use the most recently
+                # generated layout.
+                if item.item_ahead:
+                    return item.item_ahead.layout
+                else:
+                    return item.queue.pipeline.layout
             self.log.debug("Loading dynamic layout complete")
         except zuul.configloader.ConfigurationSyntaxError as e:
-            self.log.info("Configuration syntax error "
-                          "in dynamic layout")
-            item.setConfigError(str(e))
+            self.log.info("Configuration syntax error in dynamic layout")
+            if trusted_layout_verified:
+                # The config is good if we include config-projects,
+                # but is currently invalid if we omit them.  Instead
+                # of returning the whole error message, just leave a
+                # note that the config will work once the dependent
+                # changes land.
+                msg = "This change depends on a change "\
+                      "to a config project.\n\n"
+                msg += textwrap.fill(textwrap.dedent("""\
+                The syntax of the configuration in this change has
+                been verified to be correct once the config project
+                change upon which it depends is merged, but it cannot
+                be used until that occurs."""))
+                item.setConfigError(msg)
+            else:
+                item.setConfigError(str(e))
             return None
         except Exception:
             self.log.exception("Error in dynamic layout")
@@ -811,19 +837,28 @@
                 dt = None
             items = len(self.pipeline.getAllItems())
 
-            # stats.timers.zuul.pipeline.NAME.resident_time
-            # stats_counts.zuul.pipeline.NAME.total_changes
-            # stats.gauges.zuul.pipeline.NAME.current_changes
-            key = 'zuul.pipeline.%s' % self.pipeline.name
+            tenant = self.pipeline.layout.tenant
+            basekey = 'zuul.tenant.%s' % tenant.name
+            key = '%s.pipeline.%s' % (basekey, self.pipeline.name)
+            # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.resident_time
+            # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.total_changes
+            # stats.gauges.zuul.tenant.<tenant>.pipeline.<pipeline>.current_changes
             self.sched.statsd.gauge(key + '.current_changes', items)
             if dt:
                 self.sched.statsd.timing(key + '.resident_time', dt)
                 self.sched.statsd.incr(key + '.total_changes')
 
-            # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
-            # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
-            project_name = item.change.project.name.replace('/', '.')
-            key += '.%s' % project_name
+            hostname = (item.change.project.canonical_hostname.
+                        replace('.', '_'))
+            projectname = (item.change.project.name.
+                           replace('.', '_').replace('/', '_'))
+            branchname = item.change.branch.replace('.', '_').replace('/', '_')
+            # stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.
+            #   project.<host>.<project>.<branch>.resident_time
+            # stats_counts.zuul.tenant.<tenant>.pipeline.<pipeline>.
+            #   project.<host>.<project>.<branch>.total_changes
+            key += '.project.%s.%s.%s' % (hostname, projectname, branchname)
             if dt:
                 self.sched.statsd.timing(key + '.resident_time', dt)
                 self.sched.statsd.incr(key + '.total_changes')
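To make the resulting statsd hierarchy concrete, here is a small
standalone sketch of the normalization above (the tenant, project, and
branch values are hypothetical):

    # Hypothetical values run through the same replacements as above.
    tenant_name = 'example-tenant'
    pipeline_name = 'gate'
    hostname = 'review.example.org'.replace('.', '_')
    projectname = 'openstack-infra/zuul'.replace('.', '_').replace('/', '_')
    branchname = 'stable/1.0'.replace('.', '_').replace('/', '_')
    key = 'zuul.tenant.%s.pipeline.%s.project.%s.%s.%s' % (
        tenant_name, pipeline_name, hostname, projectname, branchname)
    # key == 'zuul.tenant.example-tenant.pipeline.gate.project.'
    #        'review_example_org.openstack-infra_zuul.stable_1_0'
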
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 7b732c7..035d1d0 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -13,10 +13,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from contextlib import contextmanager
+import logging
+import os
+import shutil
+
 import git
 import gitdb
-import os
-import logging
 
 import zuul.model
 
@@ -38,23 +41,34 @@
             raise
 
 
+@contextmanager
+def timeout_handler(path):
+    try:
+        yield
+    except git.exc.GitCommandError as e:
+        if e.status == -9:
+            # Timeout.  The repo could be in a bad state, so delete it.
+            shutil.rmtree(path)
+        raise
+
+
 class ZuulReference(git.Reference):
     _common_path_default = "refs/zuul"
     _points_to_commits_only = True
 
 
 class Repo(object):
-    def __init__(self, remote, local, email, username, sshkey=None,
-                 cache_path=None, logger=None):
+    def __init__(self, remote, local, email, username, speed_limit, speed_time,
+                 sshkey=None, cache_path=None, logger=None, git_timeout=300):
         if logger is None:
             self.log = logging.getLogger("zuul.Repo")
         else:
             self.log = logger
-        # TODO(pabelanger): Expose to user via zuul.conf.
         self.env = {
-            'GIT_HTTP_LOW_SPEED_LIMIT': '1000',
-            'GIT_HTTP_LOW_SPEED_TIME': '30',
+            'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
+            'GIT_HTTP_LOW_SPEED_TIME': speed_time,
         }
+        self.git_timeout = git_timeout
         if sshkey:
             self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
 
@@ -66,7 +80,7 @@
         self._initialized = False
         try:
             self._ensure_cloned()
-        except:
+        except Exception:
             self.log.exception("Unable to initialize repo for %s" % remote)
 
     def _ensure_cloned(self):
@@ -79,12 +93,10 @@
             self.log.debug("Cloning from %s to %s" % (self.remote_url,
                                                       self.local_path))
             if self.cache_path:
-                git.Repo.clone_from(self.cache_path, self.local_path,
-                                    env=self.env)
+                self._git_clone(self.cache_path)
                 rewrite_url = True
             else:
-                git.Repo.clone_from(self.remote_url, self.local_path,
-                                    env=self.env)
+                self._git_clone(self.remote_url)
         repo = git.Repo(self.local_path)
         repo.git.update_environment(**self.env)
         # Create local branches corresponding to all the remote branches
@@ -108,6 +120,18 @@
     def isInitialized(self):
         return self._initialized
 
+    def _git_clone(self, url):
+        mygit = git.cmd.Git(os.getcwd())
+        mygit.update_environment(**self.env)
+        with timeout_handler(self.local_path):
+            mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
+                        kill_after_timeout=self.git_timeout)
+
+    def _git_fetch(self, repo, remote, ref=None, **kwargs):
+        with timeout_handler(self.local_path):
+            repo.git.fetch(remote, ref, kill_after_timeout=self.git_timeout,
+                           **kwargs)
+
     def createRepoObject(self):
         self._ensure_cloned()
         repo = git.Repo(self.local_path)
@@ -229,19 +253,18 @@
 
     def fetch(self, ref):
         repo = self.createRepoObject()
-        # The git.remote.fetch method may read in git progress info and
-        # interpret it improperly causing an AssertionError. Because the
-        # data was fetched properly subsequent fetches don't seem to fail.
-        # So try again if an AssertionError is caught.
-        origin = repo.remotes.origin
-        try:
-            origin.fetch(ref)
-        except AssertionError:
-            origin.fetch(ref)
+        # NOTE: The following is currently not applicable, but if we
+        # switch back to fetch methods from GitPython, we need to
+        # consider it:
+        #   The git.remote.fetch method may read in git progress info and
+        #   interpret it improperly, causing an AssertionError.  Because the
+        #   data was fetched properly, subsequent fetches don't seem to fail,
+        #   so try again if an AssertionError is caught.
+        self._git_fetch(repo, 'origin', ref)
 
     def fetchFrom(self, repository, ref):
         repo = self.createRepoObject()
-        repo.git.fetch(repository, ref)
+        self._git_fetch(repo, repository, ref)
 
     def createZuulRef(self, ref, commit='HEAD'):
         repo = self.createRepoObject()
@@ -258,15 +281,14 @@
     def update(self):
         repo = self.createRepoObject()
         self.log.debug("Updating repository %s" % self.local_path)
-        origin = repo.remotes.origin
         if repo.git.version_info[:2] < (1, 9):
             # Before 1.9, 'git fetch --tags' did not include the
             # behavior covered by 'git fetch', so we run both
             # commands in that case.  Starting with 1.9, 'git fetch
             # --tags' is all that is necessary.  See
             # https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
-            origin.fetch()
-        origin.fetch(tags=True)
+            self._git_fetch(repo, 'origin')
+        self._git_fetch(repo, 'origin', tags=True)
 
     def getFiles(self, files, dirs=[], branch=None, commit=None):
         ret = {}
@@ -297,7 +319,7 @@
 
 class Merger(object):
     def __init__(self, working_root, connections, email, username,
-                 cache_root=None, logger=None):
+                 speed_limit, speed_time, cache_root=None, logger=None):
         self.logger = logger
         if logger is None:
             self.log = logging.getLogger("zuul.Merger")
@@ -310,6 +332,8 @@
         self.connections = connections
         self.email = email
         self.username = username
+        self.speed_limit = speed_limit
+        self.speed_time = speed_time
         self.cache_root = cache_root
 
     def _addProject(self, hostname, project_name, url, sshkey):
@@ -322,8 +346,9 @@
                                           project_name)
             else:
                 cache_path = None
-            repo = Repo(url, path, self.email, self.username,
-                        sshkey, cache_path, self.logger)
+            repo = Repo(
+                url, path, self.email, self.username, self.speed_limit,
+                self.speed_time, sshkey, cache_path, self.logger)
 
             self.repos[key] = repo
         except Exception:
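The timeout handling introduced in merger.py can be exercised on its
own. The following sketch assumes (as the handler above does) that
GitPython reports a process killed for exceeding kill_after_timeout as
a GitCommandError with status -9; the URL and path are hypothetical:

    import git
    from zuul.merger.merger import timeout_handler

    # timeout_handler() removes a possibly half-written repo when git
    # is killed for exceeding the timeout, then re-raises the error.
    try:
        with timeout_handler('/tmp/example-repo'):
            git.cmd.Git('/tmp').clone(
                'https://git.example.org/slow/repo.git',
                '/tmp/example-repo', kill_after_timeout=300)
    except git.exc.GitCommandError:
        pass  # the local path was already deleted by the handler
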
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 881209d..765d9e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -33,9 +33,13 @@
                                  '/var/lib/zuul/merger-git')
         merge_email = get_default(self.config, 'merger', 'git_user_email')
         merge_name = get_default(self.config, 'merger', 'git_user_name')
-
-        self.merger = merger.Merger(merge_root, connections, merge_email,
-                                    merge_name)
+        speed_limit = get_default(
+            self.config, 'merger', 'git_http_low_speed_limit', '1000')
+        speed_time = get_default(
+            self.config, 'merger', 'git_http_low_speed_time', '30')
+        self.merger = merger.Merger(
+            merge_root, connections, merge_email, merge_name, speed_limit,
+            speed_time)
 
     def start(self):
         self._running = True
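The two new options default to the values previously hard-coded in
merger.py. A small self-contained sketch of the fallback behavior,
using a plain configparser object in place of Zuul's config (get_default
is Zuul's helper; the standard-library fallback argument mirrors it):

    import configparser

    config = configparser.ConfigParser()
    config.read_string("[merger]\n"
                       "git_http_low_speed_limit = 0\n")

    # The configured value wins when present, otherwise the documented
    # default applies; 0 disables the corresponding check.
    speed_limit = config.get('merger', 'git_http_low_speed_limit',
                             fallback='1000')
    speed_time = config.get('merger', 'git_http_low_speed_time',
                            fallback='30')
    assert (speed_limit, speed_time) == ('0', '30')
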
diff --git a/zuul/model.py b/zuul/model.py
index c95a169..6eebbfb 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1157,7 +1157,6 @@
         self.start_time = None
         self.end_time = None
         self.estimated_time = None
-        self.pipeline = None
         self.canceled = False
         self.retry = False
         self.parameters = {}
@@ -1169,6 +1168,10 @@
         return ('<Build %s of %s on %s>' %
                 (self.uuid, self.job.name, self.worker))
 
+    @property
+    def pipeline(self):
+        return self.build_set.item.pipeline
+
     def getSafeAttributes(self):
         return Attributes(uuid=self.uuid,
                           result=self.result,
@@ -1432,7 +1435,6 @@
 
     def addBuild(self, build):
         self.current_build_set.addBuild(build)
-        build.pipeline = self.pipeline
 
     def removeBuild(self, build):
         self.current_build_set.removeBuild(build)
@@ -1518,6 +1520,25 @@
     def wasDequeuedNeedingChange(self):
         return self.dequeued_needing_change
 
+    def includesConfigUpdates(self):
+        includes_trusted = False
+        includes_untrusted = False
+        tenant = self.pipeline.layout.tenant
+        item = self
+        while item:
+            if item.change.updatesConfig():
+                (trusted, project) = tenant.getProject(
+                    item.change.project.canonical_name)
+                if trusted:
+                    includes_trusted = True
+                else:
+                    includes_untrusted = True
+            if includes_trusted and includes_untrusted:
+                # We're done early
+                return (includes_trusted, includes_untrusted)
+            item = item.item_ahead
+        return (includes_trusted, includes_untrusted)
+
     def isHoldingFollowingChanges(self):
         if not self.live:
             return False
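A standalone sketch of the traversal in includesConfigUpdates(), using
stub objects in place of Zuul's model classes (the early exit in the
real helper is omitted for brevity; names and values are illustrative):

    class StubItem(object):
        def __init__(self, trusted, updates_config, item_ahead=None):
            self.trusted = trusted
            self.updates_config = updates_config
            self.item_ahead = item_ahead

    def includes_config_updates(item):
        includes_trusted = includes_untrusted = False
        while item:
            if item.updates_config:
                if item.trusted:
                    includes_trusted = True
                else:
                    includes_untrusted = True
            item = item.item_ahead
        return (includes_trusted, includes_untrusted)

    # A config-project change ahead of an untrusted-project change:
    # the rearmost item reports both kinds of updates.
    config_change = StubItem(trusted=True, updates_config=True)
    job_change = StubItem(trusted=False, updates_config=True,
                          item_ahead=config_change)
    assert includes_config_updates(job_change) == (True, True)
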
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9a125ce..f4c850d 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,16 +111,19 @@
             except Exception:
                 self.log.exception("Error unlocking node:")
 
-    def lockNodeSet(self, nodeset):
-        self._lockNodes(nodeset.getNodes())
+    def lockNodeSet(self, nodeset, request_id):
+        self._lockNodes(nodeset.getNodes(), request_id)
 
-    def _lockNodes(self, nodes):
+    def _lockNodes(self, nodes, request_id):
         # Try to lock all of the supplied nodes.  If any lock fails,
         # try to unlock any which have already been locked before
         # re-raising the error.
         locked_nodes = []
         try:
             for node in nodes:
+                if node.allocated_to != request_id:
+                    raise Exception("Node %s allocated to %s, not %s" %
+                                    (node.id, node.allocated_to, request_id))
                 self.log.debug("Locking node %s" % (node,))
                 self.sched.zk.lockNode(node, timeout=30)
                 locked_nodes.append(node)
@@ -141,7 +144,12 @@
             del self.requests[request.uid]
             return False
 
-        if request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
+        # TODOv3(jeblair): handle allocation failure
+        if deleted:
+            self.log.debug("Resubmitting lost node request %s" % (request,))
+            request.id = None
+            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
             self.log.info("Node request %s %s" % (request, request.state))
 
             # Give our results to the scheduler.
@@ -150,18 +158,29 @@
 
             # Stop watching this request node.
             return False
-        # TODOv3(jeblair): handle allocation failure
-        elif deleted:
-            self.log.debug("Resubmitting lost node request %s" % (request,))
-            self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
+
         return True
 
-    def acceptNodes(self, request):
+    def acceptNodes(self, request, request_id):
         # Called by the scheduler when it wants to accept and lock
         # nodes for (potential) use.
 
         self.log.info("Accepting node request %s" % (request,))
 
+        if request_id != request.id:
+            self.log.info("Skipping node accept for %s (resubmitted as %s)",
+                          request_id, request.id)
+            return
+
+        # Make sure the request still exists. It's possible it could have
+        # disappeared if we lost the ZK session between when the fulfillment
+        # response was added to our queue, and when we actually get around to
+        # processing it. Nodepool will automatically reallocate the assigned
+        # nodes in that situation.
+        if not self.sched.zk.nodeRequestExists(request):
+            self.log.info("Request %s no longer exists", request.id)
+            return
+
         if request.canceled:
             self.log.info("Ignoring canceled node request %s" % (request,))
             # The request was already deleted when it was canceled
@@ -171,7 +190,7 @@
         if request.fulfilled:
             # If the request succeeded, try to lock the nodes.
             try:
-                self.lockNodeSet(request.nodeset)
+                self.lockNodeSet(request.nodeset, request.id)
                 locked = True
             except Exception:
                 self.log.exception("Error locking nodes:")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a926f6e..cfcd865 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -164,6 +164,7 @@
 
     def __init__(self, request):
         self.request = request
+        self.request_id = request.id
 
 
 def toList(item):
@@ -281,31 +282,34 @@
         build.result = result
         try:
             if self.statsd and build.pipeline:
-                jobname = build.job.name.replace('.', '_')
-                key = 'zuul.pipeline.%s.all_jobs' % build.pipeline.name
+                tenant = build.pipeline.layout.tenant
+                jobname = build.job.name.replace('.', '_').replace('/', '_')
+                hostname = (build.build_set.item.change.project.
+                            canonical_hostname.replace('.', '_'))
+                projectname = (build.build_set.item.change.project.name.
+                               replace('.', '_').replace('/', '_'))
+                branchname = (build.build_set.item.change.branch.
+                              replace('.', '_').replace('/', '_'))
+                basekey = 'zuul.tenant.%s' % tenant.name
+                pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)
+                # zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
+                key = '%s.all_jobs' % pipekey
                 self.statsd.incr(key)
-                for label in build.node_labels:
-                    # Jenkins includes the node name in its list of labels, so
-                    # we filter it out here, since that is not statistically
-                    # interesting.
-                    if label == build.node_name:
-                        continue
-                    dt = int((build.start_time - build.execute_time) * 1000)
-                    key = 'zuul.pipeline.%s.label.%s.wait_time' % (
-                        build.pipeline.name, label)
-                    self.statsd.timing(key, dt)
-                key = 'zuul.pipeline.%s.job.%s.%s' % (build.pipeline.name,
-                                                      jobname, build.result)
+                jobkey = '%s.project.%s.%s.%s.job.%s' % (
+                    pipekey, hostname, projectname, branchname, jobname)
+                # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+                #   <host>.<project>.<branch>.job.<job>.<result>
+                key = '%s.%s' % (jobkey, build.result)
                 if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
                     dt = int((build.end_time - build.start_time) * 1000)
                     self.statsd.timing(key, dt)
                 self.statsd.incr(key)
-
-                key = 'zuul.pipeline.%s.job.%s.wait_time' % (
-                    build.pipeline.name, jobname)
+                # zuul.tenant.<tenant>.pipeline.<pipeline>.project.
+                #  <host>.<project>.<branch>.job.<job>.wait_time
+                key = '%s.wait_time' % jobkey
                 dt = int((build.start_time - build.execute_time) * 1000)
                 self.statsd.timing(key, dt)
-        except:
+        except Exception:
             self.log.exception("Exception reporting runtime stats")
         event = BuildCompletedEvent(build)
         self.result_event_queue.put(event)
@@ -889,9 +893,10 @@
 
     def _doNodesProvisionedEvent(self, event):
         request = event.request
+        request_id = event.request_id
         build_set = request.build_set
 
-        self.nodepool.acceptNodes(request)
+        self.nodepool.acceptNodes(request, request_id)
         if request.canceled:
             return
 
diff --git a/zuul/zk.py b/zuul/zk.py
index a3efef2..2fca749 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -160,7 +160,6 @@
         def callback(data, stat):
             if data:
                 data = self._strToDict(data)
-                node_request.updateFromDict(data)
                 request_nodes = list(node_request.nodeset.getNodes())
                 for i, nodeid in enumerate(data.get('nodes', [])):
                     node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
@@ -168,6 +167,7 @@
                     node_data = self._strToDict(node_data)
                     request_nodes[i].id = nodeid
                     request_nodes[i].updateFromDict(node_data)
+                node_request.updateFromDict(data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)
 
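Reordering the callback so node_request.updateFromDict() runs after the
node objects are filled in means a state change (e.g. to fulfilled) is
never observable on a request whose node list is still stale. A
condensed sketch of the intended ordering, with stub objects in place
of the real model:

    class StubRequest(object):
        state = 'requested'
        nodes = []

    def on_update(request, data):
        # Populate node details first; only then update the request,
        # so anything keyed off request.state sees complete node data.
        request.nodes = list(data['nodes'])
        request.state = data['state']

    request = StubRequest()
    on_update(request, {'state': 'fulfilled', 'nodes': ['0000001']})
    assert request.state == 'fulfilled'
    assert request.nodes == ['0000001']
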
@@ -187,6 +187,19 @@
         except kze.NoNodeError:
             pass
 
+    def nodeRequestExists(self, node_request):
+        '''
+        See if a NodeRequest exists in ZooKeeper.
+
+        :param NodeRequest node_request: A NodeRequest to verify.
+
+        :returns: True if the request exists, False otherwise.
+        '''
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        if self.client.exists(path):
+            return True
+        return False
+
     def storeNode(self, node):
         '''Store the node.