Merge "Squelch ara initializing message from log" into feature/zuulv3
diff --git a/.zuul.yaml b/.zuul.yaml
index e2628cb..271dd02 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -7,9 +7,11 @@
             voting: false
         - tox-pep8
         - tox-py35
-        - tox-tarball
     gate:
       jobs:
         - tox-docs
         - tox-pep8
         - tox-py35
+    post:
+      jobs:
+        - publish-openstack-python-branch-tarball
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index aa6d8c8..2c70d47 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -311,10 +311,10 @@
 *trusted* execution context, otherwise, it is run in the *untrusted*
 execution context.
 
-Both execution contexts use `bubblewrap`_ to create a namespace to
-ensure that playbook executions are isolated and are unable to access
-files outside of a restricted environment.  The administrator may
-configure additional local directories on the executor to be made
+Both execution contexts use `bubblewrap`_ [#nullwrap]_ to create a
+namespace to ensure that playbook executions are isolated and are unable
+to access files outside of a restricted environment.  The administrator
+may configure additional local directories on the executor to be made
 available to the restricted environment.
 
 The trusted execution context has access to all Ansible features,
@@ -335,6 +335,8 @@
 protections are made as part of a defense-in-depth strategy.
 
 .. _bubblewrap: https://github.com/projectatomic/bubblewrap
+.. [#nullwrap] Unless one has set ``execution_wrapper`` to ``nullwrap``
+               in the executor configuration.
 
 Configuration
 ~~~~~~~~~~~~~
@@ -437,6 +439,25 @@
       List of paths, separated by ``:`` to read-write bind mount into
       untrusted bubblewrap contexts.
 
+   .. attr:: execution_wrapper
+      :default: bubblewrap
+
+      Name of the execution wrapper to use when executing
+      `ansible-playbook`. The default, `bubblewrap`, is recommended
+      for all installations.
+
+      There is also a `nullwrap` driver for situations where one wants
+      to run Zuul without access to bubblewrap or in such a way that
+      bubblewrap may interfere with the jobs themselves. However,
+      `nullwrap` is considered unsafe, as `bubblewrap` provides
+      significant protections against malicious users and accidental
+      breakage in playbooks. As such, `nullwrap` is not recommended
+      for use in production.
+
+      This option, and thus `nullwrap`, may be removed in the future.
+      `bubblewrap` has become integral to securely operating Zuul.  If you
+      have a valid use case for it, we encourage you to let us know.
+
 .. attr:: merger
 
    .. attr:: git_user_email
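
For reference, selecting the wrapper is a single line in the executor
section of zuul.conf; a minimal sketch, assuming the ini-style
configuration format Zuul uses (not shown in this diff):

    [executor]
    execution_wrapper = bubblewrap

Setting it to nullwrap disables the sandbox entirely, as described in the
documentation above.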
diff --git a/doc/source/admin/installation.rst b/doc/source/admin/installation.rst
index bc61f7e..ae7d571 100644
--- a/doc/source/admin/installation.rst
+++ b/doc/source/admin/installation.rst
@@ -30,7 +30,7 @@
 
 Gearman is a job distribution system that Zuul uses to communicate
 with its distributed components.  The Zuul scheduler distributes work
-to Zuul mergers and executors use Gearman.  You may supply your own
+to Zuul mergers and executors using Gearman.  You may supply your own
 gearman server, but the Zuul scheduler includes a built-in server
 which is recommended.  Ensure that all Zuul hosts can communicate with
 the gearman server.
@@ -56,7 +56,7 @@
 Nodepool uses Zookeeper to communicate internally among its
 components, and also to communicate with Zuul.  You can run a simple
 single-node Zookeeper instance, or a multi-node cluster.  Ensure that
-The host running the Zuul scheduler has access to the cluster.
+the host running the Zuul scheduler has access to the cluster.
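
A minimal sketch of the corresponding zuul.conf entries for Gearman and
Zookeeper (the section and option names are assumptions based on Zuul's
standard configuration layout, which this diff does not show):

    [gearman]
    server = 127.0.0.1

    [gearman_server]
    start = true

    [zookeeper]
    hosts = zk01.example.com:2181,zk02.example.com:2181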
 
 Ansible
 ~~~~~~~
diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 9c69960..4fed1f9 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -31,12 +31,12 @@
 Metrics
 ~~~~~~~
 
-The metrics are emitted by the Zuul :ref:`scheduler`:
+These metrics are emitted by the Zuul :ref:`scheduler`:
 
 .. stat:: gerrit.event.<type>
    :type: counter
 
-   Gerrit emits different kind of message over its `stream-events`
+   Gerrit emits different kinds of messages over its `stream-events`
    interface.  Zuul will report counters for each type of event it
    receives from Gerrit.
 
@@ -89,7 +89,7 @@
       .. stat:: total_changes
          :type: counter
 
-         The number of change processed by the pipeline since Zuul
+         The number of changes processed by the pipeline since Zuul
          started.
 
       .. stat:: wait_time
@@ -119,7 +119,7 @@
          .. stat:: total_changes
             :type: counter
 
-            The number of change for this project processed by the
+            The number of changes for this project processed by the
             pipeline since Zuul started.
 
 As an example, given a job named `myjob` triggered by the `gate` pipeline
diff --git a/doc/source/developer/docs.rst b/doc/source/developer/docs.rst
index 6a7256e..0a8bfe5 100644
--- a/doc/source/developer/docs.rst
+++ b/doc/source/developer/docs.rst
@@ -65,7 +65,7 @@
 entire hierarchy of the attribute, but emphasises the last portion
 (i.e., the field being documented).
 
-To use the hierarchical features, simply nest with indendtation in the
+To use the hierarchical features, simply nest with indentation in the
 normal RST manner.
 
 It supports the ``required`` and ``default`` options and will annotate
diff --git a/tests/base.py b/tests/base.py
index 028a8b1..357dd7a 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -481,6 +481,25 @@
         self.changes[self.change_number] = c
         return c
 
+    def getFakeBranchCreatedEvent(self, project, branch):
+        path = os.path.join(self.upstream_root, project)
+        repo = git.Repo(path)
+        oldrev = 40 * '0'
+
+        event = {
+            "type": "ref-updated",
+            "submitter": {
+                "name": "User Name",
+            },
+            "refUpdate": {
+                "oldRev": oldrev,
+                "newRev": repo.heads[branch].commit.hexsha,
+                "refName": branch,
+                "project": project,
+            }
+        }
+        return event
+
     def review(self, project, changeid, message, action):
         number, ps = changeid.split(',')
         change = self.changes[int(number)]
@@ -2792,6 +2811,41 @@
             files)
         return before
 
+    def newTenantConfig(self, source_name):
+        """ Use this to update the tenant config file in tests
+
+        This will update self.tenant_config_file to point to a temporary file
+        for the duration of this particular test. The content of that file will
+        be taken from FIXTURE_DIR/source_name
+
+        After the test the original value of self.tenant_config_file will be
+        restored.
+
+        :arg str source_name: The path of the file under
+            FIXTURE_DIR that will be used to populate the new tenant
+            config file.
+        """
+        source_path = os.path.join(FIXTURE_DIR, source_name)
+        orig_tenant_config_file = self.tenant_config_file
+        with tempfile.NamedTemporaryFile(
+            delete=False, mode='wb') as new_tenant_config:
+            self.tenant_config_file = new_tenant_config.name
+            with open(source_path, mode='rb') as source_tenant_config:
+                new_tenant_config.write(source_tenant_config.read())
+        self.config['scheduler']['tenant_config'] = self.tenant_config_file
+        self.setupAllProjectKeys()
+        self.log.debug(
+            'tenant_config_file = {}'.format(self.tenant_config_file))
+
+        def _restoreTenantConfig():
+            self.log.debug(
+                'restoring tenant_config_file = {}'.format(
+                    orig_tenant_config_file))
+            os.unlink(self.tenant_config_file)
+            self.tenant_config_file = orig_tenant_config_file
+            self.config['scheduler']['tenant_config'] = orig_tenant_config_file
+        self.addCleanup(_restoreTenantConfig)
+
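+        # A hedged usage sketch of the two new helpers in this file
+        # (mirroring the test_delayed_repo_init and test_v3 changes
+        # elsewhere in this diff):
+        #
+        #     self.newTenantConfig('tenants/delayed-repo-init.yaml')
+        #     self.sched.reconfigure(self.config)
+        #     self.waitUntilSettled()
+        #     self.fake_gerrit.addEvent(
+        #         self.fake_gerrit.getFakeBranchCreatedEvent(
+        #             'org/project', 'stable'))
+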
     def addEvent(self, connection, event):
 
         """Inject a Fake (Gerrit) event.
diff --git a/tests/fixtures/config/job-output/git/common-config/playbooks/job-output.yaml b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output.yaml
new file mode 100644
index 0000000..332db87
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/playbooks/job-output.yaml
@@ -0,0 +1,3 @@
+- hosts: all
+  tasks:
+    - shell: echo "Standard output test {{ zuul.executor.src_root }}"
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
new file mode 100644
index 0000000..a83f0bc
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -0,0 +1,27 @@
+- pipeline:
+    name: check
+    manager: independent
+    post-review: true
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+
+- job:
+    parent: base
+    name: job-output
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - job-output
diff --git a/tests/fixtures/config/job-output/git/org_project/README b/tests/fixtures/config/job-output/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/job-output/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/job-output/main.yaml b/tests/fixtures/config/job-output/main.yaml
new file mode 100644
index 0000000..208e274
--- /dev/null
+++ b/tests/fixtures/config/job-output/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
diff --git a/tests/fixtures/layout-delayed-repo-init.yaml b/tests/fixtures/layout-delayed-repo-init.yaml
deleted file mode 100644
index 04dc010..0000000
--- a/tests/fixtures/layout-delayed-repo-init.yaml
+++ /dev/null
@@ -1,52 +0,0 @@
-pipelines:
-  - name: check
-    manager: IndependentPipelineManager
-    trigger:
-      gerrit:
-        - event: patchset-created
-    success:
-      gerrit:
-        Verified: 1
-    failure:
-      gerrit:
-        Verified: -1
-
-  - name: post
-    manager: IndependentPipelineManager
-    trigger:
-      gerrit:
-        - event: ref-updated
-          ref: ^(?!refs/).*$
-
-  - name: gate
-    manager: DependentPipelineManager
-    failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
-    trigger:
-      gerrit:
-        - event: comment-added
-          approval:
-            - Approved: 1
-    success:
-      gerrit:
-        Verified: 2
-        submit: true
-    failure:
-      gerrit:
-        Verified: -2
-    start:
-      gerrit:
-        Verified: 0
-    precedence: high
-
-projects:
-  - name: org/new-project
-    check:
-      - project-merge:
-        - project-test1
-        - project-test2
-    gate:
-      - project-merge:
-        - project-test1
-        - project-test2
-    post:
-      - project-post
diff --git a/tests/fixtures/layouts/delayed-repo-init.yaml b/tests/fixtures/layouts/delayed-repo-init.yaml
new file mode 100644
index 0000000..e97d37a
--- /dev/null
+++ b/tests/fixtures/layouts/delayed-repo-init.yaml
@@ -0,0 +1,77 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- pipeline:
+    name: post
+    manager: independent
+    trigger:
+      gerrit:
+        - event: ref-updated
+          ref: ^(?!refs/).*$
+
+- pipeline:
+    name: gate
+    manager: dependent
+    failure-message: Build failed.  For information on how to proceed, see http://wiki.example.org/Test_Failures
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - Approved: 1
+    success:
+      gerrit:
+        Verified: 2
+        submit: true
+    failure:
+      gerrit:
+        Verified: -2
+    start:
+      gerrit:
+        Verified: 0
+    precedence: high
+
+- job:
+    name: base
+    parent: null
+
+- job:
+    name: project-merge
+
+- job:
+    name: project-test1
+
+- job:
+    name: project-test2
+
+- job:
+    name: project-post
+
+- project:
+    name: org/new-project
+    check:
+      jobs:
+        - project-merge
+        - project-test1:
+            dependencies: project-merge
+        - project-test2:
+            dependencies: project-merge
+    gate:
+      jobs:
+        - project-merge:
+        - project-test1:
+            dependencies: project-merge
+        - project-test2:
+            dependencies: project-merge
+    post:
+      jobs:
+        - project-post
diff --git a/tests/fixtures/tenants/delayed-repo-init.yaml b/tests/fixtures/tenants/delayed-repo-init.yaml
new file mode 100644
index 0000000..433e6f7
--- /dev/null
+++ b/tests/fixtures/tenants/delayed-repo-init.yaml
@@ -0,0 +1,11 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
+          - org/project1
+          - org/project2
+          - org/new-project
diff --git a/tests/unit/test_bubblewrap.py b/tests/unit/test_bubblewrap.py
index d94b3f2..661d868 100644
--- a/tests/unit/test_bubblewrap.py
+++ b/tests/unit/test_bubblewrap.py
@@ -32,14 +32,15 @@
 
     def test_bubblewrap_wraps(self):
         bwrap = bubblewrap.BubblewrapDriver()
+        context = bwrap.getExecutionContext()
         work_dir = tempfile.mkdtemp()
         ssh_agent = SshAgent()
         self.addCleanup(ssh_agent.stop)
         ssh_agent.start()
-        po = bwrap.getPopen(work_dir=work_dir,
-                            ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
-        self.assertTrue(po.passwd_r > 2)
-        self.assertTrue(po.group_r > 2)
+        po = context.getPopen(work_dir=work_dir,
+                              ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
+        self.assertTrue(po.fds[0] > 2)
+        self.assertTrue(po.fds[1] > 2)
         self.assertTrue(work_dir in po.command)
         # Now run /usr/bin/id to verify passwd/group entries made it in
         true_proc = po(['/usr/bin/id'], stdout=subprocess.PIPE,
@@ -50,19 +51,19 @@
         # And that it did not print things on stderr
         self.assertEqual(0, len(errs.strip()))
         # Make sure the _r's are closed
-        self.assertIsNone(po.passwd_r)
-        self.assertIsNone(po.group_r)
+        self.assertEqual([], po.fds)
 
     def test_bubblewrap_leak(self):
         bwrap = bubblewrap.BubblewrapDriver()
+        context = bwrap.getExecutionContext()
         work_dir = tempfile.mkdtemp()
         ansible_dir = tempfile.mkdtemp()
         ssh_agent = SshAgent()
         self.addCleanup(ssh_agent.stop)
         ssh_agent.start()
-        po = bwrap.getPopen(work_dir=work_dir,
-                            ansible_dir=ansible_dir,
-                            ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
+        po = context.getPopen(work_dir=work_dir,
+                              ansible_dir=ansible_dir,
+                              ssh_auth_sock=ssh_agent.env['SSH_AUTH_SOCK'])
         leak_time = 60
         # Use hexadecimal notation to avoid false-positive
         true_proc = po(['bash', '-c', 'sleep 0x%X & disown' % leak_time])
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index ce30e7c..6dd8333 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -202,7 +202,7 @@
             'name': 'python27',
             'parent': 'base',
             'pre-run': 'py27-pre',
-            'post-run': 'py27-post',
+            'post-run': ['py27-post-a', 'py27-post-b'],
             'nodes': [{
                 'name': 'controller',
                 'label': 'new',
@@ -275,7 +275,8 @@
                          ['base-pre',
                           'py27-pre'])
         self.assertEqual([x.path for x in job.post_run],
-                         ['py27-post',
+                         ['py27-post-a',
+                          'py27-post-b',
                           'base-post'])
         self.assertEqual([x.path for x in job.run],
                          ['playbooks/python27',
@@ -305,7 +306,8 @@
                           'py27-diablo-pre'])
         self.assertEqual([x.path for x in job.post_run],
                          ['py27-diablo-post',
-                          'py27-post',
+                          'py27-post-a',
+                          'py27-post-b',
                           'base-post'])
         self.assertEqual([x.path for x in job.run],
                          ['py27-diablo']),
@@ -330,7 +332,8 @@
                           'py27-essex-pre'])
         self.assertEqual([x.path for x in job.post_run],
                          ['py27-essex-post',
-                          'py27-post',
+                          'py27-post-a',
+                          'py27-post-b',
                           'base-post'])
         self.assertEqual([x.path for x in job.run],
                          ['playbooks/python27',
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 97d53e0..f33d964 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2759,13 +2759,18 @@
         self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
         self.assertIn('Build succeeded', A.messages[0])
 
-    @skip("Disabled for early v3 development")
     def test_delayed_repo_init(self):
-        self.updateConfigLayout(
-            'tests/fixtures/layout-delayed-repo-init.yaml')
-        self.sched.reconfigure(self.config)
-
         self.init_repo("org/new-project")
+        files = {'README': ''}
+        self.addCommitToRepo("org/new-project", 'Initial commit',
+                             files=files, tag='init')
+        self.newTenantConfig('tenants/delayed-repo-init.yaml')
+        self.commitConfigUpdate(
+            'common-config',
+            'layouts/delayed-repo-init.yaml')
+        self.sched.reconfigure(self.config)
+        self.waitUntilSettled()
+
         A = self.fake_gerrit.addFakeChange('org/new-project', 'master', 'A')
 
         A.addApproval('Code-Review', 2)
@@ -3836,19 +3841,23 @@
         self.create_branch('org/project2', 'mp')
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
-        C = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C')
-        C.data['id'] = B.data['id']
+        C1 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C1')
+        C2 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C2',
+                                            status='ABANDONED')
+        C1.data['id'] = B.data['id']
+        C2.data['id'] = B.data['id']
+
         A.addApproval('Code-Review', 2)
         B.addApproval('Code-Review', 2)
-        C.addApproval('Code-Review', 2)
+        C1.addApproval('Code-Review', 2)
 
-        # A Depends-On: B+C
+        # A Depends-On: B+C1
         A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
             A.subject, B.data['id'])
 
         self.executor_server.hold_jobs_in_build = True
         B.addApproval('Approved', 1)
-        C.addApproval('Approved', 1)
+        C1.addApproval('Approved', 1)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
         self.waitUntilSettled()
 
@@ -3864,10 +3873,10 @@
 
         self.assertEqual(A.data['status'], 'MERGED')
         self.assertEqual(B.data['status'], 'MERGED')
-        self.assertEqual(C.data['status'], 'MERGED')
+        self.assertEqual(C1.data['status'], 'MERGED')
         self.assertEqual(A.reported, 2)
         self.assertEqual(B.reported, 2)
-        self.assertEqual(C.reported, 2)
+        self.assertEqual(C1.reported, 2)
 
         changes = self.getJobFromHistory(
             'project-merge', 'org/project1').changes
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index adb6bed..60a0986 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -14,6 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import json
 import os
 import textwrap
 
@@ -444,6 +445,9 @@
         file_dict = {'.zuul.yaml': in_repo_conf,
                      'playbooks/project-test2.yaml': in_repo_playbook}
         self.create_branch('org/project', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable'))
         A = self.fake_gerrit.addFakeChange('org/project', 'stable', 'A',
                                            files=file_dict)
         A.addApproval('Code-Review', 2)
@@ -484,6 +488,9 @@
         # it from a different branch on a different repo.
 
         self.create_branch('org/project1', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project1', 'stable'))
 
         in_repo_conf = textwrap.dedent(
             """
@@ -1286,8 +1293,7 @@
         ], ordered=False)
         matches = self.searchForContent(self.history[0].jobdir.root,
                                         b'test-password')
-        self.assertEqual(set(['/ansible/playbook_0/secrets.yaml',
-                              '/work/secret-file.txt']),
+        self.assertEqual(set(['/work/secret-file.txt']),
                          set(matches))
 
     def test_secret_file(self):
@@ -1322,8 +1328,7 @@
         ], ordered=False)
         matches = self.searchForContent(self.history[0].jobdir.root,
                                         b'test-password')
-        self.assertEqual(set(['/ansible/playbook_0/secrets.yaml',
-                              '/work/failure-file.txt']),
+        self.assertEqual(set(['/work/failure-file.txt']),
                          set(matches))
 
     def test_secret_file_fail(self):
@@ -1334,3 +1339,39 @@
         # paths.
         self.executor_server.verbose = True
         self._test_secret_file_fail()
+
+
+class TestJobOutput(AnsibleZuulTestCase):
+    tenant_config_file = 'config/job-output/main.yaml'
+
+    def _get_file(self, build, path):
+        p = os.path.join(build.jobdir.root, path)
+        with open(p) as f:
+            return f.read()
+
+    def test_job_output(self):
+        # Verify that command standard output appears in the job output
+
+        # This currently only verifies we receive output from
+        # localhost.  Notably, it does not verify we receive output
+        # via zuul_console streaming.
+        self.executor_server.keep_jobdir = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='job-output', result='SUCCESS', changes='1,1'),
+        ], ordered=False)
+
+        token = 'Standard output test %s' % (self.history[0].jobdir.src_root)
+        j = json.loads(self._get_file(self.history[0],
+                                      'work/logs/job-output.json'))
+        self.assertEqual(token,
+                         j[0]['plays'][0]['tasks'][0]
+                         ['hosts']['localhost']['stdout'])
+
+        print(self._get_file(self.history[0],
+                             'work/logs/job-output.txt'))
+        self.assertIn(token,
+                      self._get_file(self.history[0],
+                                     'work/logs/job-output.txt'))
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 1a006bc..4cfd19e 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -259,8 +259,12 @@
                 is_localhost = True
         else:
             task_hostvars = result._task._variable_manager._hostvars[task_host]
+            # Normally hosts in the inventory will have ansible_host
+            # or ansible_inventory_host defined.  The implied
+            # inventory record for 'localhost' will have neither, so
+            # default to 'localhost' if neither is supplied.
             if task_hostvars.get('ansible_host', task_hostvars.get(
-                    'ansible_inventory_host')) in localhost_names:
+                    'ansible_inventory_host', 'localhost')) in localhost_names:
                 is_localhost = True
 
         if not is_localhost and is_task:
@@ -487,18 +491,12 @@
     def _get_task_hosts(self, task):
         # If this task has as delegate to, we don't care about the play hosts,
         # we care about the task's delegate target.
-        delegate_to = task.delegate_to
-        if delegate_to:
-            return [delegate_to]
-        hosts = self._play.hosts
-        if 'all' in hosts:
-            # NOTE(jamielennox): play.hosts is purely the list of hosts
-            # that was provided not interpretted by inventory. We don't
-            # have inventory access here but we can assume that 'all' is
-            # everything in hostvars.
-            play_vars = self._play._variable_manager._hostvars
-            hosts = play_vars.keys()
-        return hosts
+        if task.delegate_to:
+            return [task.delegate_to]
+
+        # _restriction returns the parsed/compiled list of hosts after
+        # applying subsets/limits
+        return self._play._variable_manager._inventory._restriction
 
     def _dump_result_dict(self, result_dict):
         result_dict = result_dict.copy()
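
The localhost fallback added above is a nested dict.get with a default;
a standalone illustration (not Zuul code):

    task_hostvars = {}  # implied inventory entry: no ansible_host keys
    host = task_hostvars.get(
        'ansible_host',
        task_hostvars.get('ansible_inventory_host', 'localhost'))
    assert host == 'localhost'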
diff --git a/zuul/ansible/library/zuul_afs.py b/zuul/ansible/library/zuul_afs.py
deleted file mode 100644
index 710c15d..0000000
--- a/zuul/ansible/library/zuul_afs.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2016 Red Hat
-#
-# This module is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This software is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this software.  If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import subprocess
-
-
-def afs_sync(afsuser, afskeytab, afsroot, afssource, afstarget):
-    # Find the list of root markers in the just-completed build
-    # (usually there will only be one, but some builds produce content
-    # at the root *and* at a tag location, or possibly at multiple
-    # translation roots).
-    src_root_markers = []
-    for root, dirnames, filenames in os.walk(afssource):
-        if '.root-marker' in filenames:
-            src_root_markers.append(root)
-
-    output_blocks = []
-    # Synchronize the content at each root marker.
-    for root_count, src_root in enumerate(src_root_markers):
-        # The component of the path between the source root and the
-        # current source root marker.  May be '.' if there is a marker
-        # at the root.
-        subpath = os.path.relpath(src_root, afssource)
-
-        # Add to our debugging output
-        output = dict(subpath=subpath)
-        output_blocks.append(output)
-
-        # The absolute path to the source (in staging) and destination
-        # (in afs) of the build root for the current root marker.
-        subsource = os.path.abspath(os.path.join(afssource, subpath))
-        subtarget = os.path.abspath(os.path.join(afstarget, subpath))
-
-        # Create a filter list for rsync so that we copy exactly the
-        # directories we want to without deleting any existing
-        # directories in the published site that were placed there by
-        # previous builds.
-
-        # Exclude any directories under this subpath which have root
-        # markers.
-        excludes = []
-        for root, dirnames, filenames in os.walk(subtarget):
-            if '.root-marker' in filenames:
-                exclude_subpath = os.path.relpath(root, subtarget)
-                if exclude_subpath == '.':
-                    continue
-                excludes.append(os.path.join('/', exclude_subpath))
-        output['excludes'] = excludes
-
-        filter_file = os.path.join(afsroot, 'filter_%i' % root_count)
-
-        with open(filter_file, 'w') as f:
-            for exclude in excludes:
-                f.write('- %s\n' % exclude)
-
-        # Perform the rsync with the filter list.
-        rsync_cmd = ' '.join([
-            '/usr/bin/rsync', '-rtp', '--safe-links', '--delete-after',
-            "--out-format='<<CHANGED>>%i %n%L'",
-            "--filter='merge {filter}'", '{src}/', '{dst}/',
-        ])
-        mkdir_cmd = ' '.join(['mkdir', '-p', '{dst}/'])
-        bash_cmd = ' '.join([
-            '/bin/bash', '-c', '"{mkdir_cmd} && {rsync_cmd}"'
-        ]).format(
-            mkdir_cmd=mkdir_cmd,
-            rsync_cmd=rsync_cmd)
-
-        k5start_cmd = ' '.join([
-            '/usr/bin/k5start', '-t', '-f', '{keytab}', '{user}', '--',
-            bash_cmd,
-        ])
-
-        shell_cmd = k5start_cmd.format(
-            src=subsource,
-            dst=subtarget,
-            filter=filter_file,
-            user=afsuser,
-            keytab=afskeytab),
-        output['source'] = subsource
-        output['destination'] = subtarget
-        output['output'] = subprocess.check_output(shell_cmd, shell=True)
-
-    return output_blocks
-
-
-def main():
-    module = AnsibleModule(
-        argument_spec=dict(
-            user=dict(required=True, type='raw'),
-            keytab=dict(required=True, type='raw'),
-            root=dict(required=True, type='raw'),
-            source=dict(required=True, type='raw'),
-            target=dict(required=True, type='raw'),
-        )
-    )
-
-    p = module.params
-    output = afs_sync(p['user'], p['keytab'], p['root'],
-                      p['source'], p['target'])
-    module.exit_json(changed=True, build_roots=output)
-
-from ansible.module_utils.basic import *  # noqa
-from ansible.module_utils.basic import AnsibleModule
-
-if __name__ == '__main__':
-    main()
diff --git a/zuul/configloader.py b/zuul/configloader.py
index aead0d8..c925024 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -507,7 +507,10 @@
                                             pre_run_name, job.roles,
                                             secrets)
             job.pre_run = job.pre_run + (pre_run,)
-        for post_run_name in as_list(conf.get('post-run')):
+        # NOTE(pabelanger): Reverse the order of our post-run list. We prepend
+        # post-runs when inheriting; however, we want to execute post-runs in
+        # the order they are listed within the job.
+        for post_run_name in reversed(as_list(conf.get('post-run'))):
             post_run = model.PlaybookContext(job.source_context,
                                              post_run_name, job.roles,
                                              secrets)
@@ -1402,7 +1405,17 @@
                     tenant, config_semaphore)
                 if 'semaphore' not in classes:
                     continue
-                layout.addSemaphore(SemaphoreParser.fromYaml(config_semaphore))
+                semaphore = SemaphoreParser.fromYaml(config_semaphore)
+                old_semaphore = layout.semaphores.get(semaphore.name)
+                if (old_semaphore and
+                    (old_semaphore.source_context.project ==
+                     semaphore.source_context.project)):
+                    # If a semaphore shows up twice in the same
+                    # project, it's probably due to showing up in
+                    # two branches.  Ignore subsequent
+                    # definitions.
+                    continue
+                layout.addSemaphore(semaphore)
 
         for config_template in data.project_templates:
             classes = TenantParser._getLoadClasses(tenant, config_template)
@@ -1494,7 +1507,9 @@
         if trusted:
             branches = ['master']
         else:
-            branches = project.source.getProjectBranches(project, tenant)
+            # Use the cached branch list; since this is a dynamic
+            # reconfiguration there should not be any branch changes.
+            branches = project.unparsed_branch_config.keys()
 
         for branch in branches:
             fns1 = []
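
A standalone sketch of why the post-run reversal in the first hunk above
yields in-job order, assuming (as the code following that hunk does) that
each post-run playbook is prepended to the inherited tuple:

    post_run = ('base-post',)            # inherited from the parent job
    for name in reversed(['py27-post-a', 'py27-post-b']):
        post_run = (name,) + post_run    # prepend one at a time
    assert post_run == ('py27-post-a', 'py27-post-b', 'base-post')

This matches the ordering asserted in tests/unit/test_model.py above.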
diff --git a/zuul/driver/__init__.py b/zuul/driver/__init__.py
index 6ac9197..682b251 100644
--- a/zuul/driver/__init__.py
+++ b/zuul/driver/__init__.py
@@ -258,25 +258,22 @@
     """
 
     @abc.abstractmethod
-    def getPopen(self, **kwargs):
-        """Create and return a subprocess.Popen factory wrapped however the
-        driver sees fit.
+    def getExecutionContext(self, ro_paths=None, rw_paths=None, secrets=None):
+        """Create and return an execution context.
+
+        The execution context is meant to be used for a single
+        invocation of a command.
 
         This method is required by the interface
 
-        :arg dict kwargs: key/values for use by driver as needed
-
-        :returns: a callable that takes the same args as subprocess.Popen
-        :rtype: Callable
-        """
-        pass
-
-    @abc.abstractmethod
-    def setMountsMap(self, state_dir, ro_paths=None, rw_paths=None):
-        """Add additional mount point to the execution environment.
-
-        :arg str state_dir: the state directory to be read write
         :arg list ro_paths: read only files or directories to bind mount
         :arg list rw_paths: read write files or directories to bind mount
+        :arg dict secrets: a dictionary where the key is a file path,
+             and the value is the content which should be written to
+             that path in a secure manner.
+
+        :returns: a new ExecutionContext object.
+        :rtype: BaseExecutionContext
+
         """
         pass
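
A hedged usage sketch of the new two-step interface; the paths and
secrets are illustrative, and the getPopen keyword arguments follow the
bubblewrap tests in this change:

    import os
    from zuul.driver.bubblewrap import BubblewrapDriver

    wrapper = BubblewrapDriver()
    context = wrapper.getExecutionContext(
        ro_paths=['/etc/zuul'],
        rw_paths=['/var/lib/zuul/builds'],
        secrets={'/secrets/site.yaml': 'password: example\n'})
    popen = context.getPopen(
        work_dir='/var/tmp/work',
        ssh_auth_sock=os.environ.get('SSH_AUTH_SOCK'))
    proc = popen(['/usr/bin/id'])
    proc.wait()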
diff --git a/zuul/driver/bubblewrap/__init__.py b/zuul/driver/bubblewrap/__init__.py
index cbaa609..a8969cc 100644
--- a/zuul/driver/bubblewrap/__init__.py
+++ b/zuul/driver/bubblewrap/__init__.py
@@ -22,18 +22,19 @@
 import shlex
 import subprocess
 import sys
+import threading
 import re
 
 from typing import Dict, List  # flake8: noqa
 
 from zuul.driver import (Driver, WrapperInterface)
+from zuul.execution_context import BaseExecutionContext
 
 
 class WrappedPopen(object):
-    def __init__(self, command, passwd_r, group_r):
+    def __init__(self, command, fds):
         self.command = command
-        self.passwd_r = passwd_r
-        self.group_r = group_r
+        self.fds = fds
 
     def __call__(self, args, *sub_args, **kwargs):
         try:
@@ -45,7 +46,7 @@
                 # versions. So until we are py3 only we can only bwrap
                 # things that are close_fds=False
                 pass_fds = list(kwargs.get('pass_fds', []))
-                for fd in (self.passwd_r, self.group_r):
+                for fd in self.fds:
                     if fd not in pass_fds:
                         pass_fds.append(fd)
                 kwargs['pass_fds'] = pass_fds
@@ -55,42 +56,32 @@
         return proc
 
     def __del__(self):
-        if self.passwd_r:
+        for fd in self.fds:
             try:
-                os.close(self.passwd_r)
+                os.close(fd)
             except OSError:
                 pass
-            self.passwd_r = None
-        if self.group_r:
-            try:
-                os.close(self.group_r)
-            except OSError:
-                pass
-            self.group_r = None
+        self.fds = []
 
 
-class BubblewrapDriver(Driver, WrapperInterface):
-    name = 'bubblewrap'
-    log = logging.getLogger("zuul.BubblewrapDriver")
+class BubblewrapExecutionContext(BaseExecutionContext):
+    log = logging.getLogger("zuul.BubblewrapExecutionContext")
 
-    mounts_map = {'rw': [], 'ro': []}  # type: Dict[str, List]
-    release_file_re = re.compile('^\W+-release$')
-
-    def __init__(self):
-        self.bwrap_command = self._bwrap_command()
-
-    def reconfigure(self, tenant):
-        pass
-
-    def stop(self):
-        pass
-
-    def setMountsMap(self, ro_paths=None, rw_paths=None):
-        if not ro_paths:
-            ro_paths = []
-        if not rw_paths:
-            rw_paths = []
+    def __init__(self, bwrap_command, ro_paths, rw_paths, secrets):
+        self.bwrap_command = bwrap_command
         self.mounts_map = {'ro': ro_paths, 'rw': rw_paths}
+        self.secrets = secrets
+
+    def startPipeWriter(self, pipe, data):
+        # In case we have a large amount of data to write through a
+        # pipe, spawn a thread to handle the writes.
+        t = threading.Thread(target=self._writer, args=(pipe, data))
+        t.daemon = True
+        t.start()
+
+    def _writer(self, pipe, data):
+        os.write(pipe, data)
+        os.close(pipe)
 
     def getPopen(self, **kwargs):
         # Set zuul_dir if it was not passed in
@@ -110,6 +101,9 @@
             for bind in self.mounts_map[mount_type]:
                 bwrap_command.extend([bind_arg, bind, bind])
 
+        # A list of file descriptors which must be held open so that
+        # bwrap may read from them.
+        read_fds = []
         # Need users and groups
         uid = os.getuid()
         passwd = list(pwd.getpwuid(uid))
@@ -121,6 +115,7 @@
         os.write(passwd_w, passwd_bytes)
         os.write(passwd_w, b'\n')
         os.close(passwd_w)
+        read_fds.append(passwd_r)
 
         gid = os.getgid()
         group = grp.getgrgid(gid)
@@ -130,6 +125,20 @@
         os.write(group_w, group_bytes)
         os.write(group_w, b'\n')
         os.close(group_w)
+        read_fds.append(group_r)
+
+        # Create a tmpfs for each directory which holds secrets, and
+        # tell bubblewrap to write the contents to a file therein.
+        secret_dirs = set()
+        for fn, content in self.secrets.items():
+            secret_dir = os.path.dirname(fn)
+            if secret_dir not in secret_dirs:
+                bwrap_command.extend(['--tmpfs', secret_dir])
+                secret_dirs.add(secret_dir)
+            secret_r, secret_w = os.pipe()
+            self.startPipeWriter(secret_w, content.encode('utf8'))
+            bwrap_command.extend(['--file', str(secret_r), fn])
+            read_fds.append(secret_r)
 
         kwargs = dict(kwargs)  # Don't update passed in dict
         kwargs['uid'] = uid
@@ -141,10 +150,26 @@
         self.log.debug("Bubblewrap command: %s",
                        " ".join(shlex.quote(c) for c in command))
 
-        wrapped_popen = WrappedPopen(command, passwd_r, group_r)
+        wrapped_popen = WrappedPopen(command, read_fds)
 
         return wrapped_popen
 
+
+class BubblewrapDriver(Driver, WrapperInterface):
+    log = logging.getLogger("zuul.BubblewrapDriver")
+    name = 'bubblewrap'
+
+    release_file_re = re.compile('^\W+-release$')
+
+    def __init__(self):
+        self.bwrap_command = self._bwrap_command()
+
+    def reconfigure(self, tenant):
+        pass
+
+    def stop(self):
+        pass
+
     def _bwrap_command(self):
         bwrap_command = [
             'bwrap',
@@ -185,6 +210,18 @@
 
         return bwrap_command
 
+    def getExecutionContext(self, ro_paths=None, rw_paths=None, secrets=None):
+        if not ro_paths:
+            ro_paths = []
+        if not rw_paths:
+            rw_paths = []
+        if not secrets:
+            secrets = {}
+        return BubblewrapExecutionContext(
+            self.bwrap_command,
+            ro_paths, rw_paths,
+            secrets)
+
 
 def main(args=None):
     logging.basicConfig(level=logging.DEBUG)
@@ -192,18 +229,27 @@
     driver = BubblewrapDriver()
 
     parser = argparse.ArgumentParser()
-    parser.add_argument('--ro-bind', nargs='+')
-    parser.add_argument('--rw-bind', nargs='+')
+    parser.add_argument('--ro-paths', nargs='+')
+    parser.add_argument('--rw-paths', nargs='+')
+    parser.add_argument('--secret', nargs='+')
     parser.add_argument('work_dir')
     parser.add_argument('run_args', nargs='+')
     cli_args = parser.parse_args()
 
     ssh_auth_sock = os.environ.get('SSH_AUTH_SOCK')
 
-    driver.setMountsMap(cli_args.ro_bind, cli_args.rw_bind)
+    secrets = {}
+    if cli_args.secret:
+        for secret in cli_args.secret:
+            fn, content = secret.split('=', 1)
+            secrets[fn] = content
 
-    popen = driver.getPopen(work_dir=cli_args.work_dir,
-                            ssh_auth_sock=ssh_auth_sock)
+    context = driver.getExecutionContext(
+        cli_args.ro_paths, cli_args.rw_paths,
+        secrets)
+
+    popen = context.getPopen(work_dir=cli_args.work_dir,
+                             ssh_auth_sock=ssh_auth_sock)
     x = popen(cli_args.run_args)
     x.wait()
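
With the argument changes above, an invocation of this main() might look
as follows (a sketch; `zuul-bwrap` is a hypothetical entry point, since
how the module is exposed as a script is not shown in this diff):

    zuul-bwrap --ro-paths /etc \
        --secret /tmp/secrets/site.yaml='password: example' \
        /var/tmp/workdir /usr/bin/id

Each --secret value is split on the first '=' into a target path and the
content written to that path inside the sandbox.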
 
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index de72c69..35137c7 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -122,6 +122,17 @@
                              "from Gerrit. Can not get account information." %
                              (event.type,))
 
+        # This checks whether the event created or deleted a branch so
+        # that Zuul may know to perform a reconfiguration on the
+        # project.
+        if event.type == 'ref-updated' and not event.ref.startswith('refs/'):
+            if event.oldrev == '0' * 40:
+                event.branch_created = True
+                event.branch = event.ref
+            if event.newrev == '0' * 40:
+                event.branch_deleted = True
+                event.branch = event.ref
+
         if event.change_number:
             # TODO(jhesketh): Check if the project exists?
             # and self.connection.sched.getProject(event.project_name):
@@ -219,13 +230,16 @@
                     stdout.channel.close()
             ret = stdout.channel.recv_exit_status()
             self.log.debug("SSH exit status: %s" % ret)
-            client.close()
 
             if ret and ret not in [-1, 130]:
                 raise Exception("Gerrit error executing stream-events")
         except:
             self.log.exception("Exception on ssh event stream:")
             time.sleep(5)
+        finally:
+            # If we don't close the connection on exceptions from
+            # connect, we can leak connections and DoS Gerrit.
+            client.close()
 
     def run(self):
         while not self._stopped:
@@ -456,6 +470,9 @@
             self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
                            (change, dep_num, dep_ps))
             dep = self._getChange(dep_num, dep_ps, history=history)
+            # This is a git commit dependency, so we only ignore it if it is
+            # already merged. Thus, even if it is "ABANDONED", we should not
+            # ignore it.
             if (not dep.is_merged) and dep not in needs_changes:
                 needs_changes.append(dep)
 
@@ -467,7 +484,7 @@
                            "change %s,%s" %
                            (change, dep_num, dep_ps))
             dep = self._getChange(dep_num, dep_ps, history=history)
-            if (not dep.is_merged) and dep not in needs_changes:
+            if dep.open and dep not in needs_changes:
                 needs_changes.append(dep)
         change.needs_changes = needs_changes
 
@@ -479,7 +496,7 @@
                 self.log.debug("Updating %s: Getting git-needed change %s,%s" %
                                (change, dep_num, dep_ps))
                 dep = self._getChange(dep_num, dep_ps, history=history)
-                if (not dep.is_merged) and dep.is_current_patchset:
+                if dep.open and dep.is_current_patchset:
                     needed_by_changes.append(dep)
 
         for record in self._getNeededByFromCommit(data['id'], change):
@@ -495,7 +512,7 @@
             refresh = (dep_num, dep_ps) not in history
             dep = self._getChange(
                 dep_num, dep_ps, refresh=refresh, history=history)
-            if (not dep.is_merged) and dep.is_current_patchset:
+            if dep.open and dep.is_current_patchset:
                 needed_by_changes.append(dep)
         change.needed_by_changes = needed_by_changes
 
@@ -720,16 +737,25 @@
         return out
 
     def _open(self):
-        client = paramiko.SSHClient()
-        client.load_system_host_keys()
-        client.set_missing_host_key_policy(paramiko.WarningPolicy())
-        client.connect(self.server,
-                       username=self.user,
-                       port=self.port,
-                       key_filename=self.keyfile)
-        transport = client.get_transport()
-        transport.set_keepalive(self.keepalive)
-        self.client = client
+        if self.client:
+            # Paramiko needs explicit closes; it's possible we will open even
+            # with an unclosed client, so explicitly close here.
+            self.client.close()
+        try:
+            client = paramiko.SSHClient()
+            client.load_system_host_keys()
+            client.set_missing_host_key_policy(paramiko.WarningPolicy())
+            client.connect(self.server,
+                           username=self.user,
+                           port=self.port,
+                           key_filename=self.keyfile)
+            transport = client.get_transport()
+            transport.set_keepalive(self.keepalive)
+            self.client = client
+        except Exception:
+            client.close()
+            self.client = None
+            raise
 
     def _ssh(self, command, stdin_data=None):
         if not self.client:
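
A hedged summary of the dependency filtering changes above, written as a
standalone predicate (not Zuul code):

    def is_blocking(dep, via_git_parent):
        # Git parent commits block until merged: even an ABANDONED
        # parent keeps blocking, since the change cannot merge without
        # it.  Depends-On and needed-by relations only count open
        # changes, so abandoned alternatives (like C2 in the updated
        # scheduler test) are ignored.
        return (not dep.is_merged) if via_git_parent else dep.open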
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 616e774..f31df6a 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -163,6 +163,14 @@
             # necessary for the scheduler to match against particular branches
             event.branch = ref_parts[2]
 
+        # This checks whether the event created or deleted a branch so
+        # that Zuul may know to perform a reconfiguration on the
+        # project.
+        if event.oldrev == '0' * 40:
+            event.branch_created = True
+        if event.newrev == '0' * 40:
+            event.branch_deleted = True
+
         return event
 
     def _event_pull_request(self, body):
@@ -982,5 +990,5 @@
 
 
 def getSchema():
-    github_connection = v.Any(str, v.Schema({}, extra=True))
+    github_connection = v.Any(str, v.Schema(dict))
     return github_connection
diff --git a/zuul/driver/nullwrap/__init__.py b/zuul/driver/nullwrap/__init__.py
index 50fea27..178e4c7 100644
--- a/zuul/driver/nullwrap/__init__.py
+++ b/zuul/driver/nullwrap/__init__.py
@@ -18,14 +18,30 @@
 import subprocess
 
 from zuul.driver import (Driver, WrapperInterface)
+from zuul.execution_context import BaseExecutionContext
+
+
+class NullExecutionContext(BaseExecutionContext):
+    log = logging.getLogger("zuul.NullExecutionContext")
+
+    def getPopen(self, **kwargs):
+        return subprocess.Popen
 
 
 class NullwrapDriver(Driver, WrapperInterface):
     name = 'nullwrap'
     log = logging.getLogger("zuul.NullwrapDriver")
 
-    def getPopen(self, **kwargs):
-        return subprocess.Popen
-
-    def setMountsMap(self, **kwargs):
-        pass
+    def getExecutionContext(self, ro_paths=None, rw_paths=None, secrets=None):
+        # The bubblewrap driver writes secrets to a tmpfs so that they
+        # don't hit the disk (unless the kernel swaps the memory to
+        # disk, which can be mitigated with encrypted swap).  We
+        # haven't implemented similar functionality in nullwrap, so
+        # for safety, raise an exception in that case.  If you are
+        # interested in implementing this functionality, please
+        # contact us on the mailing list.
+        if secrets:
+            raise NotImplementedError(
+                "The nullwrap driver does not support the use of secrets. "
+                "Consider using the bubblewrap driver instead.")
+        return NullExecutionContext()
diff --git a/zuul/execution_context/__init__.py b/zuul/execution_context/__init__.py
new file mode 100644
index 0000000..29b2912
--- /dev/null
+++ b/zuul/execution_context/__init__.py
@@ -0,0 +1,40 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+
+class BaseExecutionContext(object, metaclass=abc.ABCMeta):
+    """The execution interface returned by a wrapper.
+
+    Wrapper drivers return instances which implement this interface.
+
+    It is used to hold information and aid in the execution of a
+    single command.
+
+    """
+
+    @abc.abstractmethod
+    def getPopen(self, **kwargs):
+        """Create and return a subprocess.Popen factory wrapped however the
+        driver sees fit.
+
+        This method is required by the interface
+
+        :arg dict kwargs: key/values for use by driver as needed
+
+        :returns: a callable that takes the same args as subprocess.Popen
+        :rtype: Callable
+        """
+        pass
diff --git a/zuul/executor/ansiblelaunchserver.py b/zuul/executor/ansiblelaunchserver.py
deleted file mode 100644
index 18762b2..0000000
--- a/zuul/executor/ansiblelaunchserver.py
+++ /dev/null
@@ -1,1580 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-############################################################################
-# NOTE(jhesketh): This file has been superceeded by zuul/launcher/server.py.
-# It is kept here to make merging master back into v3 easier. Once closer
-# to completion it can be removed.
-############################################################################
-
-
-import json
-import logging
-import os
-import re
-import shutil
-import signal
-import socket
-import subprocess
-import tempfile
-import threading
-import time
-import traceback
-import uuid
-import Queue
-
-import gear
-import jenkins_jobs.builder
-import jenkins_jobs.formatter
-import zmq
-
-import zuul.ansible.library
-from zuul.lib import commandsocket
-from zuul.lib import yamlutil as yaml
-
-ANSIBLE_WATCHDOG_GRACE = 5 * 60
-ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
-ANSIBLE_DEFAULT_PRE_TIMEOUT = 10 * 60
-ANSIBLE_DEFAULT_POST_TIMEOUT = 30 * 60
-
-
-COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
-            'verbose', 'unverbose']
-
-
-def boolify(x):
-    if isinstance(x, str):
-        return bool(int(x))
-    return bool(x)
-
-
-class LaunchGearWorker(gear.TextWorker):
-    def __init__(self, *args, **kw):
-        self.__launch_server = kw.pop('launch_server')
-        super(LaunchGearWorker, self).__init__(*args, **kw)
-
-    def handleNoop(self, packet):
-        workers = len(self.__launch_server.node_workers)
-        delay = (workers ** 2) / 1000.0
-        time.sleep(delay)
-        return super(LaunchGearWorker, self).handleNoop(packet)
-
-
-class NodeGearWorker(gear.TextWorker):
-    MASS_DO = 101
-
-    def sendMassDo(self, functions):
-        names = [gear.convert_to_bytes(x) for x in functions]
-        data = b'\x00'.join(names)
-        new_function_dict = {}
-        for name in names:
-            new_function_dict[name] = gear.FunctionRecord(name)
-        self.broadcast_lock.acquire()
-        try:
-            p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
-            self.broadcast(p)
-            self.functions = new_function_dict
-        finally:
-            self.broadcast_lock.release()
-
-
-class Watchdog(object):
-    def __init__(self, timeout, function, args):
-        self.timeout = timeout
-        self.function = function
-        self.args = args
-        self.thread = threading.Thread(target=self._run)
-        self.thread.daemon = True
-
-    def _run(self):
-        while self._running and time.time() < self.end:
-            time.sleep(10)
-        if self._running:
-            self.function(*self.args)
-
-    def start(self):
-        self._running = True
-        self.end = time.time() + self.timeout
-        self.thread.start()
-
-    def stop(self):
-        self._running = False
-
-
-class JobDir(object):
-    def __init__(self, keep=False):
-        self.keep = keep
-        self.root = tempfile.mkdtemp()
-        self.ansible_root = os.path.join(self.root, 'ansible')
-        os.makedirs(self.ansible_root)
-        self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
-        self.inventory = os.path.join(self.ansible_root, 'inventory')
-        self.vars = os.path.join(self.ansible_root, 'vars.yaml')
-        self.pre_playbook = os.path.join(self.ansible_root, 'pre_playbook')
-        self.playbook = os.path.join(self.ansible_root, 'playbook')
-        self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
-        self.config = os.path.join(self.ansible_root, 'ansible.cfg')
-        self.pre_post_config = os.path.join(self.ansible_root,
-                                            'ansible_pre_post.cfg')
-        self.script_root = os.path.join(self.ansible_root, 'scripts')
-        self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
-        os.makedirs(self.script_root)
-        self.staging_root = os.path.join(self.root, 'staging')
-        os.makedirs(self.staging_root)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, etype, value, tb):
-        if not self.keep:
-            shutil.rmtree(self.root)
-
-
-class LaunchServer(object):
-    log = logging.getLogger("zuul.LaunchServer")
-    site_section_re = re.compile('site "(.*?)"')
-    node_section_re = re.compile('node "(.*?)"')
-
-    def __init__(self, config, keep_jobdir=False):
-        self.config = config
-        self.options = dict(
-            verbose=False
-        )
-        self.keep_jobdir = keep_jobdir
-        self.hostname = socket.gethostname()
-        self.registered_functions = set()
-        self.node_workers = {}
-        self.jobs = {}
-        self.builds = {}
-        self.zmq_send_queue = Queue.Queue()
-        self.termination_queue = Queue.Queue()
-        self.sites = {}
-        self.static_nodes = {}
-        self.command_map = dict(
-            reconfigure=self.reconfigure,
-            stop=self.stop,
-            pause=self.pause,
-            unpause=self.unpause,
-            release=self.release,
-            graceful=self.graceful,
-            verbose=self.verboseOn,
-            unverbose=self.verboseOff,
-        )
-
-        if config.has_option('launcher', 'accept_nodes'):
-            self.accept_nodes = config.getboolean('launcher',
-                                                  'accept_nodes')
-        else:
-            self.accept_nodes = True
-        self.config_accept_nodes = self.accept_nodes
-
-        if self.config.has_option('zuul', 'state_dir'):
-            state_dir = os.path.expanduser(
-                self.config.get('zuul', 'state_dir'))
-        else:
-            state_dir = '/var/lib/zuul'
-        path = os.path.join(state_dir, 'launcher.socket')
-        self.command_socket = commandsocket.CommandSocket(path)
-        ansible_dir = os.path.join(state_dir, 'ansible')
-        self.library_dir = os.path.join(ansible_dir, 'library')
-        if not os.path.exists(self.library_dir):
-            os.makedirs(self.library_dir)
-        self.pre_post_library_dir = os.path.join(ansible_dir,
-                                                 'pre_post_library')
-        if not os.path.exists(self.pre_post_library_dir):
-            os.makedirs(self.pre_post_library_dir)
-
-        library_path = os.path.dirname(os.path.abspath(
-            zuul.ansible.library.__file__))
-        # Ansible library modules that should be available to all
-        # playbooks:
-        all_libs = ['zuul_log.py', 'zuul_console.py', 'zuul_afs.py']
-        # Modules that should only be used by job playbooks:
-        job_libs = ['command.py']
-
-        for fn in all_libs:
-            shutil.copy(os.path.join(library_path, fn), self.library_dir)
-            shutil.copy(os.path.join(library_path, fn),
-                        self.pre_post_library_dir)
-        for fn in job_libs:
-            shutil.copy(os.path.join(library_path, fn), self.library_dir)
-
-        def get_config_default(section, option, default):
-            if config.has_option(section, option):
-                return config.get(section, option)
-            return default
-
-        for section in config.sections():
-            m = self.site_section_re.match(section)
-            if m:
-                sitename = m.group(1)
-                d = {}
-                d['host'] = get_config_default(section, 'host', None)
-                d['user'] = get_config_default(section, 'user', '')
-                d['pass'] = get_config_default(section, 'pass', '')
-                d['root'] = get_config_default(section, 'root', '/')
-                d['keytab'] = get_config_default(section, 'keytab', None)
-                self.sites[sitename] = d
-                continue
-            m = self.node_section_re.match(section)
-            if m:
-                nodename = m.group(1)
-                d = {}
-                d['name'] = nodename
-                d['host'] = config.get(section, 'host')
-                d['description'] = get_config_default(section,
-                                                      'description', '')
-                if config.has_option(section, 'labels'):
-                    d['labels'] = config.get(section, 'labels').split(',')
-                else:
-                    d['labels'] = []
-                self.static_nodes[nodename] = d
-                continue
-
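The site and node regexes above select specially named sections from the
launcher configuration.  A sketch of a fragment this loop would accept and how
it parses, with made-up hostnames and labels, shown using Python 3's
configparser for brevity (the launcher itself ran under Python 2's
ConfigParser):

    import configparser
    import re

    sample = (
        '[site "logs.example.org"]\n'
        'host = logs.example.org\n'
        'user = zuul\n'
        '\n'
        '[node "static01"]\n'
        'host = static01.example.org\n'
        'labels = trusty,precise\n'
    )
    config = configparser.ConfigParser()
    config.read_string(sample)

    site_section_re = re.compile('site "(.*?)"')
    node_section_re = re.compile('node "(.*?)"')
    for section in config.sections():
        m = site_section_re.match(section)
        if m:
            print('site:', m.group(1))  # site: logs.example.org
        m = node_section_re.match(section)
        if m:
            print('node:', m.group(1),
                  config.get(section, 'labels').split(','))
            # node: static01 ['trusty', 'precise']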
-    def start(self):
-        self._gearman_running = True
-        self._zmq_running = True
-        self._reaper_running = True
-        self._command_running = True
-
-        # Setup ZMQ
-        self.zcontext = zmq.Context()
-        self.zsocket = self.zcontext.socket(zmq.PUB)
-        self.zsocket.bind("tcp://*:8888")
-
-        # Setup Gearman
-        server = self.config.get('gearman', 'server')
-        if self.config.has_option('gearman', 'port'):
-            port = self.config.get('gearman', 'port')
-        else:
-            port = 4730
-        self.worker = LaunchGearWorker('Zuul Launch Server',
-                                       launch_server=self)
-        self.worker.addServer(server, port)
-        self.log.debug("Waiting for server")
-        self.worker.waitForServer()
-        self.log.debug("Registering")
-        self.register()
-
-        # Start command socket
-        self.log.debug("Starting command processor")
-        self.command_socket.start()
-        self.command_thread = threading.Thread(target=self.runCommand)
-        self.command_thread.daemon = True
-        self.command_thread.start()
-
-        # Load JJB config
-        self.loadJobs()
-
-        # Start ZMQ worker thread
-        self.log.debug("Starting ZMQ processor")
-        self.zmq_thread = threading.Thread(target=self.runZMQ)
-        self.zmq_thread.daemon = True
-        self.zmq_thread.start()
-
-        # Start node worker reaper thread
-        self.log.debug("Starting reaper")
-        self.reaper_thread = threading.Thread(target=self.runReaper)
-        self.reaper_thread.daemon = True
-        self.reaper_thread.start()
-
-        # Start Gearman worker thread
-        self.log.debug("Starting worker")
-        self.gearman_thread = threading.Thread(target=self.run)
-        self.gearman_thread.daemon = True
-        self.gearman_thread.start()
-
-        # Start static workers
-        for node in self.static_nodes.values():
-            self.log.debug("Creating static node with arguments: %s" % (node,))
-            self._launchWorker(node)
-
-    def loadJobs(self):
-        self.log.debug("Loading jobs")
-        builder = JJB()
-        path = self.config.get('launcher', 'jenkins_jobs')
-        builder.load_files([path])
-        builder.parser.expandYaml()
-        unseen = set(self.jobs.keys())
-        for job in builder.parser.jobs:
-            builder.expandMacros(job)
-            self.jobs[job['name']] = job
-            unseen.discard(job['name'])
-        for name in unseen:
-            del self.jobs[name]
-
-    def register(self):
-        new_functions = set()
-        if self.accept_nodes:
-            new_functions.add("node_assign:zuul")
-        new_functions.add("stop:%s" % self.hostname)
-        new_functions.add("set_description:%s" % self.hostname)
-        new_functions.add("node_revoke:%s" % self.hostname)
-
-        for function in new_functions - self.registered_functions:
-            self.worker.registerFunction(function)
-        for function in self.registered_functions - new_functions:
-            self.worker.unRegisterFunction(function)
-        self.registered_functions = new_functions
-
-    def reconfigure(self):
-        self.log.debug("Reconfiguring")
-        self.loadJobs()
-        for node in self.node_workers.values():
-            try:
-                if node.isAlive():
-                    node.queue.put(dict(action='reconfigure'))
-            except Exception:
-                self.log.exception("Exception sending reconfigure command "
-                                   "to worker:")
-        self.log.debug("Reconfiguration complete")
-
-    def pause(self):
-        self.log.debug("Pausing")
-        self.accept_nodes = False
-        self.register()
-        for node in self.node_workers.values():
-            try:
-                if node.isAlive():
-                    node.queue.put(dict(action='pause'))
-            except Exception:
-                self.log.exception("Exception sending pause command "
-                                   "to worker:")
-        self.log.debug("Paused")
-
-    def unpause(self):
-        self.log.debug("Unpausing")
-        self.accept_nodes = self.config_accept_nodes
-        self.register()
-        for node in self.node_workers.values():
-            try:
-                if node.isAlive():
-                    node.queue.put(dict(action='unpause'))
-            except Exception:
-                self.log.exception("Exception sending unpause command "
-                                   "to worker:")
-        self.log.debug("Unpaused")
-
-    def release(self):
-        self.log.debug("Releasing idle nodes")
-        for node in self.node_workers.values():
-            if node.name in self.static_nodes:
-                continue
-            try:
-                if node.isAlive():
-                    node.queue.put(dict(action='release'))
-            except Exception:
-                self.log.exception("Exception sending release command "
-                                   "to worker:")
-        self.log.debug("Finished releasing idle nodes")
-
-    def graceful(self):
-        # Note: this is run in the command processing thread; no more
-        # external commands will be processed after this.
-        self.log.debug("Gracefully stopping")
-        self.pause()
-        self.release()
-        self.log.debug("Waiting for all builds to finish")
-        while self.builds:
-            time.sleep(5)
-        self.log.debug("All builds are finished")
-        self.stop()
-
-    def stop(self):
-        self.log.debug("Stopping")
-        # First, stop accepting new jobs
-        self._gearman_running = False
-        self._reaper_running = False
-        self.worker.shutdown()
-        # Then stop all of the workers
-        for node in self.node_workers.values():
-            try:
-                if node.isAlive():
-                    node.stop()
-            except Exception:
-                self.log.exception("Exception sending stop command to worker:")
-        # Stop ZMQ afterwards so that the send queue is flushed
-        self._zmq_running = False
-        self.zmq_send_queue.put(None)
-        self.zmq_send_queue.join()
-        # Stop command processing
-        self._command_running = False
-        self.command_socket.stop()
-        # Join the gearman thread which was stopped earlier.
-        self.gearman_thread.join()
-        # The command thread is joined in the join() method of this
-        # class, which is called by the command shell.
-        self.log.debug("Stopped")
-
-    def verboseOn(self):
-        self.log.debug("Enabling verbose mode")
-        self.options['verbose'] = True
-
-    def verboseOff(self):
-        self.log.debug("Disabling verbose mode")
-        self.options['verbose'] = False
-
-    def join(self):
-        self.command_thread.join()
-
-    def runCommand(self):
-        while self._command_running:
-            try:
-                command = self.command_socket.get()
-                self.command_map[command]()
-            except Exception:
-                self.log.exception("Exception while processing command")
-
-    def runZMQ(self):
-        while self._zmq_running or not self.zmq_send_queue.empty():
-            try:
-                item = self.zmq_send_queue.get()
-                self.log.debug("Got ZMQ event %s" % (item,))
-                if item is None:
-                    continue
-                self.zsocket.send(item)
-            except Exception:
-                self.log.exception("Exception while processing ZMQ events")
-            finally:
-                self.zmq_send_queue.task_done()
-
-    def run(self):
-        while self._gearman_running:
-            try:
-                job = self.worker.getJob()
-                try:
-                    if job.name.startswith('node_assign:'):
-                        self.log.debug("Got node_assign job: %s" % job.unique)
-                        self.assignNode(job)
-                    elif job.name.startswith('stop:'):
-                        self.log.debug("Got stop job: %s" % job.unique)
-                        self.stopJob(job)
-                    elif job.name.startswith('set_description:'):
-                        self.log.debug("Got set_description job: %s" %
-                                       job.unique)
-                        job.sendWorkComplete()
-                    elif job.name.startswith('node_revoke:'):
-                        self.log.debug("Got node_revoke job: %s" % job.unique)
-                        self.revokeNode(job)
-                    else:
-                        self.log.error("Unable to handle job %s" % job.name)
-                        job.sendWorkFail()
-                except Exception:
-                    self.log.exception("Exception while running job")
-                    job.sendWorkException(traceback.format_exc())
-            except gear.InterruptedError:
-                return
-            except Exception:
-                self.log.exception("Exception while getting job")
-
-    def assignNode(self, job):
-        args = json.loads(job.arguments)
-        self.log.debug("Assigned node with arguments: %s" % (args,))
-        self._launchWorker(args)
-        data = dict(manager=self.hostname)
-        job.sendWorkData(json.dumps(data))
-        job.sendWorkComplete()
-
-    def _launchWorker(self, args):
-        worker = NodeWorker(self.config, self.jobs, self.builds,
-                            self.sites, args['name'], args['host'],
-                            args['description'], args['labels'],
-                            self.hostname, self.zmq_send_queue,
-                            self.termination_queue, self.keep_jobdir,
-                            self.library_dir, self.pre_post_library_dir,
-                            self.options)
-        self.node_workers[worker.name] = worker
-
-        worker.thread = threading.Thread(target=worker.run)
-        worker.thread.start()
-
-    def revokeNode(self, job):
-        try:
-            args = json.loads(job.arguments)
-            self.log.debug("Revoke job with arguments: %s" % (args,))
-            name = args['name']
-            node = self.node_workers.get(name)
-            if not node:
-                self.log.debug("Unable to find worker %s" % (name,))
-                return
-            try:
-                if node.isAlive():
-                    node.queue.put(dict(action='stop'))
-                else:
-                    self.log.debug("Node %s is not alive while revoking node" %
-                                   (node.name,))
-            except Exception:
-                self.log.exception("Exception sending stop command "
-                                   "to worker:")
-        finally:
-            job.sendWorkComplete()
-
-    def stopJob(self, job):
-        try:
-            args = json.loads(job.arguments)
-            self.log.debug("Stop job with arguments: %s" % (args,))
-            unique = args['number']
-            build_worker_name = self.builds.get(unique)
-            if not build_worker_name:
-                self.log.debug("Unable to find build for job %s" % (unique,))
-                return
-            node = self.node_workers.get(build_worker_name)
-            if not node:
-                self.log.debug("Unable to find worker for job %s" % (unique,))
-                return
-            try:
-                if node.isAlive():
-                    node.queue.put(dict(action='abort'))
-                else:
-                    self.log.debug("Node %s is not alive while aborting job" %
-                                   (node.name,))
-            except Exception:
-                self.log.exception("Exception sending abort command "
-                                   "to worker:")
-        finally:
-            job.sendWorkComplete()
-
-    def runReaper(self):
-        # We don't actually care if all the events are processed
-        while self._reaper_running:
-            try:
-                item = self.termination_queue.get()
-                self.log.debug("Got termination event %s" % (item,))
-                if item is None:
-                    continue
-                worker = self.node_workers[item]
-                self.log.debug("Joining %s" % (item,))
-                worker.thread.join()
-                self.log.debug("Joined %s" % (item,))
-                del self.node_workers[item]
-            except Exception:
-                self.log.exception("Exception while processing "
-                                   "termination events:")
-            finally:
-                self.termination_queue.task_done()
-
-
-class NodeWorker(object):
-    retry_args = dict(register='task_result',
-                      until='task_result.rc == 0',
-                      retries=3,
-                      delay=30)
-
-    def __init__(self, config, jobs, builds, sites, name, host,
-                 description, labels, manager_name, zmq_send_queue,
-                 termination_queue, keep_jobdir, library_dir,
-                 pre_post_library_dir, options):
-        self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
-        self.log.debug("Creating node worker %s" % (name,))
-        self.config = config
-        self.jobs = jobs
-        self.builds = builds
-        self.sites = sites
-        self.name = name
-        self.host = host
-        self.description = description
-        if not isinstance(labels, list):
-            labels = [labels]
-        self.labels = labels
-        self.thread = None
-        self.registered_functions = set()
-        # If the unpaused Event is set, that means we should run jobs.
-        # If it is clear, then we are paused and should not run jobs.
-        self.unpaused = threading.Event()
-        self.unpaused.set()
-        self._running = True
-        self.queue = Queue.Queue()
-        self.manager_name = manager_name
-        self.zmq_send_queue = zmq_send_queue
-        self.termination_queue = termination_queue
-        self.keep_jobdir = keep_jobdir
-        self.running_job_lock = threading.Lock()
-        self.pending_registration = False
-        self.registration_lock = threading.Lock()
-        self._get_job_lock = threading.Lock()
-        self._got_job = False
-        self._job_complete_event = threading.Event()
-        self._running_job = False
-        self._aborted_job = False
-        self._watchdog_timeout = False
-        self._sent_complete_event = False
-        self.ansible_pre_proc = None
-        self.ansible_job_proc = None
-        self.ansible_post_proc = None
-        self.workspace_root = config.get('launcher', 'workspace_root')
-        if self.config.has_option('launcher', 'private_key_file'):
-            self.private_key_file = config.get('launcher', 'private_key_file')
-        else:
-            self.private_key_file = '~/.ssh/id_rsa'
-        if self.config.has_option('launcher', 'username'):
-            self.username = config.get('launcher', 'username')
-        else:
-            self.username = 'zuul'
-        self.library_dir = library_dir
-        self.pre_post_library_dir = pre_post_library_dir
-        self.options = options
-
-    def isAlive(self):
-        # Meant to be called from the manager
-        if self.thread and self.thread.is_alive():
-            return True
-        return False
-
-    def run(self):
-        self.log.debug("Node worker %s starting" % (self.name,))
-        server = self.config.get('gearman', 'server')
-        if self.config.has_option('gearman', 'port'):
-            port = self.config.get('gearman', 'port')
-        else:
-            port = 4730
-        self.worker = NodeGearWorker(self.name)
-        self.worker.addServer(server, port)
-        self.log.debug("Waiting for server")
-        self.worker.waitForServer()
-        self.log.debug("Registering")
-        self.register()
-
-        self.gearman_thread = threading.Thread(target=self.runGearman)
-        self.gearman_thread.daemon = True
-        self.gearman_thread.start()
-
-        self.log.debug("Started")
-
-        while self._running or not self.queue.empty():
-            try:
-                self._runQueue()
-            except Exception:
-                self.log.exception("Exception in queue manager:")
-
-    def stop(self):
-        # If this is called locally, setting _running will be
-        # effective; if it's called remotely it will not be, but it
-        # will be set by the queue thread.
-        self.log.debug("Submitting stop request")
-        self._running = False
-        self.unpaused.set()
-        self.queue.put(dict(action='stop'))
-        self.queue.join()
-
-    def pause(self):
-        self.unpaused.clear()
-        self.worker.stopWaitingForJobs()
-
-    def unpause(self):
-        self.unpaused.set()
-
-    def release(self):
-        # If this node is idle, stop it.
-        old_unpaused = self.unpaused.is_set()
-        if old_unpaused:
-            self.pause()
-        with self._get_job_lock:
-            if self._got_job:
-                self.log.debug("This worker is not idle")
-                if old_unpaused:
-                    self.unpause()
-                return
-        self.log.debug("Stopping due to release command")
-        self.queue.put(dict(action='stop'))
-
-    def _runQueue(self):
-        item = self.queue.get()
-        try:
-            if item['action'] == 'stop':
-                self.log.debug("Received stop request")
-                self._running = False
-                self.termination_queue.put(self.name)
-                if not self.abortRunningJob():
-                    self.sendFakeCompleteEvent()
-                else:
-                    self._job_complete_event.wait()
-                self.worker.shutdown()
-            if item['action'] == 'pause':
-                self.log.debug("Received pause request")
-                self.pause()
-            if item['action'] == 'unpause':
-                self.log.debug("Received unpause request")
-                self.unpause()
-            if item['action'] == 'release':
-                self.log.debug("Received release request")
-                self.release()
-            elif item['action'] == 'reconfigure':
-                self.log.debug("Received reconfigure request")
-                self.register()
-            elif item['action'] == 'abort':
-                self.log.debug("Received abort request")
-                self.abortRunningJob()
-        finally:
-            self.queue.task_done()
-
-    def runGearman(self):
-        while self._running:
-            try:
-                self.unpaused.wait()
-                if self._running:
-                    self._runGearman()
-            except Exception:
-                self.log.exception("Exception in gearman manager:")
-            with self._get_job_lock:
-                self._got_job = False
-
-    def _runGearman(self):
-        if self.pending_registration:
-            self.register()
-        with self._get_job_lock:
-            try:
-                job = self.worker.getJob()
-                self._got_job = True
-            except gear.InterruptedError:
-                return
-        self.log.debug("Node worker %s got job %s" % (self.name, job.name))
-        try:
-            if job.name not in self.registered_functions:
-                self.log.error("Unable to handle job %s" % job.name)
-                job.sendWorkFail()
-                return
-            self.launch(job)
-        except Exception:
-            self.log.exception("Exception while running job")
-            job.sendWorkException(traceback.format_exc())
-
-    def generateFunctionNames(self, job):
-        # This only supports "node: foo" and "node: foo || bar"
-        ret = set()
-        job_labels = job.get('node')
-        matching_labels = set()
-        if job_labels:
-            job_labels = [x.strip() for x in job_labels.split('||')]
-            matching_labels = set(self.labels) & set(job_labels)
-            if not matching_labels:
-                return ret
-        ret.add('build:%s' % (job['name'],))
-        for label in matching_labels:
-            ret.add('build:%s:%s' % (job['name'], label))
-        return ret
-
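A worked example of the function names this produces, for a worker created
with labels ['trusty'] (job names and labels illustrative):

    worker.generateFunctionNames({'name': 'gate-foo',
                                  'node': 'trusty || precise'})
    # -> {'build:gate-foo', 'build:gate-foo:trusty'}
    worker.generateFunctionNames({'name': 'gate-bar'})
    # -> {'build:gate-bar'}  (no 'node' restriction, so no label variants)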
-    def register(self):
-        if not self.registration_lock.acquire(False):
-            self.log.debug("Registration already in progress")
-            return
-        try:
-            if self._running_job:
-                self.pending_registration = True
-                self.log.debug("Ignoring registration due to running job")
-                return
-            self.log.debug("Updating registration")
-            self.pending_registration = False
-            new_functions = set()
-            for job in self.jobs.values():
-                new_functions |= self.generateFunctionNames(job)
-            self.worker.sendMassDo(new_functions)
-            self.registered_functions = new_functions
-        finally:
-            self.registration_lock.release()
-
-    def abortRunningJob(self):
-        self._aborted_job = True
-        return self.abortRunningProc(self.ansible_job_proc)
-
-    def abortRunningProc(self, proc):
-        aborted = False
-        self.log.debug("Abort: acquiring job lock")
-        with self.running_job_lock:
-            if self._running_job:
-                self.log.debug("Abort: a job is running")
-                if proc:
-                    self.log.debug("Abort: sending kill signal to job "
-                                   "process group")
-                    try:
-                        pgid = os.getpgid(proc.pid)
-                        os.killpg(pgid, signal.SIGKILL)
-                        aborted = True
-                    except Exception:
-                        self.log.exception("Exception while killing "
-                                           "ansible process:")
-            else:
-                self.log.debug("Abort: no job is running")
-
-        return aborted
-
-    def launch(self, job):
-        self.log.info("Node worker %s launching job %s" %
-                      (self.name, job.name))
-
-        # Make sure we can parse what we need from the job first
-        args = json.loads(job.arguments)
-        offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
-        job_name = job.name.split(':')[1]
-
-        # Initialize the result so we have something regardless of
-        # whether the job actually runs
-        result = None
-        self._sent_complete_event = False
-        self._aborted_job = False
-        self._watchdog_timeout = False
-
-        try:
-            self.sendStartEvent(job_name, args)
-        except Exception:
-            self.log.exception("Exception while sending job start event")
-
-        try:
-            result = self.runJob(job, args)
-        except Exception:
-            self.log.exception("Exception while launching job thread")
-
-        self._running_job = False
-
-        try:
-            data = json.dumps(dict(result=result))
-            job.sendWorkComplete(data)
-        except Exception:
-            self.log.exception("Exception while sending job completion packet")
-
-        try:
-            self.sendCompleteEvent(job_name, result, args)
-        except Exception:
-            self.log.exception("Exception while sending job completion event")
-
-        try:
-            del self.builds[job.unique]
-        except Exception:
-            self.log.exception("Exception while clearing build record")
-
-        self._job_complete_event.set()
-        if offline and self._running:
-            self.stop()
-
-    def sendStartEvent(self, name, parameters):
-        build = dict(node_name=self.name,
-                     host_name=self.manager_name,
-                     parameters=parameters)
-
-        event = dict(name=name,
-                     build=build)
-
-        item = "onStarted %s" % json.dumps(event)
-        self.log.debug("Sending over ZMQ: %s" % (item,))
-        self.zmq_send_queue.put(item)
-
-    def sendCompleteEvent(self, name, status, parameters):
-        build = dict(status=status,
-                     node_name=self.name,
-                     host_name=self.manager_name,
-                     parameters=parameters)
-
-        event = dict(name=name,
-                     build=build)
-
-        item = "onFinalized %s" % json.dumps(event)
-        self.log.debug("Sending over ZMQ: %s" % (item,))
-        self.zmq_send_queue.put(item)
-        self._sent_complete_event = True
-
-    def sendFakeCompleteEvent(self):
-        if self._sent_complete_event:
-            return
-        self.sendCompleteEvent('zuul:launcher-shutdown',
-                               'SUCCESS', {})
-
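The items placed on zmq_send_queue are plain strings of the form
"onStarted <json>" or "onFinalized <json>".  A sketch of a completion event,
with illustrative field values:

    import json

    build = dict(status='SUCCESS', node_name='static01',
                 host_name='launcher01', parameters={})
    event = dict(name='gate-foo', build=build)
    item = "onFinalized %s" % json.dumps(event)
    # onFinalized {"name": "gate-foo", "build": {"status": "SUCCESS", ...}}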
-    def runJob(self, job, args):
-        self.ansible_pre_proc = None
-        self.ansible_job_proc = None
-        self.ansible_post_proc = None
-        result = None
-        with self.running_job_lock:
-            if not self._running:
-                return result
-            self._running_job = True
-            self._job_complete_event.clear()
-
-        self.log.debug("Job %s: beginning" % (job.unique,))
-        self.builds[job.unique] = self.name
-        with JobDir(self.keep_jobdir) as jobdir:
-            self.log.debug("Job %s: job root at %s" %
-                           (job.unique, jobdir.root))
-            timeout = self.prepareAnsibleFiles(jobdir, job, args)
-
-            data = {
-                'manager': self.manager_name,
-                'number': job.unique,
-            }
-            if ':' in self.host:
-                data['url'] = 'telnet://[%s]:19885' % self.host
-            else:
-                data['url'] = 'telnet://%s:19885' % self.host
-
-            job.sendWorkData(json.dumps(data))
-            job.sendWorkStatus(0, 100)
-
-            pre_status = self.runAnsiblePrePlaybook(jobdir)
-            if pre_status is None:
-                # These should really never fail, so return None and have
-                # zuul try again
-                return result
-
-            job_status = self.runAnsiblePlaybook(jobdir, timeout)
-            if job_status is None:
-                # The result of the job is indeterminate.  Zuul will
-                # run it again.
-                return result
-
-            post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
-            if not post_status:
-                result = 'POST_FAILURE'
-            elif job_status:
-                result = 'SUCCESS'
-            else:
-                result = 'FAILURE'
-
-            if self._aborted_job and not self._watchdog_timeout:
-                # A None result will cause zuul to relaunch the job if
-                # it needs to.
-                result = None
-
-        return result
-
-    def getHostList(self):
-        return [('node', dict(
-            ansible_host=self.host, ansible_user=self.username))]
-
-    def _substituteVariables(self, text, variables):
-        def lookup(match):
-            return variables.get(match.group(1), '')
-        return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
-
-    def _getRsyncOptions(self, source, parameters):
-        # Treat the publisher source as a filter; ant and rsync filters
-        # behave fairly similarly here, except for leading directories.
-        source = self._substituteVariables(source, parameters)
-        # If the source starts with ** then we want to match any
-        # number of directories, so don't anchor the include filter.
-        # If it does not start with **, then the intent is likely to
-        # at least start by matching an immediate file or subdirectory
-        # (even if later we have a ** in the middle), so in this case,
-        # anchor it to the root of the transfer (the workspace).
-        if not source.startswith('**'):
-            source = os.path.join('/', source)
-        # These options mean: include the thing we want, include any
-        # directories (so that we continue to search for the thing we
-        # want no matter how deep it is), exclude anything that
-        # doesn't match the thing we want or is a directory, then get
-        # rid of empty directories left over at the end.
-        rsync_opts = ['--include="%s"' % source,
-                      '--include="*/"',
-                      '--exclude="*"',
-                      '--prune-empty-dirs']
-        return rsync_opts
-
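Concretely, a condensed version of the filter logic above (without the $var
substitution) and the options it yields for two illustrative sources:

    import os

    def get_rsync_options(source):
        if not source.startswith('**'):
            source = os.path.join('/', source)
        return ['--include="%s"' % source, '--include="*/"',
                '--exclude="*"', '--prune-empty-dirs']

    get_rsync_options('logs/*.txt')
    # ['--include="/logs/*.txt"', '--include="*/"',
    #  '--exclude="*"', '--prune-empty-dirs']
    get_rsync_options('**/*.log')
    # ['--include="**/*.log"', '--include="*/"',
    #  '--exclude="*"', '--prune-empty-dirs']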
-    def _makeSCPTask(self, jobdir, publisher, parameters):
-        tasks = []
-        for scpfile in publisher['scp']['files']:
-            scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
-            os.chmod(scproot, 0o755)
-
-            site = publisher['scp']['site']
-            if scpfile.get('copy-console'):
-                # Include the local ansible directory in the console
-                # upload.  This uploads the playbook and ansible logs.
-                copyargs = dict(src=jobdir.ansible_root + '/',
-                                dest=os.path.join(scproot, '_zuul_ansible'))
-                task = dict(name='copy console log',
-                            copy=copyargs,
-                            delegate_to='127.0.0.1')
-                # This is a local copy and should not fail, so does
-                # not need a retry stanza.
-                tasks.append(task)
-
-                # Fetch the console log from the remote host.
-                src = '/tmp/console.html'
-                rsync_opts = []
-            else:
-                src = parameters['WORKSPACE']
-                if not src.endswith('/'):
-                    src = src + '/'
-                rsync_opts = self._getRsyncOptions(scpfile['source'],
-                                                   parameters)
-
-            syncargs = dict(src=src,
-                            dest=scproot,
-                            copy_links='yes',
-                            mode='pull')
-            if rsync_opts:
-                syncargs['rsync_opts'] = rsync_opts
-            task = dict(name='copy files from node',
-                        synchronize=syncargs)
-            if not scpfile.get('copy-after-failure'):
-                task['when'] = 'success|bool'
-            # We don't use retry_args here because there is a bug in
-            # the synchronize module that breaks subsequent attempts at
-            # retrying. Better to try once and get an accurate error
-            # message if it fails.
-            # https://github.com/ansible/ansible/issues/18281
-            tasks.append(task)
-
-            task = self._makeSCPTaskLocalAction(
-                site, scpfile, scproot, parameters)
-            task.update(self.retry_args)
-            tasks.append(task)
-        return tasks
-
-    def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
-        if site not in self.sites:
-            raise Exception("Undefined SCP site: %s" % (site,))
-        site = self.sites[site]
-        dest = scpfile['target'].lstrip('/')
-        dest = self._substituteVariables(dest, parameters)
-        dest = os.path.join(site['root'], dest)
-        dest = os.path.normpath(dest)
-        if not dest.startswith(site['root']):
-            raise Exception("Target path %s is not below site root" %
-                            (dest,))
-
-        rsync_cmd = [
-            '/usr/bin/rsync', '--delay-updates', '-F',
-            '--compress', '-rt', '--safe-links',
-            '--rsync-path="mkdir -p {dest} && rsync"',
-            '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
-            '-o StrictHostKeyChecking=no -q"',
-            '--out-format="<<CHANGED>>%i %n%L"',
-            '{source}', '"{user}@{host}:{dest}"'
-        ]
-        if scpfile.get('keep-hierarchy'):
-            source = '"%s/"' % scproot
-        else:
-            source = '`/usr/bin/find "%s" -type f`' % scproot
-        shellargs = ' '.join(rsync_cmd).format(
-            source=source,
-            dest=dest,
-            private_key_file=self.private_key_file,
-            host=site['host'],
-            user=site['user'])
-        task = dict(name='rsync logs to server',
-                    shell=shellargs,
-                    delegate_to='127.0.0.1')
-        if not scpfile.get('copy-after-failure'):
-            task['when'] = 'success|bool'
-
-        return task
-
-    def _makeFTPTask(self, jobdir, publisher, parameters):
-        tasks = []
-        ftp = publisher['ftp']
-        site = ftp['site']
-        if site not in self.sites:
-            raise Exception("Undefined FTP site: %s" % site)
-        site = self.sites[site]
-
-        ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
-        ftpcontent = os.path.join(ftproot, 'content')
-        os.makedirs(ftpcontent)
-        ftpscript = os.path.join(ftproot, 'script')
-
-        src = parameters['WORKSPACE']
-        if not src.endswith('/'):
-            src = src + '/'
-        rsync_opts = self._getRsyncOptions(ftp['source'],
-                                           parameters)
-        syncargs = dict(src=src,
-                        dest=ftpcontent,
-                        copy_links='yes',
-                        mode='pull')
-        if rsync_opts:
-            syncargs['rsync_opts'] = rsync_opts
-        task = dict(name='copy files from node',
-                    synchronize=syncargs,
-                    when='success|bool')
-        # We don't use retry_args here because there is a bug in the
-        # synchronize module that breaks subsequent attempts at retrying.
-        # Better to try once and get an accurate error message if it fails.
-        # https://github.com/ansible/ansible/issues/18281
-        tasks.append(task)
-        task = dict(name='FTP files to server',
-                    shell='lftp -f %s' % ftpscript,
-                    when='success|bool',
-                    delegate_to='127.0.0.1')
-        ftpsource = ftpcontent
-        if ftp.get('remove-prefix'):
-            ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
-        while ftpsource[-1] == '/':
-            ftpsource = ftpsource[:-1]
-        ftptarget = ftp['target'].lstrip('/')
-        ftptarget = self._substituteVariables(ftptarget, parameters)
-        ftptarget = os.path.join(site['root'], ftptarget)
-        ftptarget = os.path.normpath(ftptarget)
-        if not ftptarget.startswith(site['root']):
-            raise Exception("Target path %s is not below site root" %
-                            (ftptarget,))
-        while ftptarget[-1] == '/':
-            ftptarget = ftptarget[:-1]
-        with open(ftpscript, 'w') as script:
-            script.write('open %s\n' % site['host'])
-            script.write('user %s %s\n' % (site['user'], site['pass']))
-            script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
-        task.update(self.retry_args)
-        tasks.append(task)
-        return tasks
-
-    def _makeAFSTask(self, jobdir, publisher, parameters):
-        tasks = []
-        afs = publisher['afs']
-        site = afs['site']
-        if site not in self.sites:
-            raise Exception("Undefined AFS site: %s" % site)
-        site = self.sites[site]
-
-        afsroot = tempfile.mkdtemp(dir=jobdir.staging_root)
-        afscontent = os.path.join(afsroot, 'content')
-        afssource = afscontent
-        if afs.get('remove-prefix'):
-            afssource = os.path.join(afscontent, afs['remove-prefix'])
-        while afssource[-1] == '/':
-            afssource = afssource[:-1]
-
-        src = parameters['WORKSPACE']
-        if not src.endswith('/'):
-            src = src + '/'
-        rsync_opts = self._getRsyncOptions(afs['source'],
-                                           parameters)
-        syncargs = dict(src=src,
-                        dest=afscontent,
-                        copy_links='yes',
-                        mode='pull')
-        if rsync_opts:
-            syncargs['rsync_opts'] = rsync_opts
-        task = dict(name='copy files from node',
-                    synchronize=syncargs,
-                    when='success|bool')
-        # We don't use retry_args here because there is a bug in the
-        # synchronize module that breaks subsequent attempts at retrying.
-        # Better to try once and get an accurate error message if it fails.
-        # https://github.com/ansible/ansible/issues/18281
-        tasks.append(task)
-
-        afstarget = afs['target'].lstrip('/')
-        afstarget = self._substituteVariables(afstarget, parameters)
-        afstarget = os.path.join(site['root'], afstarget)
-        afstarget = os.path.normpath(afstarget)
-        if not afstarget.startswith(site['root']):
-            raise Exception("Target path %s is not below site root" %
-                            (afstarget,))
-
-        afsargs = dict(user=site['user'],
-                       keytab=site['keytab'],
-                       root=afsroot,
-                       source=afssource,
-                       target=afstarget)
-
-        task = dict(name='Synchronize files to AFS',
-                    zuul_afs=afsargs,
-                    when='success|bool',
-                    delegate_to='127.0.0.1')
-        tasks.append(task)
-
-        return tasks
-
-    def _makeBuilderTask(self, jobdir, builder, parameters, sequence):
-        tasks = []
-        script_fn = '%02d-%s.sh' % (sequence, str(uuid.uuid4().hex))
-        script_path = os.path.join(jobdir.script_root, script_fn)
-        with open(script_path, 'w') as script:
-            data = builder['shell']
-            if not data.startswith('#!'):
-                data = '#!/bin/bash -x\n %s' % (data,)
-            script.write(data)
-
-        remote_path = os.path.join('/tmp', script_fn)
-        copy = dict(src=script_path,
-                    dest=remote_path,
-                    mode=0o555)
-        task = dict(copy=copy)
-        tasks.append(task)
-
-        task = dict(command=remote_path)
-        task['name'] = 'command generated from JJB'
-        task['environment'] = "{{ zuul.environment }}"
-        task['args'] = dict(chdir=parameters['WORKSPACE'])
-        tasks.append(task)
-
-        filetask = dict(path=remote_path,
-                        state='absent')
-        task = dict(file=filetask)
-        tasks.append(task)
-
-        return tasks
-
-    def _transformPublishers(self, jjb_job):
-        early_publishers = []
-        late_publishers = []
-        old_publishers = jjb_job.get('publishers', [])
-        for publisher in old_publishers:
-            early_scpfiles = []
-            late_scpfiles = []
-            if 'scp' not in publisher:
-                early_publishers.append(publisher)
-                continue
-            copy_console = False
-            for scpfile in publisher['scp']['files']:
-                if scpfile.get('copy-console'):
-                    scpfile['keep-hierarchy'] = True
-                    late_scpfiles.append(scpfile)
-                    copy_console = True
-                else:
-                    early_scpfiles.append(scpfile)
-            publisher['scp']['files'] = early_scpfiles + late_scpfiles
-            if copy_console:
-                late_publishers.append(publisher)
-            else:
-                early_publishers.append(publisher)
-        publishers = early_publishers + late_publishers
-        if old_publishers != publishers:
-            self.log.debug("Transformed job publishers")
-        return early_publishers, late_publishers
-
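In short, any scp publisher that copies the console log is pushed to the end
so that it captures the output of every other publisher.  A small sketch with
abbreviated, illustrative publisher data:

    publishers = [
        {'scp': {'site': 'logs', 'files': [{'copy-console': True,
                                            'target': 'console.html'}]}},
        {'ftp': {'site': 'tarballs', 'source': 'dist/*', 'target': 'foo'}},
    ]
    # _transformPublishers({'publishers': publishers}) returns:
    #   early == [the 'ftp' publisher]
    #   late  == [the 'scp' publisher], its copy-console scpfile forced
    #            to keep-hierarchy: True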
-    def prepareAnsibleFiles(self, jobdir, gearman_job, args):
-        job_name = gearman_job.name.split(':')[1]
-        jjb_job = self.jobs[job_name]
-
-        parameters = args.copy()
-        parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
-
-        with open(jobdir.inventory, 'w') as inventory:
-            for host_name, host_vars in self.getHostList():
-                inventory.write(host_name)
-                for k, v in host_vars.items():
-                    inventory.write(' %s=%s' % (k, v))
-                inventory.write('\n')
-
-        timeout = None
-        timeout_var = None
-        for wrapper in jjb_job.get('wrappers', []):
-            if isinstance(wrapper, dict):
-                build_timeout = wrapper.get('timeout')
-                if isinstance(build_timeout, dict):
-                    timeout_var = build_timeout.get('timeout-var')
-                    timeout = build_timeout.get('timeout')
-                    if timeout is not None:
-                        timeout = int(timeout) * 60
-        if not timeout:
-            timeout = ANSIBLE_DEFAULT_TIMEOUT
-        if timeout_var:
-            parameters[timeout_var] = str(timeout * 1000)
-
-        with open(jobdir.vars, 'w') as vars_yaml:
-            variables = dict(
-                timeout=timeout,
-                environment=parameters,
-            )
-            zuul_vars = dict(zuul=variables)
-            vars_yaml.write(
-                yaml.safe_dump(zuul_vars, default_flow_style=False))
-
-        with open(jobdir.pre_playbook, 'w') as pre_playbook:
-
-            shellargs = "ssh-keyscan {{ ansible_host }} > %s" % (
-                jobdir.known_hosts)
-            tasks = []
-            tasks.append(dict(shell=shellargs, delegate_to='127.0.0.1'))
-
-            task = dict(file=dict(path='/tmp/console.html', state='absent'))
-            tasks.append(task)
-
-            task = dict(zuul_console=dict(path='/tmp/console.html',
-                                          port=19885))
-            tasks.append(task)
-
-            task = dict(file=dict(path=parameters['WORKSPACE'],
-                                  state='directory'))
-            tasks.append(task)
-
-            msg = [
-                "Launched by %s" % self.manager_name,
-                "Building remotely on %s in workspace %s" % (
-                    self.name, parameters['WORKSPACE'])]
-            task = dict(zuul_log=dict(msg=msg))
-            tasks.append(task)
-
-            play = dict(hosts='node', name='Job setup', tasks=tasks)
-            pre_playbook.write(
-                yaml.safe_dump([play], default_flow_style=False))
-
-        with open(jobdir.playbook, 'w') as playbook:
-            tasks = []
-
-            sequence = 0
-            for builder in jjb_job.get('builders', []):
-                if 'shell' in builder:
-                    sequence += 1
-                    tasks.extend(
-                        self._makeBuilderTask(jobdir, builder, parameters,
-                                              sequence))
-
-            play = dict(hosts='node', name='Job body', tasks=tasks)
-            playbook.write(yaml.safe_dump([play], default_flow_style=False))
-
-        early_publishers, late_publishers = self._transformPublishers(jjb_job)
-
-        with open(jobdir.post_playbook, 'w') as playbook:
-            blocks = []
-            for publishers in [early_publishers, late_publishers]:
-                block = []
-                for publisher in publishers:
-                    if 'scp' in publisher:
-                        block.extend(self._makeSCPTask(jobdir, publisher,
-                                                       parameters))
-                    if 'ftp' in publisher:
-                        block.extend(self._makeFTPTask(jobdir, publisher,
-                                                       parameters))
-                    if 'afs' in publisher:
-                        block.extend(self._makeAFSTask(jobdir, publisher,
-                                                       parameters))
-                blocks.append(block)
-
-            # The 'always' section contains the log publishing tasks,
-            # the 'block' contains all the other publishers.  This way
-            # we run the log publisher regardless of whether the rest
-            # of the publishers succeed.
-            tasks = []
-
-            task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"),
-                        when='success|bool')
-            blocks[0].insert(0, task)
-            task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"),
-                        when='not success|bool and not timedout|bool')
-            blocks[0].insert(0, task)
-            task = dict(zuul_log=dict(msg="Job timed out, result: FAILURE"),
-                        when='not success|bool and timedout|bool')
-            blocks[0].insert(0, task)
-
-            tasks.append(dict(block=blocks[0],
-                              always=blocks[1]))
-
-            play = dict(hosts='node', name='Publishers',
-                        tasks=tasks)
-            playbook.write(yaml.safe_dump([play], default_flow_style=False))
-
-        self._writeAnsibleConfig(jobdir, jobdir.config,
-                                 library=self.library_dir)
-        self._writeAnsibleConfig(jobdir, jobdir.pre_post_config,
-                                 library=self.pre_post_library_dir)
-
-        return timeout
-
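The wrapper handling in prepareAnsibleFiles converts a JJB build timeout from
minutes to seconds for the watchdog, and exports it to the job in milliseconds
when a timeout-var is configured.  A worked example with illustrative values:

    wrapper = {'timeout': {'timeout': 30, 'timeout-var': 'BUILD_TIMEOUT'}}
    timeout = int(wrapper['timeout']['timeout']) * 60  # 1800 seconds
    build_timeout_ms = str(timeout * 1000)             # '1800000' (ms)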
-    def _writeAnsibleConfig(self, jobdir, fn, library):
-        with open(fn, 'w') as config:
-            config.write('[defaults]\n')
-            config.write('hostfile = %s\n' % jobdir.inventory)
-            config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
-            config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
-            config.write('private_key_file = %s\n' % self.private_key_file)
-            config.write('retry_files_enabled = False\n')
-            config.write('log_path = %s\n' % jobdir.ansible_log)
-            config.write('gathering = explicit\n')
-            config.write('library = %s\n' % library)
-            # TODO(mordred) This can be removed once we're using ansible 2.2
-            config.write('module_set_locale = False\n')
-            # bump the timeout because busy nodes may take more than
-            # 10s to respond
-            config.write('timeout = 30\n')
-
-            config.write('[ssh_connection]\n')
-            # NB: when setting pipelining = True, keep_remote_files
-            # must be False (the default).  Otherwise it apparently
-            # will override the pipelining option and effectively
-            # disable it.  Pipelining has a side effect of running the
-            # command without a tty (ie, without the -tt argument to
-            # ssh).  We require this behavior so that if a job runs a
-            # command which expects interactive input on a tty (such
-            # as sudo) it does not hang.
-            config.write('pipelining = True\n')
-            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
-                "-o UserKnownHostsFile=%s" % jobdir.known_hosts
-            config.write('ssh_args = %s\n' % ssh_args)
-
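For reference, the generated configuration comes out roughly as follows,
assuming a hypothetical job root of /tmp/tmpXYZ and the default state dir:

    [defaults]
    hostfile = /tmp/tmpXYZ/ansible/inventory
    local_tmp = /tmp/tmpXYZ/.ansible/local_tmp
    remote_tmp = /tmp/tmpXYZ/.ansible/remote_tmp
    private_key_file = ~/.ssh/id_rsa
    retry_files_enabled = False
    log_path = /tmp/tmpXYZ/ansible/ansible_log.txt
    gathering = explicit
    library = /var/lib/zuul/ansible/library
    module_set_locale = False
    timeout = 30

    [ssh_connection]
    pipelining = True
    ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o UserKnownHostsFile=/tmp/tmpXYZ/ansible/known_hosts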
-    def _ansibleTimeout(self, proc, msg):
-        self._watchdog_timeout = True
-        self.log.warning(msg)
-        self.abortRunningProc(proc)
-
-    def runAnsiblePrePlaybook(self, jobdir):
-        # Set LOGNAME env variable so Ansible log_path log reports
-        # the correct user.
-        env_copy = os.environ.copy()
-        env_copy['LOGNAME'] = 'zuul'
-        env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
-
-        if self.options['verbose']:
-            verbose = '-vvv'
-        else:
-            verbose = '-v'
-
-        cmd = ['ansible-playbook', jobdir.pre_playbook,
-               '-e@%s' % jobdir.vars, verbose]
-        self.log.debug("Ansible pre command: %s" % (cmd,))
-
-        self.ansible_pre_proc = subprocess.Popen(
-            cmd,
-            cwd=jobdir.ansible_root,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            preexec_fn=os.setsid,
-            env=env_copy,
-        )
-        ret = None
-        watchdog = Watchdog(ANSIBLE_DEFAULT_PRE_TIMEOUT,
-                            self._ansibleTimeout,
-                            (self.ansible_pre_proc,
-                             "Ansible pre timeout exceeded"))
-        watchdog.start()
-        try:
-            for line in iter(self.ansible_pre_proc.stdout.readline, b''):
-                line = line[:1024].rstrip()
-                self.log.debug("Ansible pre output: %s" % (line,))
-            ret = self.ansible_pre_proc.wait()
-        finally:
-            watchdog.stop()
-        self.log.debug("Ansible pre exit code: %s" % (ret,))
-        self.ansible_pre_proc = None
-        return ret == 0
-
-    def runAnsiblePlaybook(self, jobdir, timeout):
-        # Set LOGNAME env variable so Ansible log_path log reports
-        # the correct user.
-        env_copy = os.environ.copy()
-        env_copy['LOGNAME'] = 'zuul'
-        env_copy['ANSIBLE_CONFIG'] = jobdir.config
-
-        if self.options['verbose']:
-            verbose = '-vvv'
-        else:
-            verbose = '-v'
-
-        cmd = ['ansible-playbook', jobdir.playbook, verbose,
-               '-e@%s' % jobdir.vars]
-        self.log.debug("Ansible command: %s" % (cmd,))
-
-        self.ansible_job_proc = subprocess.Popen(
-            cmd,
-            cwd=jobdir.ansible_root,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            preexec_fn=os.setsid,
-            env=env_copy,
-        )
-        ret = None
-        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
-                            self._ansibleTimeout,
-                            (self.ansible_job_proc,
-                             "Ansible timeout exceeded"))
-        watchdog.start()
-        try:
-            for line in iter(self.ansible_job_proc.stdout.readline, b''):
-                line = line[:1024].rstrip()
-                self.log.debug("Ansible output: %s" % (line,))
-            ret = self.ansible_job_proc.wait()
-        finally:
-            watchdog.stop()
-        self.log.debug("Ansible exit code: %s" % (ret,))
-        self.ansible_job_proc = None
-        if self._watchdog_timeout:
-            return False
-        if ret == 3:
-            # AnsibleHostUnreachable: We had a network issue connecting to
-            # our zuul-worker.
-            return None
-        elif ret == -9:
-            # Received abort request.
-            return None
-        return ret == 0
-
-    def runAnsiblePostPlaybook(self, jobdir, success):
-        # Set LOGNAME env variable so Ansible log_path log reports
-        # the correct user.
-        env_copy = os.environ.copy()
-        env_copy['LOGNAME'] = 'zuul'
-        env_copy['ANSIBLE_CONFIG'] = jobdir.pre_post_config
-
-        if self.options['verbose']:
-            verbose = '-vvv'
-        else:
-            verbose = '-v'
-
-        cmd = ['ansible-playbook', jobdir.post_playbook,
-               '-e', 'success=%s' % success,
-               '-e', 'timedout=%s' % self._watchdog_timeout,
-               '-e@%s' % jobdir.vars,
-               verbose]
-        self.log.debug("Ansible post command: %s" % (cmd,))
-
-        self.ansible_post_proc = subprocess.Popen(
-            cmd,
-            cwd=jobdir.ansible_root,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            preexec_fn=os.setsid,
-            env=env_copy,
-        )
-        ret = None
-        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
-                            self._ansibleTimeout,
-                            (self.ansible_post_proc,
-                             "Ansible post timeout exceeded"))
-        watchdog.start()
-        try:
-            for line in iter(self.ansible_post_proc.stdout.readline, b''):
-                line = line[:1024].rstrip()
-                self.log.debug("Ansible post output: %s" % (line,))
-            ret = self.ansible_post_proc.wait()
-        finally:
-            watchdog.stop()
-        self.log.debug("Ansible post exit code: %s" % (ret,))
-        self.ansible_post_proc = None
-        return ret == 0
-
-
-class JJB(jenkins_jobs.builder.Builder):
-    def __init__(self):
-        self.global_config = None
-        self._plugins_list = []
-
-    def expandComponent(self, component_type, component, template_data):
-        component_list_type = component_type + 's'
-        new_components = []
-        if isinstance(component, dict):
-            name, component_data = next(iter(component.items()))
-            if template_data:
-                component_data = jenkins_jobs.formatter.deep_format(
-                    component_data, template_data, True)
-        else:
-            name = component
-            component_data = {}
-
-        new_component = self.parser.data.get(component_type, {}).get(name)
-        if new_component:
-            for new_sub_component in new_component[component_list_type]:
-                new_components.extend(
-                    self.expandComponent(component_type,
-                                         new_sub_component, component_data))
-        else:
-            new_components.append({name: component_data})
-        return new_components
-
-    def expandMacros(self, job):
-        for component_type in ['builder', 'publisher', 'wrapper']:
-            component_list_type = component_type + 's'
-            new_components = []
-            for new_component in job.get(component_list_type, []):
-                new_components.extend(self.expandComponent(component_type,
-                                                           new_component, {}))
-            job[component_list_type] = new_components
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 45937ef..6a39096 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -255,8 +255,10 @@
         self.roles_path = []
         self.ansible_config = os.path.join(self.root, 'ansible.cfg')
         self.project_link = os.path.join(self.root, 'project')
-        self.secrets = os.path.join(self.root, 'secrets.yaml')
-        self.has_secrets = False
+        self.secrets_root = os.path.join(self.root, 'secrets')
+        os.makedirs(self.secrets_root)
+        self.secrets = os.path.join(self.secrets_root, 'secrets.yaml')
+        self.secrets_content = None
 
     def addRole(self):
         count = len(self.roles)
@@ -922,7 +924,6 @@
         args = json.loads(self.job.arguments)
         self.log.debug("Beginning job %s for ref %s" %
                        (self.job.name, args['zuul']['ref']))
-        self.log.debug("Args: %s" % (self.job.arguments,))
         self.log.debug("Job root: %s" % (self.jobdir.root,))
         tasks = []
         projects = set()
@@ -1266,6 +1267,13 @@
         for role in playbook['roles']:
             self.prepareRole(jobdir_playbook, role, args)
 
+        secrets = playbook['secrets']
+        if secrets:
+            if 'zuul' in secrets:
+                raise Exception("Defining secrets named 'zuul' is not allowed")
+            jobdir_playbook.secrets_content = yaml.safe_dump(
+                secrets, default_flow_style=False)
+
         self.writeAnsibleConfig(jobdir_playbook, playbook)
 
     def checkoutTrustedProject(self, project, branch):
@@ -1390,15 +1398,6 @@
     def writeAnsibleConfig(self, jobdir_playbook, playbook):
         trusted = jobdir_playbook.trusted
 
-        secrets = playbook['secrets'].copy()
-        if secrets:
-            if 'zuul' in secrets:
-                raise Exception("Defining secrets named 'zuul' is not allowed")
-            with open(jobdir_playbook.secrets, 'w') as secrets_yaml:
-                secrets_yaml.write(
-                    yaml.safe_dump(secrets, default_flow_style=False))
-            jobdir_playbook.has_secrets = True
-
         # TODO(mordred) This should likely be extracted into a more generalized
         #               mechanism for deployers being able to add callback
         #               plugins.
@@ -1406,17 +1405,13 @@
             callback_path = '%s:%s' % (
                 self.executor_server.callback_dir,
                 os.path.dirname(ara_callbacks.__file__))
-            callback_whitelist = 'zuul_json,ara'
         else:
             callback_path = self.executor_server.callback_dir
-            callback_whitelist = 'zuul_json'
         with open(jobdir_playbook.ansible_config, 'w') as config:
             config.write('[defaults]\n')
             config.write('hostfile = %s\n' % self.jobdir.inventory)
             config.write('local_tmp = %s/local_tmp\n' %
                          self.jobdir.ansible_cache_root)
-            config.write('remote_tmp = %s/remote_tmp\n' %
-                         self.jobdir.ansible_cache_root)
             config.write('retry_files_enabled = False\n')
             config.write('gathering = smart\n')
             config.write('fact_caching = jsonfile\n')
@@ -1427,7 +1422,6 @@
             config.write('command_warnings = False\n')
             config.write('callback_plugins = %s\n' % callback_path)
             config.write('stdout_callback = zuul_stream\n')
-            config.write('callback_whitelist = %s\n' % callback_whitelist)
             # bump the timeout because busy nodes may take more than
             # 10s to respond
             config.write('timeout = 30\n')
@@ -1446,7 +1440,7 @@
             # role. Otherwise, printing the args could be useful for
             # debugging.
             config.write('display_args_to_stdout = %s\n' %
-                         str(not secrets))
+                         str(not playbook['secrets']))
 
             config.write('[ssh_connection]\n')
             # NB: when setting pipelining = True, keep_remote_files
@@ -1514,10 +1508,14 @@
         if self.executor_variables_file:
             ro_paths.append(self.executor_variables_file)
 
-        self.executor_server.execution_wrapper.setMountsMap(ro_paths,
-                                                            rw_paths)
+        secrets = {}
+        if playbook.secrets_content:
+            secrets[playbook.secrets] = playbook.secrets_content
 
-        popen = self.executor_server.execution_wrapper.getPopen(
+        context = self.executor_server.execution_wrapper.getExecutionContext(
+            ro_paths, rw_paths, secrets)
+
+        popen = context.getPopen(
             work_dir=self.jobdir.work_root,
             ssh_auth_sock=env_copy.get('SSH_AUTH_SOCK'))
 
@@ -1598,7 +1596,7 @@
             verbose = '-v'
 
         cmd = ['ansible-playbook', verbose, playbook.path]
-        if playbook.has_secrets:
+        if playbook.secrets_content:
             cmd.extend(['-e', '@' + playbook.secrets])
 
         if success is not None:
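
The hunks above reshape how secrets reach Ansible: writeAnsibleConfig()
no longer writes secrets.yaml itself; the playbook now carries the
rendered YAML in secrets_content, and the executor passes a
{path: content} map to the execution wrapper, whose
getExecutionContext() decides how to materialize it.  A
much-simplified, hypothetical sketch of the nullwrap end of that
contract (the real drivers are richer, and a bubblewrap context would
arrange for the content to appear only inside the mount namespace):

    import functools
    import subprocess

    class NullwrapExecutionContext(object):
        # Hypothetical simplification: nullwrap applies no isolation,
        # so secrets can simply be written to their target paths.
        def __init__(self, ro_paths, rw_paths, secrets):
            self.secrets = secrets

        def getPopen(self, work_dir, ssh_auth_sock=None):
            for path, content in self.secrets.items():
                with open(path, 'w') as f:
                    f.write(content)
            # Return a Popen factory so the caller can supply the
            # final ansible-playbook command line later.
            return functools.partial(subprocess.Popen, cwd=work_dir)

    class NullwrapDriver(object):
        def getExecutionContext(self, ro_paths, rw_paths, secrets):
            return NullwrapExecutionContext(ro_paths, rw_paths, secrets)

Handing the wrapper content rather than a pre-written file is the
point of the refactor: it lets the bubblewrap driver feed each secret
into the namespace (for example via a file descriptor) without the
plaintext ever existing outside the sandbox.
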
diff --git a/zuul/model.py b/zuul/model.py
index 5a157bc..850bbe2 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1996,6 +1996,8 @@
         # common
         self.type = None
         self.branch_updated = False
+        self.branch_created = False
+        self.branch_deleted = False
         self.ref = None
         # For management events (eg: enqueue / promote)
         self.tenant_name = None
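
The two new TriggerEvent flags complement branch_updated so the
scheduler change that follows can react to branch lifecycle events.
A hedged sketch of how a connection driver might set them when
translating a raw ref-update payload; the all-zeros sentinel matches
git's convention, but this helper is an assumption for illustration:

    # git marks branch creation/deletion in ref updates with an
    # all-zeros old or new revision.
    EMPTY_REF = '0' * 40

    def set_branch_flags(event, oldrev, newrev):
        event.branch_created = (oldrev == EMPTY_REF)
        event.branch_deleted = (newrev == EMPTY_REF)
        event.branch_updated = not (event.branch_created or
                                    event.branch_deleted)
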
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a64d9e0..52b34ec 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -753,12 +753,15 @@
                                    "source %s",
                                    e.change, project.source)
                     continue
-                if (event.branch_updated and
-                    hasattr(change, 'files') and
-                    change.updatesConfig()):
-                    # The change that just landed updates the config.
-                    # Clear out cached data for this project and
-                    # perform a reconfiguration.
+                if ((event.branch_updated and
+                     hasattr(change, 'files') and
+                     change.updatesConfig()) or
+                    event.branch_created or
+                    event.branch_deleted):
+                    # The change that just landed updates the config
+                    # or a branch was just created or deleted.  Clear
+                    # out cached data for this project and perform a
+                    # reconfiguration.
                     change.project.unparsed_config = None
                     self.reconfigureTenant(tenant)
                 for pipeline in tenant.layout.pipelines.values():
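
Read as a predicate, the widened condition above says a tenant must be
reconfigured either when a change that just merged touches config
files, or whenever a branch appears or disappears, since the set of
branches is itself part of the effective configuration.  A
hypothetical extraction, equivalent to the inline test:

    def needs_reconfiguration(event, change):
        # Branch creation/deletion always invalidates cached project
        # config; a merged change only does if it updates config files.
        if event.branch_created or event.branch_deleted:
            return True
        return bool(event.branch_updated and
                    hasattr(change, 'files') and
                    change.updatesConfig())
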