Merge "Add getProjectBranches to Source" into feature/zuulv3
diff --git a/README.rst b/README.rst
index ff4d938..f1ff497 100644
--- a/README.rst
+++ b/README.rst
@@ -1,7 +1,11 @@
-Zuul is a trunk gating system developed for the OpenStack Project.
+Zuul is a project gating system developed for the OpenStack Project.
+We are currently engaged in a significant development effort in
+preparation for the third major version of Zuul.  We call this effort
+`Zuul v3`_ and it is described in more detail below.
@@ -11,11 +15,110 @@
 Bugs are handled at:!/project/679
-Code reviews are, as you might expect, handled by gerrit. The gerrit they
-use is
+Code reviews are, as you might expect, handled by Gerrit at
-Use `git review` to submit patches (after creating a gerrit account that links to your launchpad account). Example::
+Use `git review` to submit patches (after creating a Gerrit account
+that links to your Launchpad account). Example::
     # Do your commits
     $ git review
     # Enter your username if prompted
+Zuul v3
+The Zuul v3 effort involves significant changes to Zuul, and its
+companion program, Nodepool.  The intent is for Zuul to become more
+generally useful outside of the OpenStack community.  This is the best
+way to get started with this effort:
+1) Read the Zuul v3 spec:
+   We use specification documents like this to describe large efforts
+   where we want to make sure that all the participants are in
+   agreement about what will happen, and generally how, before starting
+   development.  These specs should contain enough information for
+   people to evaluate the proposal generally, and sometimes include
+   specific details that need to be agreed upon in advance.  They are
+   living documents which can change as work gets underway.  However,
+   not every change or detail needs to be reflected in the spec --
+   most work is simply done with patches (and revised if necessary in
+   code review).
+2) Read the Nodepool build-workers spec:
+3) Review any proposed updates to these specs:
+   Some of the information in the specs may be effectively superseded
+   by changes here, which are still undergoing review.
+4) Read documentation on the internal data model and testing:
+   The general philosophy for Zuul tests is to perform functional
+   testing of either the individual component or the entire end-to-end
+   system with external systems (such as Gerrit) replaced with fakes.
+   Before adding additional unit tests with a narrower focus, consider
+   whether they add value to this system or are merely duplicative of
+   functional tests.
+5) Review open changes:
+   We find that the most valuable code reviews are ones that spot
+   problems with the proposed change, or raise questions about how
+   it might affect other systems or subsequent work.  It is also a
+   great way to stay involved as a team in work performed by others
+   (for instance, by observing and asking questions about development
+   while it is in progress).  We try not to sweat the small things and
+   don't worry too much about style suggestions or other nitpicky
+   things (unless they are relevant -- for instance, a -1 vote on a
+   change that introduces a YAML change out of character with existing
+   conventions is useful because it makes the system more
+   user-friendly; a -1 vote on a change which uses a sub-optimal line
+   breaking strategy is probably not the best use of anyone's time).
+6) Join #zuul on Freenode.  Let others (especially jeblair, who is
+   trying to coordinate and prioritize work) know what you would like
+   to work on.
+7) TODOv3(jeblair): Coming soon: check storyboard for status of
+   current work items.  We do not have a list of work items yet, but
+   we will soon.
+Once you are up to speed on those items, it will be helpful to know
+the following:
+* Zuul v3 includes some substantial changes to Zuul, and in order to
+  implement them quickly and simultaneously, we temporarily disabled
+  most of the test suite.  That test suite still has relevance, but
+  tests are likely to need updating individually, with reasons ranging
+  from something simple such as a test-framework method changing its
+  name, to more substantial issues, such as a feature being removed as
+  part of the v3 work.  Each test will need to be evaluated
+  individually.  Feel free to claim a test name on this etherpad at any
+  time and work on re-enabling it:
+* Because of the importance of external systems, as well as the number
+  of internal Zuul components, actually running Zuul in a development
+  mode quickly becomes unwieldy (imagine uploading changes to Gerrit
+  repeatedly while altering Zuul source code).  Instead, the best way
+  to develop with Zuul is in fact to write a functional test.
+  Construct a test to fully simulate the series of events you want to
+  see, then run it in the foreground.  For example::
+    .tox/py27/bin/python -m tests.test_scheduler.TestScheduler.test_jobs_launched
+  See TESTING.rst for more information.
+* On many occasions, when working on sweeping changes to Zuul v3, we
+  left notes for future work items in the code marked with "TODOv3".
+  These represent potentially serious missing functionality or other
+  issues which must be resolved before an initial v3 release (unlike
+  a more conventional TODO note, these really cannot be left
+  indefinitely).  They also present an opportunity to identify work
+  items not otherwise tracked.  The names associated with TODO or
+  TODOv3 items do not mean that only that person can address them --
+  they simply reflect who to ask to explain the item in more detail
+  if it is too cryptic.  In your own work, feel free to leave TODOv3
+  notes if a change would otherwise become too large or unwieldy.
diff --git a/bindep.txt b/bindep.txt
new file mode 100644
index 0000000..a2cc02e
--- /dev/null
+++ b/bindep.txt
@@ -0,0 +1 @@
+libjpeg-dev [test]
diff --git a/other-requirements.txt b/other-requirements.txt
deleted file mode 100644
index 1ade655..0000000
--- a/other-requirements.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-mysql-client [test]
-mysql-server [test]
-postgresql [test]
-postgresql-client [test]
diff --git a/tests/fixtures/config/duplicate-pipeline/git/common-config/zuul.yaml b/tests/fixtures/config/duplicate-pipeline/git/common-config/zuul.yaml
new file mode 100755
index 0000000..bc88b06
--- /dev/null
+++ b/tests/fixtures/config/duplicate-pipeline/git/common-config/zuul.yaml
@@ -0,0 +1,46 @@
+- pipeline:
+    name: dup1
+    manager: independent
+    success-message: Build succeeded (dup1).
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: change-restored
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+- pipeline:
+    name: dup2
+    manager: independent
+    success-message: Build succeeded (dup2).
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: change-restored
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+- job:
+    name: project-test1
+- project:
+    name: org/project
+    dup1:
+      queue: integrated
+      jobs:
+        - project-test1
+    dup2:
+      queue: integrated
+      jobs:
+        - project-test1
diff --git a/tests/fixtures/config/duplicate-pipeline/git/org_project/README b/tests/fixtures/config/duplicate-pipeline/git/org_project/README
new file mode 100755
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/duplicate-pipeline/git/org_project/README
@@ -0,0 +1 @@
diff --git a/tests/fixtures/config/duplicate-pipeline/main.yaml b/tests/fixtures/config/duplicate-pipeline/main.yaml
new file mode 100755
index 0000000..ba2d8f5
--- /dev/null
+++ b/tests/fixtures/config/duplicate-pipeline/main.yaml
@@ -0,0 +1,6 @@
+- tenant:
+    name: tenant-duplicate
+    source:
+      gerrit:
+        config-repos:
+          - common-config
diff --git a/tests/fixtures/config/one-job-project/git/common-config/zuul.yaml b/tests/fixtures/config/one-job-project/git/common-config/zuul.yaml
new file mode 100644
index 0000000..148ba42
--- /dev/null
+++ b/tests/fixtures/config/one-job-project/git/common-config/zuul.yaml
@@ -0,0 +1,66 @@
+- pipeline:
+    name: check
+    manager: independent
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+- pipeline:
+    name: gate
+    manager: dependent
+    success-message: Build succeeded (gate).
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - approved: 1
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    start:
+      gerrit:
+        verified: 0
+    precedence: high
+- pipeline:
+    name: post
+    manager: independent
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: ref-updated
+          ref: ^(?!refs/).*$
+- job:
+    name: one-job-project-merge
+    hold-following-changes: true
+- job:
+    name: one-job-project-post
+- project:
+    name: org/one-job-project
+    check:
+      jobs:
+        - one-job-project-merge
+    gate:
+      jobs:
+        - one-job-project-merge
+    post:
+      jobs:
+        - one-job-project-post
diff --git a/tests/fixtures/config/one-job-project/git/org_one-job-project/README b/tests/fixtures/config/one-job-project/git/org_one-job-project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/one-job-project/git/org_one-job-project/README
@@ -0,0 +1 @@
diff --git a/tests/fixtures/config/one-job-project/main.yaml b/tests/fixtures/config/one-job-project/main.yaml
new file mode 100644
index 0000000..a22ed5c
--- /dev/null
+++ b/tests/fixtures/config/one-job-project/main.yaml
@@ -0,0 +1,6 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-repos:
+          - common-config
diff --git a/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml b/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
index 01de2aa..de477dd 100644
--- a/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/single-tenant/git/common-config/zuul.yaml
@@ -36,6 +36,19 @@
         verified: 0
     precedence: high
+- pipeline:
+    name: experimental
+    manager: independent
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit: {}
+    failure:
+      gerrit: {}
 - job:
     name: project-merge
     hold-following-changes: true
@@ -56,6 +69,9 @@
 - job:
     name: project-test2
+- job:
+    name: experimental-project-test
 - project:
     name: org/project
@@ -84,3 +100,11 @@
               - project-test1
               - project-test2
+- project:
+    name: org/experimental-project
+    experimental:
+      jobs:
+        - project-merge:
+            jobs:
+              - experimental-project-test
diff --git a/tests/fixtures/config/single-tenant/git/org_experimental-project/README b/tests/fixtures/config/single-tenant/git/org_experimental-project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/single-tenant/git/org_experimental-project/README
@@ -0,0 +1 @@
diff --git a/tests/fixtures/config/templated-project/git/common-config/zuul.yaml b/tests/fixtures/config/templated-project/git/common-config/zuul.yaml
new file mode 100644
index 0000000..ee0a3e4
--- /dev/null
+++ b/tests/fixtures/config/templated-project/git/common-config/zuul.yaml
@@ -0,0 +1,65 @@
+- pipeline:
+    name: check
+    manager: independent
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+- pipeline:
+    name: gate
+    manager: dependent
+    success-message: Build succeeded (gate).
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: comment-added
+          approval:
+            - approved: 1
+    success:
+      gerrit:
+        verified: 2
+        submit: true
+    failure:
+      gerrit:
+        verified: -2
+    start:
+      gerrit:
+        verified: 0
+    precedence: high
+- pipeline:
+    name: post
+    manager: independent
+    source:
+      gerrit
+    trigger:
+      gerrit:
+        - event: ref-updated
+          ref: ^(?!refs/).*$
+- project-template:
+    name: test-one-and-two
+    check:
+      jobs:
+        - project-test1
+        - project-test2
+- job:
+    name: project-test1
+- job:
+    name: project-test2
+- project:
+    name: org/templated-project
+    templates:
+      - test-one-and-two
diff --git a/tests/fixtures/config/templated-project/git/org_templated-project/README b/tests/fixtures/config/templated-project/git/org_templated-project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/templated-project/git/org_templated-project/README
@@ -0,0 +1 @@
diff --git a/tests/fixtures/config/templated-project/main.yaml b/tests/fixtures/config/templated-project/main.yaml
new file mode 100644
index 0000000..a22ed5c
--- /dev/null
+++ b/tests/fixtures/config/templated-project/main.yaml
@@ -0,0 +1,6 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-repos:
+          - common-config
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 7d52c17..dfe8ded 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -217,14 +217,6 @@
       - project3-post
-  - name: org/one-job-project
-    check:
-      - one-job-project-merge
-    gate:
-      - one-job-project-merge
-    post:
-      - one-job-project-post
   - name: org/nonvoting-project
       - nonvoting-project-merge:
@@ -237,11 +229,6 @@
       - nonvoting-project-post
-  - name: org/templated-project
-    template:
-      - name: test-one-and-two
-        projectname: project
   - name: org/layered-project
       - name: test-one-and-two
diff --git a/tests/ b/tests/
index abb0548..f91401c 100755
--- a/tests/
+++ b/tests/
@@ -32,6 +32,7 @@
 import zuul.rpcclient
 import zuul.reporter.gerrit
 import zuul.reporter.smtp
+import zuul.model
 from tests.base import (
@@ -109,30 +110,6 @@
-    @skip("Disabled for early v3 development")
-    def test_duplicate_pipelines(self):
-        "Test that a change matching multiple pipelines works"
-        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        self.fake_gerrit.addEvent(A.getChangeRestoredEvent())
-        self.waitUntilSettled()
-        self.assertEqual(len(self.history), 2)
-        self.history[0].name == 'project-test1'
-        self.history[1].name == 'project-test1'
-        self.assertEqual(len(A.messages), 2)
-        if 'dup1/project-test1' in A.messages[0]:
-            self.assertIn('dup1/project-test1', A.messages[0])
-            self.assertNotIn('dup2/project-test1', A.messages[0])
-            self.assertNotIn('dup1/project-test1', A.messages[1])
-            self.assertIn('dup2/project-test1', A.messages[1])
-        else:
-            self.assertIn('dup1/project-test1', A.messages[1])
-            self.assertNotIn('dup2/project-test1', A.messages[1])
-            self.assertNotIn('dup1/project-test1', A.messages[0])
-            self.assertIn('dup2/project-test1', A.messages[0])
     def test_parallel_changes(self):
         "Test that changes are tested in parallel and merged in series"
@@ -446,7 +423,6 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
-    @skip("Disabled for early v3 development")
     def test_failed_change_in_middle(self):
         "Test a failed change in the middle of the queue"
@@ -498,7 +474,8 @@
         # project-test1 and project-test2 for C
         self.assertEqual(len(self.builds), 5)
-        items = self.sched.layout.pipelines['gate'].getAllItems()
+        tenant = self.sched.abide.tenants.get('tenant-one')
+        items = tenant.layout.pipelines['gate'].getAllItems()
         builds = items[0].current_build_set.getBuilds()
         self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
         self.assertEqual(self.countJobResults(builds, None), 2)
@@ -765,7 +742,6 @@
         matcher = self.sched._parseSkipIf(config_job)
         self.assertEqual(expected, matcher)
-    @skip("Disabled for early v3 development")
     def test_patch_order(self):
         "Test that dependent patches are tested in the right order"
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -810,7 +786,6 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
-    @skip("Disabled for early v3 development")
     def test_needed_changes_enqueue(self):
         "Test that a needed change is enqueued ahead"
         #          A      Given a git tree like this, if we enqueue
@@ -860,7 +835,7 @@
         # triggering events.  Since it will have the changes cached
         # already (without approvals), we need to clear the cache
         # first.
-        for connection in self.connections.values():
+        for connection in self.connections.connections.values():
         self.launch_server.hold_jobs_in_build = True
@@ -896,7 +871,6 @@
                          '1,1 2,1 3,1 4,1 5,1 6,1 7,1')
-    @skip("Disabled for early v3 development")
     def test_source_cache(self):
         "Test that the source cache operates correctly"
         self.launch_server.hold_jobs_in_build = True
@@ -943,22 +917,30 @@
         self.assertEqual(A.queried, 2)  # Initial and isMerged
         self.assertEqual(B.queried, 3)  # Initial A, refresh from B, isMerged
-    @skip("Disabled for early v3 development")
     def test_can_merge(self):
         "Test whether a change is ready to merge"
         # TODO: move to test_gerrit (this is a unit test!)
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
-        source = self.sched.layout.pipelines['gate'].source
-        a = source._getChange(1, 2)
-        mgr = self.sched.layout.pipelines['gate'].manager
+        tenant = self.sched.abide.tenants.get('tenant-one')
+        source = tenant.layout.pipelines['gate'].source
+        # TODO(pabelanger): As we add more source / trigger APIs we should make
+        # it easier for users to create events for testing.
+        event = zuul.model.TriggerEvent()
+        event.trigger_name = 'gerrit'
+        event.change_number = '1'
+        event.patch_number = '2'
+        a = source.getChange(event)
+        mgr = tenant.layout.pipelines['gate'].manager
         self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
         A.addApproval('code-review', 2)
-        a = source._getChange(1, 2, refresh=True)
+        a = source.getChange(event, refresh=True)
         self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
         A.addApproval('approved', 1)
-        a = source._getChange(1, 2, refresh=True)
+        a = source.getChange(event, refresh=True)
         self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
     @skip("Disabled for early v3 development")
@@ -1245,38 +1227,6 @@
         self.assertNotEqual(commit_A, commit_B, commit_C)
     @skip("Disabled for early v3 development")
-    def test_one_job_project(self):
-        "Test that queueing works with one job"
-        A = self.fake_gerrit.addFakeChange('org/one-job-project',
-                                           'master', 'A')
-        B = self.fake_gerrit.addFakeChange('org/one-job-project',
-                                           'master', 'B')
-        A.addApproval('code-review', 2)
-        B.addApproval('code-review', 2)
-        self.fake_gerrit.addEvent(A.addApproval('approved', 1))
-        self.fake_gerrit.addEvent(B.addApproval('approved', 1))
-        self.waitUntilSettled()
-        self.assertEqual(['status'], 'MERGED')
-        self.assertEqual(A.reported, 2)
-        self.assertEqual(['status'], 'MERGED')
-        self.assertEqual(B.reported, 2)
-    @skip("Disabled for early v3 development")
-    def test_job_from_templates_launched(self):
-        "Test whether a job generated via a template can be launched"
-        A = self.fake_gerrit.addFakeChange(
-            'org/templated-project', 'master', 'A')
-        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
-        self.waitUntilSettled()
-        self.assertEqual(self.getJobFromHistory('project-test1').result,
-                         'SUCCESS')
-        self.assertEqual(self.getJobFromHistory('project-test2').result,
-                         'SUCCESS')
-    @skip("Disabled for early v3 development")
     def test_layered_templates(self):
         "Test whether a job generated via a template can be launched"
@@ -1298,7 +1248,6 @@
-    @skip("Disabled for early v3 development")
     def test_dependent_changes_dequeue(self):
         "Test that dependent patches are not needlessly tested"
@@ -1334,7 +1283,6 @@
         self.assertEqual(C.reported, 2)
         self.assertEqual(len(self.history), 1)
-    @skip("Disabled for early v3 development")
     def test_failing_dependent_changes(self):
         "Test that failing dependent patches are taken out of stream"
         self.launch_server.hold_jobs_in_build = True
@@ -1714,30 +1662,6 @@
         self.assertEqual(A.reported, 2)
     @skip("Disabled for early v3 development")
-    def test_single_nonexistent_post_job(self):
-        "Test launching a single post job that doesn't exist"
-        e = {
-            "type": "ref-updated",
-            "submitter": {
-                "name": "User Name",
-            },
-            "refUpdate": {
-                "oldRev": "90f173846e3af9154517b88543ffbd1691f31366",
-                "newRev": "d479a0bfcb34da57a31adb2a595c0cf687812543",
-                "refName": "master",
-                "project": "org/project",
-            }
-        }
-        # Set to the state immediately after a restart
-        self.resetGearmanServer()
-        self.launcher.negative_function_cache_ttl = 0
-        self.fake_gerrit.addEvent(e)
-        self.waitUntilSettled()
-        self.assertEqual(len(self.history), 0)
-    @skip("Disabled for early v3 development")
     def test_new_patchset_dequeues_old(self):
         "Test that a new patchset causes the old to be dequeued"
         # D -> C (depends on B) -> B (depends on A) -> A -> M
@@ -1878,7 +1802,6 @@
         self.assertEqual(self.history[3].result, 'SUCCESS')
         self.assertEqual(self.history[3].changes, '1,1 2,2')
-    @skip("Disabled for early v3 development")
     def test_abandoned_gate(self):
         "Test that an abandoned change is dequeued from gate"
@@ -1897,10 +1820,10 @@
-        self.assertEqual(len(self.builds), 0, "No job running")
-        self.assertEqual(len(self.history), 1, "Only one build in history")
-        self.assertEqual(self.history[0].result, 'ABORTED',
-                         "Build should have been aborted")
+        self.assertBuilds([])
+        self.assertHistory([
+            dict(name='project-merge', result='ABORTED', changes='1,1')],
+            ordered=False)
         self.assertEqual(A.reported, 1,
                          "Abandoned gate change should report only start")
@@ -3912,7 +3835,6 @@
         running_items = client.get_running_jobs()
         self.assertEqual(0, len(running_items))
-    @skip("Disabled for early v3 development")
     def test_nonvoting_pipeline(self):
         "Test that a nonvoting pipeline (experimental) can still report"
@@ -3920,6 +3842,9 @@
                                            'master', 'A')
+        self.assertEqual(self.getJobFromHistory('project-merge').result,
+                         'SUCCESS')
@@ -4474,6 +4399,26 @@
         self.assertEqual(self.history[-1].changes, '3,2 2,1 1,2')
     @skip("Disabled for early v3 development")
+    def test_crd_check_unknown(self):
+        "Test unknown projects in independent pipeline"
+        self.init_repo("org/unknown")
+        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/unknown', 'master', 'D')
+        # A Depends-On: B
+['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+            A.subject,['id'])
+        # Make sure zuul has seen an event on B.
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(['status'], 'NEW')
+        self.assertEqual(A.reported, 1)
+        self.assertEqual(['status'], 'NEW')
+        self.assertEqual(B.reported, 0)
+    @skip("Disabled for early v3 development")
     def test_crd_cycle_join(self):
         "Test an updated change creates a cycle"
         A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
@@ -4671,3 +4616,69 @@
             '- docs-draft-test2 https://server/job/docs-draft-test2/1/',
+class TestDuplicatePipeline(ZuulTestCase):
+    tenant_config_file = 'config/duplicate-pipeline/main.yaml'
+    def test_duplicate_pipelines(self):
+        "Test that a change matching multiple pipelines works"
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getChangeRestoredEvent())
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='project-test1', result='SUCCESS', changes='1,1',
+                 pipeline='dup1'),
+            dict(name='project-test1', result='SUCCESS', changes='1,1',
+                 pipeline='dup2'),
+        ])
+        self.assertEqual(len(A.messages), 2)
+        self.assertIn('dup1', A.messages[0])
+        self.assertNotIn('dup2', A.messages[0])
+        self.assertIn('project-test1', A.messages[0])
+        self.assertIn('dup2', A.messages[1])
+        self.assertNotIn('dup1', A.messages[1])
+        self.assertIn('project-test1', A.messages[1])
+class TestSchedulerOneJobProject(ZuulTestCase):
+    tenant_config_file = 'config/one-job-project/main.yaml'
+    def test_one_job_project(self):
+        "Test that queueing works with one job"
+        A = self.fake_gerrit.addFakeChange('org/one-job-project',
+                                           'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/one-job-project',
+                                           'master', 'B')
+        A.addApproval('code-review', 2)
+        B.addApproval('code-review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('approved', 1))
+        self.fake_gerrit.addEvent(B.addApproval('approved', 1))
+        self.waitUntilSettled()
+        self.assertEqual(['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(['status'], 'MERGED')
+        self.assertEqual(B.reported, 2)
+class TestSchedulerTemplatedProject(ZuulTestCase):
+    tenant_config_file = 'config/templated-project/main.yaml'
+    def test_job_from_templates_launched(self):
+        "Test whether a job generated via a template can be launched"
+        A = self.fake_gerrit.addFakeChange(
+            'org/templated-project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test2').result,
+                         'SUCCESS')
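The rewritten `test_abandoned_gate` and the new `TestDuplicatePipeline` compare build history against a list of dicts via `assertHistory`, optionally with `ordered=False`. That helper lives in the test base class and is not shown in this diff. The following is only a hypothetical sketch of how such a comparison could work, assuming each history entry exposes the compared keys as attributes; `Build` and `history_matches` are illustrative names, not Zuul APIs.

```python
from collections import namedtuple

# Hypothetical stand-in for a recorded build; the real history objects
# used by the Zuul test suite are not shown in this diff.
Build = namedtuple('Build', ['name', 'result', 'changes', 'pipeline'])


def history_matches(history, expected, ordered=True):
    """Return True if every expected dict matches exactly one history entry.

    Only the keys present in each expected dict are compared, so a caller
    can omit fields it does not care about (e.g. ``pipeline``).
    """
    if len(history) != len(expected):
        return False

    def matches(build, spec):
        return all(getattr(build, key) == value
                   for key, value in spec.items())

    if ordered:
        return all(matches(b, s) for b, s in zip(history, expected))

    # Unordered: greedily pair each expected spec with an unused build.
    # This is adequate when specs are distinct; heavily overlapping
    # partial specs could need a proper bipartite matching instead.
    remaining = list(history)
    for spec in expected:
        for i, build in enumerate(remaining):
            if matches(build, spec):
                del remaining[i]
                break
        else:
            return False
    return True
```

With `ordered=False`, two `project-test1` builds reported by `dup2` then `dup1` still match an expectation listed as `dup1` then `dup2`, which is the situation the duplicate-pipeline test has to tolerate.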
diff --git a/zuul/ansible/library/ b/zuul/ansible/library/
index 78f3249..e70dac8 100644
--- a/zuul/ansible/library/
+++ b/zuul/ansible/library/
@@ -60,28 +60,13 @@
 class Server(object):
     def __init__(self, path, port):
         self.path = path
-        s = None
-        for res in socket.getaddrinfo(None, port, socket.AF_UNSPEC,
-                                      socket.SOCK_STREAM, 0,
-                                      socket.AI_PASSIVE):
-            af, socktype, proto, canonname, sa = res
-            try:
-                s = socket.socket(af, socktype, proto)
-                s.setsockopt(socket.SOL_SOCKET,
-                             socket.SO_REUSEADDR, 1)
-            except socket.error:
-                s = None
-                continue
-            try:
-                s.bind(sa)
-                s.listen(1)
-            except socket.error:
-                s.close()
-                s = None
-                continue
-            break
-        if s is None:
-            sys.exit(1)
+        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+        s.setsockopt(socket.SOL_SOCKET,
+                     socket.SO_REUSEADDR, 1)
+        s.bind(('::', port))
+        s.listen(1)
         self.socket = s
     def accept(self):
@@ -170,7 +155,7 @@
 def test():
-    s = Server('/tmp/console.html', 8088)
+    s = Server('/tmp/console.html', 19885)
@@ -178,7 +163,7 @@
     module = AnsibleModule(
-            port=dict(default=8088, type='int'),
+            port=dict(default=19885, type='int'),
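This hunk replaces the `getaddrinfo` loop with a single `AF_INET6` socket bound to `'::'`. Whether such a socket also accepts IPv4 clients (as IPv4-mapped addresses) depends on the `IPV6_V6ONLY` option, whose default is platform-dependent (on Linux it follows `net.ipv6.bindv6only`, usually 0). A minimal sketch, not the module's actual code, that requests dual-stack behaviour explicitly and falls back to IPv4 when IPv6 is unavailable; `make_listener` is a hypothetical name.

```python
import socket


def make_listener(port, backlog=1):
    """Create a listening TCP socket on all local addresses (sketch).

    Prefers an IPv6 socket bound to '::' with IPV6_V6ONLY cleared, so
    IPv4 clients are accepted as IPv4-mapped addresses; falls back to an
    IPv4-only socket if the host has no usable IPv6 support.
    """
    try:
        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    except (socket.error, AttributeError):
        s = None
    if s is not None:
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Request dual-stack explicitly rather than relying on the
            # system default.
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            s.bind(('::', port))
            s.listen(backlog)
            return s
        except (socket.error, AttributeError):
            s.close()
    # IPv4-only fallback.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('0.0.0.0', port))
    s.listen(backlog)
    return s
```

Passing port 0 lets the OS pick a free port, which is convenient for tests; the module itself now defaults to 19885.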
diff --git a/zuul/ b/zuul/
index 063889b..fa526c1 100644
--- a/zuul/
+++ b/zuul/
@@ -255,9 +255,10 @@
             # create the jobs in the final definition as needed.
             pipeline_defined = False
             for template in configs:
-                ProjectParser.log.debug("Applying template %s to pipeline %s" %
-                                        (,
                 if in template.pipelines:
+                    ProjectParser.log.debug(
+                        "Applying template %s to pipeline %s" %
+                        (,
                     pipeline_defined = True
                     template_pipeline = template.pipelines[]
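The hunk above moves the debug message inside the membership check, so a line is logged only when a template actually contributes to the pipeline being built. Below is a self-contained sketch of that loop shape, using a hypothetical `Template` tuple with a `pipelines` dict. The real `ProjectParser` code is only partially visible here, so this is an illustration rather than its implementation.

```python
import logging
from collections import namedtuple

log = logging.getLogger('sketch.ProjectParser')

# Hypothetical minimal template: a name plus a mapping of
# pipeline name -> list of job names.
Template = namedtuple('Template', ['name', 'pipelines'])


def collect_pipeline_jobs(templates, pipeline_name):
    """Return (pipeline_defined, jobs) for one pipeline across templates."""
    pipeline_defined = False
    jobs = []
    for template in templates:
        if pipeline_name in template.pipelines:
            # Log only when this template actually applies to the pipeline;
            # arguments are passed lazily to the logger.
            log.debug("Applying template %s to pipeline %s",
                      template.name, pipeline_name)
            pipeline_defined = True
            jobs.extend(template.pipelines[pipeline_name])
    return pipeline_defined, jobs
```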
diff --git a/zuul/connection/ b/zuul/connection/
index 5edc9a5..f084993 100644
--- a/zuul/connection/
+++ b/zuul/connection/
@@ -269,9 +269,8 @@
         for key in remove:
             del self._change_cache[key]
-    def getChange(self, event):
+    def getChange(self, event, refresh=False):
         if event.change_number:
-            refresh = False
             change = self._getChange(event.change_number, event.patch_number,
         elif event.ref:
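With `refresh` promoted to a keyword argument of `getChange(event, refresh=False)`, callers such as the rewritten `test_can_merge` can force a re-query through the public method instead of calling the private `_getChange`. The connection's cache code is only partly visible in this diff. The following is a hypothetical sketch of a change cache keyed by change and patchset number, where `refresh=True` bypasses the cached entry. `ChangeCache` and its `query` callable are illustrative names, not Zuul APIs.

```python
class ChangeCache(object):
    """Hypothetical change cache keyed by (change number, patchset)."""

    def __init__(self, query):
        # `query` is any callable (number, patchset) -> change data; it
        # stands in for a real query against the code review system.
        self._query = query
        self._cache = {}

    def get_change(self, number, patchset, refresh=False):
        key = (number, patchset)
        # Re-query when asked to refresh or when nothing is cached yet;
        # otherwise return the cached object unchanged.
        if refresh or key not in self._cache:
            self._cache[key] = self._query(number, patchset)
        return self._cache[key]
```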
diff --git a/zuul/launcher/ b/zuul/launcher/
new file mode 100644
index 0000000..a800871
--- /dev/null
+++ b/zuul/launcher/
@@ -0,0 +1,1575 @@
+# Copyright 2014 OpenStack Foundation
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import json
+import logging
+import os
+import re
+import shutil
+import signal
+import socket
+import subprocess
+import tempfile
+import threading
+import time
+import traceback
+import Queue
+import uuid
+import gear
+import yaml
+import jenkins_jobs.builder
+import jenkins_jobs.formatter
+import zmq
+import zuul.ansible.library
+import zuul.ansible.plugins.callback_plugins
+from zuul.lib import commandsocket
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
+            'verbose', 'unverbose']
+def boolify(x):
+    if isinstance(x, str):
+        return bool(int(x))
+    return bool(x)
+class LaunchGearWorker(gear.Worker):
+    def __init__(self, *args, **kw):
+        self.__launch_server = kw.pop('launch_server')
+        super(LaunchGearWorker, self).__init__(*args, **kw)
+    def handleNoop(self, packet):
+        workers = len(self.__launch_server.node_workers)
+        delay = (workers ** 2) / 1000.0
+        time.sleep(delay)
+        return super(LaunchGearWorker, self).handleNoop(packet)
+class NodeGearWorker(gear.Worker):
+    MASS_DO = 101
+    def sendMassDo(self, functions):
+        names = [gear.convert_to_bytes(x) for x in functions]
+        data = b'\x00'.join(names)
+        new_function_dict = {}
+        for name in names:
+            new_function_dict[name] = gear.FunctionRecord(name)
+        self.broadcast_lock.acquire()
+        try:
+            p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
+            self.broadcast(p)
+            self.functions = new_function_dict
+        finally:
+            self.broadcast_lock.release()
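+    # Example with hypothetical function names:
+    # sendMassDo(['build:py27', 'build:py27:trusty']) broadcasts one
+    # MASS_DO request with the payload b'build:py27\x00build:py27:trusty'
+    # and replaces self.functions wholesale, instead of sending one
+    # CAN_DO per function.  Packet type 101 is not a standard Gearman
+    # packet, so this assumes a server that understands the extension.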
+class Watchdog(object):
+    def __init__(self, timeout, function, args):
+        self.timeout = timeout
+        self.function = function
+        self.args = args
+        self.thread = threading.Thread(target=self._run)
+        self.thread.daemon = True
+    def _run(self):
+        while self._running and time.time() < self.end:
+            time.sleep(10)
+        if self._running:
+            self.function(*self.args)
+    def start(self):
+        self._running = True
+        self.end = time.time() + self.timeout
+        self.thread.start()
+    def stop(self):
+        self._running = False
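+    # Hypothetical usage: Watchdog(3600, proc.kill, ()) calls
+    # proc.kill() about an hour after start() unless stop() is called
+    # first.  Because _run() polls every 10 seconds, the callback can
+    # fire up to roughly 10 seconds after the deadline, and the thread
+    # can take as long to notice stop().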
+class JobDir(object):
+    def __init__(self, keep=False):
+        self.keep = keep
+        self.root = tempfile.mkdtemp()
+        self.ansible_root = os.path.join(self.root, 'ansible')
+        os.makedirs(self.ansible_root)
+        self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
+        self.inventory = os.path.join(self.ansible_root, 'inventory')
+        self.playbook = os.path.join(self.ansible_root, 'playbook')
+        self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
+        self.config = os.path.join(self.ansible_root, 'ansible.cfg')
+        self.script_root = os.path.join(self.ansible_root, 'scripts')
+        self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
+        os.makedirs(self.script_root)
+        self.staging_root = os.path.join(self.root, 'staging')
+        os.makedirs(self.staging_root)
+    def __enter__(self):
+        return self
+    def __exit__(self, etype, value, tb):
+        if not self.keep:
+            shutil.rmtree(self.root)
+class LaunchServer(object):
+    log = logging.getLogger("zuul.LaunchServer")
+    site_section_re = re.compile('site "(.*?)"')
+    node_section_re = re.compile('node "(.*?)"')
+    def __init__(self, config, keep_jobdir=False):
+        self.config = config
+        self.options = dict(
+            verbose=False
+        )
+        self.keep_jobdir = keep_jobdir
+        self.hostname = socket.gethostname()
+        self.registered_functions = set()
+        self.node_workers = {}
+        self.jobs = {}
+        self.builds = {}
+        self.zmq_send_queue = Queue.Queue()
+        self.termination_queue = Queue.Queue()
+        self.sites = {}
+        self.static_nodes = {}
+        self.command_map = dict(
+            reconfigure=self.reconfigure,
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            release=self.release,
+            graceful=self.graceful,
+            verbose=self.verboseOn,
+            unverbose=self.verboseOff,
+        )
+        if config.has_option('launcher', 'accept_nodes'):
+            self.accept_nodes = config.getboolean('launcher',
+                                                  'accept_nodes')
+        else:
+            self.accept_nodes = True
+        self.config_accept_nodes = self.accept_nodes
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
+        ansible_dir = os.path.join(state_dir, 'ansible')
+        plugins_dir = os.path.join(ansible_dir, 'plugins')
+        self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
+        if not os.path.exists(self.callback_dir):
+            os.makedirs(self.callback_dir)
+        self.library_dir = os.path.join(ansible_dir, 'library')
+        if not os.path.exists(self.library_dir):
+            os.makedirs(self.library_dir)
+        callback_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.plugins.callback_plugins.__file__))
+        for fn in os.listdir(callback_path):
+            shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
+        library_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.library.__file__))
+        for fn in os.listdir(library_path):
+            shutil.copy(os.path.join(library_path, fn), self.library_dir)
+        def get_config_default(section, option, default):
+            if config.has_option(section, option):
+                return config.get(section, option)
+            return default
+        for section in config.sections():
+            m = self.site_section_re.match(section)
+            if m:
+                sitename = m.group(1)
+                d = {}
+                d['host'] = get_config_default(section, 'host', None)
+                d['user'] = get_config_default(section, 'user', '')
+                d['pass'] = get_config_default(section, 'pass', '')
+                d['root'] = get_config_default(section, 'root', '/')
+                d['keytab'] = get_config_default(section, 'keytab', None)
+                self.sites[sitename] = d
+                continue
+            m = self.node_section_re.match(section)
+            if m:
+                nodename = m.group(1)
+                d = {}
+                d['name'] = nodename
+                d['host'] = config.get(section, 'host')
+                d['description'] = get_config_default(section,
+                                                      'description', '')
+                if config.has_option(section, 'labels'):
+                    d['labels'] = config.get(section, 'labels').split(',')
+                else:
+                    d['labels'] = []
+                self.static_nodes[nodename] = d
+                continue
+    def start(self):
+        self._gearman_running = True
+        self._zmq_running = True
+        self._reaper_running = True
+        self._command_running = True
+        # Setup ZMQ
+        self.zcontext = zmq.Context()
+        self.zsocket = self.zcontext.socket(zmq.PUB)
+        self.zsocket.bind("tcp://*:8888")
+        # Setup Gearman
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = LaunchGearWorker('Zuul Launch Server',
+                                       launch_server=self)
+        self.worker.addServer(server, port)
+        self.log.debug("Waiting for server")
+        self.worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+        # Start command socket
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand)
+        self.command_thread.daemon = True
+        self.command_thread.start()
+        # Load JJB config
+        self.loadJobs()
+        # Start ZMQ worker thread
+        self.log.debug("Starting ZMQ processor")
+        self.zmq_thread = threading.Thread(target=self.runZMQ)
+        self.zmq_thread.daemon = True
+        self.zmq_thread.start()
+        # Start node worker reaper thread
+        self.log.debug("Starting reaper")
+        self.reaper_thread = threading.Thread(target=self.runReaper)
+        self.reaper_thread.daemon = True
+        self.reaper_thread.start()
+        # Start Gearman worker thread
+        self.log.debug("Starting worker")
+        self.gearman_thread = threading.Thread(target=self.run)
+        self.gearman_thread.daemon = True
+        self.gearman_thread.start()
+        # Start static workers
+        for node in self.static_nodes.values():
+            self.log.debug("Creating static node with arguments: %s" % (node,))
+            self._launchWorker(node)
+    def loadJobs(self):
+        self.log.debug("Loading jobs")
+        builder = JJB()
+        path = self.config.get('launcher', 'jenkins_jobs')
+        builder.load_files([path])
+        builder.parser.expandYaml()
+        unseen = set(self.jobs.keys())
+        for job in builder.parser.jobs:
+            builder.expandMacros(job)
+            self.jobs[job['name']] = job
+            unseen.discard(job['name'])
+        for name in unseen:
+            del self.jobs[name]
+    def register(self):
+        new_functions = set()
+        if self.accept_nodes:
+            new_functions.add("node_assign:zuul")
+        new_functions.add("stop:%s" % self.hostname)
+        new_functions.add("set_description:%s" % self.hostname)
+        new_functions.add("node_revoke:%s" % self.hostname)
+        for function in new_functions - self.registered_functions:
+            self.worker.registerFunction(function)
+        for function in self.registered_functions - new_functions:
+            self.worker.unRegisterFunction(function)
+        self.registered_functions = new_functions
+    def reconfigure(self):
+        self.log.debug("Reconfiguring")
+        self.loadJobs()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='reconfigure'))
+            except Exception:
+                self.log.exception("Exception sending reconfigure command "
+                                   "to worker:")
+        self.log.debug("Reconfiguration complete")
+    def pause(self):
+        self.log.debug("Pausing")
+        self.accept_nodes = False
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='pause'))
+            except Exception:
+                self.log.exception("Exception sending pause command "
+                                   "to worker:")
+        self.log.debug("Paused")
+    def unpause(self):
+        self.log.debug("Unpausing")
+        self.accept_nodes = self.config_accept_nodes
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='unpause'))
+            except Exception:
+                self.log.exception("Exception sending unpause command "
+                                   "to worker:")
+        self.log.debug("Unpaused")
+    def release(self):
+        self.log.debug("Releasing idle nodes")
+        for node in self.node_workers.values():
+            if node.name in self.static_nodes:
+                continue
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='release'))
+            except Exception:
+                self.log.exception("Exception sending release command "
+                                   "to worker:")
+        self.log.debug("Finished releasing idle nodes")
+    def graceful(self):
+        # Note: this is run in the command processing thread; no more
+        # external commands will be processed after this.
+        self.log.debug("Gracefully stopping")
+        self.pause()
+        self.release()
+        self.log.debug("Waiting for all builds to finish")
+        while self.builds:
+            time.sleep(5)
+        self.log.debug("All builds are finished")
+        self.stop()
+    def stop(self):
+        self.log.debug("Stopping")
+        # First, stop accepting new jobs
+        self._gearman_running = False
+        self._reaper_running = False
+        self.worker.shutdown()
+        # Then stop all of the workers
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.stop()
+            except Exception:
+                self.log.exception("Exception sending stop command to worker:")
+        # Stop ZMQ afterwards so that the send queue is flushed
+        self._zmq_running = False
+        self.zmq_send_queue.put(None)
+        self.zmq_send_queue.join()
+        # Stop command processing
+        self._command_running = False
+        self.command_socket.stop()
+        # Join the gearman thread which was stopped earlier.
+        self.gearman_thread.join()
+        # The command thread is joined in the join() method of this
+        # class, which is called by the command shell.
+        self.log.debug("Stopped")
+    def verboseOn(self):
+        self.log.debug("Enabling verbose mode")
+        self.options['verbose'] = True
+    def verboseOff(self):
+        self.log.debug("Disabling verbose mode")
+        self.options['verbose'] = False
+    def join(self):
+        self.command_thread.join()
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get()
+                self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
+    def runZMQ(self):
+        while self._zmq_running or not self.zmq_send_queue.empty():
+            try:
+                item = self.zmq_send_queue.get()
+                self.log.debug("Got ZMQ event %s" % (item,))
+                if item is None:
+                    continue
+                self.zsocket.send(item)
+            except Exception:
+                self.log.exception("Exception while processing ZMQ events")
+            finally:
+                self.zmq_send_queue.task_done()
+    def run(self):
+        while self._gearman_running:
+            try:
+                job = self.worker.getJob()
+                try:
+                    if job.name.startswith('node_assign:'):
+                        self.log.debug("Got node_assign job: %s" % job.unique)
+                        self.assignNode(job)
+                    elif job.name.startswith('stop:'):
+                        self.log.debug("Got stop job: %s" % job.unique)
+                        self.stopJob(job)
+                    elif job.name.startswith('set_description:'):
+                        self.log.debug("Got set_description job: %s" %
+                                       job.unique)
+                        job.sendWorkComplete()
+                    elif job.name.startswith('node_revoke:'):
+                        self.log.debug("Got node_revoke job: %s" % job.unique)
+                        self.revokeNode(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(traceback.format_exc())
+            except gear.InterruptedError:
+                return
+            except Exception:
+                self.log.exception("Exception while getting job")
+    def assignNode(self, job):
+        args = json.loads(job.arguments)
+        self.log.debug("Assigned node with arguments: %s" % (args,))
+        self._launchWorker(args)
+        data = dict(manager=self.hostname)
+        job.sendWorkData(json.dumps(data))
+        job.sendWorkComplete()
+    def _launchWorker(self, args):
+        worker = NodeWorker(self.config, self.jobs, self.builds,
+                            self.sites, args['name'], args['host'],
+                            args['description'], args['labels'],
+                            self.hostname, self.zmq_send_queue,
+                            self.termination_queue, self.keep_jobdir,
+                            self.callback_dir, self.library_dir,
+                            self.options)
+        self.node_workers[worker.name] = worker
+        worker.thread = threading.Thread(target=worker.run)
+        worker.thread.start()
+    def revokeNode(self, job):
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Revoke job with arguments: %s" % (args,))
+            name = args['name']
+            node = self.node_workers.get(name)
+            if not node:
+                self.log.debug("Unable to find worker %s" % (name,))
+                return
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='stop'))
+                else:
+                    self.log.debug("Node %s is not alive while revoking node" %
+                                   (node.name,))
+            except Exception:
+                self.log.exception("Exception sending stop command "
+                                   "to worker:")
+        finally:
+            job.sendWorkComplete()
+    def stopJob(self, job):
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Stop job with arguments: %s" % (args,))
+            unique = args['number']
+            build_worker_name = self.builds.get(unique)
+            if not build_worker_name:
+                self.log.debug("Unable to find build for job %s" % (unique,))
+                return
+            node = self.node_workers.get(build_worker_name)
+            if not node:
+                self.log.debug("Unable to find worker for job %s" % (unique,))
+                return
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='abort'))
+                else:
+                    self.log.debug("Node %s is not alive while aborting job" %
+                                   (node.name,))
+            except Exception:
+                self.log.exception("Exception sending abort command "
+                                   "to worker:")
+        finally:
+            job.sendWorkComplete()
+    def runReaper(self):
+        # We don't actually care if all the events are processed
+        while self._reaper_running:
+            try:
+                item = self.termination_queue.get()
+                self.log.debug("Got termination event %s" % (item,))
+                if item is None:
+                    continue
+                worker = self.node_workers[item]
+                self.log.debug("Joining %s" % (item,))
+                worker.thread.join()
+                self.log.debug("Joined %s" % (item,))
+                del self.node_workers[item]
+            except Exception:
+                self.log.exception("Exception while processing "
+                                   "termination events:")
+            finally:
+                self.termination_queue.task_done()
+class NodeWorker(object):
+    retry_args = dict(register='task_result',
+                      until='task_result.rc == 0',
+                      retries=3,
+                      delay=30)
+    def __init__(self, config, jobs, builds, sites, name, host,
+                 description, labels, manager_name, zmq_send_queue,
+                 termination_queue, keep_jobdir, callback_dir,
+                 library_dir, options):
+        self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
+        self.log.debug("Creating node worker %s" % (name,))
+        self.config = config
+        self.jobs = jobs
+        self.builds = builds
+        self.sites = sites
+        self.name = name
+        self.host = host
+        self.description = description
+        if not isinstance(labels, list):
+            labels = [labels]
+        self.labels = labels
+        self.thread = None
+        self.registered_functions = set()
+        # If the unpaused Event is set, that means we should run jobs.
+        # If it is clear, then we are paused and should not run jobs.
+        self.unpaused = threading.Event()
+        self.unpaused.set()
+        self._running = True
+        self.queue = Queue.Queue()
+        self.manager_name = manager_name
+        self.zmq_send_queue = zmq_send_queue
+        self.termination_queue = termination_queue
+        self.keep_jobdir = keep_jobdir
+        self.running_job_lock = threading.Lock()
+        self.pending_registration = False
+        self.registration_lock = threading.Lock()
+        self._get_job_lock = threading.Lock()
+        self._got_job = False
+        self._job_complete_event = threading.Event()
+        self._running_job = False
+        self._aborted_job = False
+        self._watchdog_timeout = False
+        self._sent_complete_event = False
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
+        self.workspace_root = config.get('launcher', 'workspace_root')
+        if self.config.has_option('launcher', 'private_key_file'):
+            self.private_key_file = config.get('launcher', 'private_key_file')
+        else:
+            self.private_key_file = '~/.ssh/id_rsa'
+        if self.config.has_option('launcher', 'username'):
+            self.username = config.get('launcher', 'username')
+        else:
+            self.username = 'zuul'
+        self.callback_dir = callback_dir
+        self.library_dir = library_dir
+        self.options = options
+    def isAlive(self):
+        # Meant to be called from the manager
+        if self.thread and self.thread.is_alive():
+            return True
+        return False
+    def run(self):
+        self.log.debug("Node worker %s starting" % (self.name,))
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = NodeGearWorker(self.name)
+        self.worker.addServer(server, port)
+        self.log.debug("Waiting for server")
+        self.worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+        self.gearman_thread = threading.Thread(target=self.runGearman)
+        self.gearman_thread.daemon = True
+        self.gearman_thread.start()
+        self.log.debug("Started")
+        while self._running or not self.queue.empty():
+            try:
+                self._runQueue()
+            except Exception:
+                self.log.exception("Exception in queue manager:")
+    def stop(self):
+        # If this is called locally, setting _running will be
+        # effective; if it's called remotely, it will not be, but it
+        # will be set by the queue thread.
+        self.log.debug("Submitting stop request")
+        self._running = False
+        self.unpaused.set()
+        self.queue.put(dict(action='stop'))
+        self.queue.join()
+    def pause(self):
+        self.unpaused.clear()
+        self.worker.stopWaitingForJobs()
+    def unpause(self):
+        self.unpaused.set()
+    def release(self):
+        # If this node is idle, stop it.
+        old_unpaused = self.unpaused.is_set()
+        if old_unpaused:
+            self.pause()
+        with self._get_job_lock:
+            if self._got_job:
+                self.log.debug("This worker is not idle")
+                if old_unpaused:
+                    self.unpause()
+                return
+        self.log.debug("Stopping due to release command")
+        self.queue.put(dict(action='stop'))
+    def _runQueue(self):
+        item = self.queue.get()
+        try:
+            if item['action'] == 'stop':
+                self.log.debug("Received stop request")
+                self._running = False
+                self.termination_queue.put(self.name)
+                if not self.abortRunningJob():
+                    self.sendFakeCompleteEvent()
+                else:
+                    self._job_complete_event.wait()
+                self.worker.shutdown()
+            elif item['action'] == 'pause':
+                self.log.debug("Received pause request")
+                self.pause()
+            elif item['action'] == 'unpause':
+                self.log.debug("Received unpause request")
+                self.unpause()
+            elif item['action'] == 'release':
+                self.log.debug("Received release request")
+                self.release()
+            elif item['action'] == 'reconfigure':
+                self.log.debug("Received reconfigure request")
+                self.register()
+            elif item['action'] == 'abort':
+                self.log.debug("Received abort request")
+                self.abortRunningJob()
+        finally:
+            self.queue.task_done()
+    def runGearman(self):
+        while self._running:
+            try:
+                self.unpaused.wait()
+                if self._running:
+                    self._runGearman()
+            except Exception:
+                self.log.exception("Exception in gearman manager:")
+            with self._get_job_lock:
+                self._got_job = False
+    def _runGearman(self):
+        if self.pending_registration:
+            self.register()
+        with self._get_job_lock:
+            try:
+                job = self.worker.getJob()
+                self._got_job = True
+            except gear.InterruptedError:
+                return
+        self.log.debug("Node worker %s got job %s" % (self.name, job.name))
+        try:
+            if job.name not in self.registered_functions:
+                self.log.error("Unable to handle job %s" % job.name)
+                job.sendWorkFail()
+                return
+            self.launch(job)
+        except Exception:
+            self.log.exception("Exception while running job")
+            job.sendWorkException(traceback.format_exc())
+    def generateFunctionNames(self, job):
+        # This only supports "node: foo" and "node: foo || bar"
+        ret = set()
+        job_labels = job.get('node')
+        matching_labels = set()
+        if job_labels:
+            job_labels = [x.strip() for x in job_labels.split('||')]
+            matching_labels = set(self.labels) & set(job_labels)
+            if not matching_labels:
+                return ret
+        ret.add('build:%s' % (job['name'],))
+        for label in matching_labels:
+            ret.add('build:%s:%s' % (job['name'], label))
+        return ret
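+    # Example with hypothetical labels: on a worker labelled
+    # ['trusty', 'xenial'], a job with name 'py27' and node
+    # 'trusty || centos' yields {'build:py27', 'build:py27:trusty'};
+    # a job with no 'node' key yields only {'build:py27'}; and node
+    # 'centos' yields an empty set, so that job is not registered here.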
+    def register(self):
+        if not self.registration_lock.acquire(False):
+            self.log.debug("Registration already in progress")
+            return
+        try:
+            if self._running_job:
+                self.pending_registration = True
+                self.log.debug("Ignoring registration due to running job")
+                return
+            self.log.debug("Updating registration")
+            self.pending_registration = False
+            new_functions = set()
+            for job in self.jobs.values():
+                new_functions |= self.generateFunctionNames(job)
+            self.worker.sendMassDo(new_functions)
+            self.registered_functions = new_functions
+        finally:
+            self.registration_lock.release()
+    def abortRunningJob(self):
+        self._aborted_job = True
+        return self.abortRunningProc(self.ansible_job_proc)
+    def abortRunningProc(self, proc):
+        aborted = False
+        self.log.debug("Abort: acquiring job lock")
+        with self.running_job_lock:
+            if self._running_job:
+                self.log.debug("Abort: a job is running")
+                if proc:
+                    self.log.debug("Abort: sending kill signal to job "
+                                   "process group")
+                    try:
+                        pgid = os.getpgid(proc.pid)
+                        os.killpg(pgid, signal.SIGKILL)
+                        aborted = True
+                    except Exception:
+                        self.log.exception("Exception while killing "
+                                           "ansible process:")
+            else:
+                self.log.debug("Abort: no job is running")
+        return aborted
+    def launch(self, job):
+        self.log.info("Node worker %s launching job %s" %
+                      (self.name, job.name))
+        # Make sure we can parse what we need from the job first
+        args = json.loads(job.arguments)
+        offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
+        job_name = job.name.split(':')[1]
+        # Initialize the result so we have something regardless of
+        # whether the job actually runs
+        result = None
+        self._sent_complete_event = False
+        self._aborted_job = False
+        self._watchdog_timeout = False
+        try:
+            self.sendStartEvent(job_name, args)
+        except Exception:
+            self.log.exception("Exception while sending job start event")
+        try:
+            result = self.runJob(job, args)
+        except Exception:
+            self.log.exception("Exception while launching job thread")
+        self._running_job = False
+        try:
+            data = json.dumps(dict(result=result))
+            job.sendWorkComplete(data)
+        except Exception:
+            self.log.exception("Exception while sending job completion packet")
+        try:
+            self.sendCompleteEvent(job_name, result, args)
+        except Exception:
+            self.log.exception("Exception while sending job completion event")
+        try:
+            del self.builds[job.unique]
+        except Exception:
+            self.log.exception("Exception while clearing build record")
+        self._job_complete_event.set()
+        if offline and self._running:
+            self.stop()
+    def sendStartEvent(self, name, parameters):
+        build = dict(node_name=self.name,
+                     host_name=self.manager_name,
+                     parameters=parameters)
+        event = dict(name=name,
+                     build=build)
+        item = "onStarted %s" % json.dumps(event)
+        self.log.debug("Sending over ZMQ: %s" % (item,))
+        self.zmq_send_queue.put(item)
+    def sendCompleteEvent(self, name, status, parameters):
+        build = dict(status=status,
+                     node_name=self.name,
+                     host_name=self.manager_name,
+                     parameters=parameters)
+        event = dict(name=name,
+                     build=build)
+        item = "onFinalized %s" % json.dumps(event)
+        self.log.debug("Sending over ZMQ: %s" % (item,))
+        self.zmq_send_queue.put(item)
+        self._sent_complete_event = True
+    def sendFakeCompleteEvent(self):
+        if self._sent_complete_event:
+            return
+        self.sendCompleteEvent('zuul:launcher-shutdown',
+                               'SUCCESS', {})
+    def runJob(self, job, args):
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
+        result = None
+        with self.running_job_lock:
+            if not self._running:
+                return result
+            self._running_job = True
+            self._job_complete_event.clear()
+        self.log.debug("Job %s: beginning" % (job.unique,))
+        self.builds[job.unique] = self.name
+        with JobDir(self.keep_jobdir) as jobdir:
+            self.log.debug("Job %s: job root at %s" %
+                           (job.unique, jobdir.root))
+            timeout = self.prepareAnsibleFiles(jobdir, job, args)
+            data = {
+                'manager': self.manager_name,
+                'number': job.unique,
+            }
+            if ':' in self.host:
+                data['url'] = 'telnet://[%s]:19885' % self.host
+            else:
+                data['url'] = 'telnet://%s:19885' % self.host
+            job.sendWorkData(json.dumps(data))
+            job.sendWorkStatus(0, 100)
+            job_status = self.runAnsiblePlaybook(jobdir, timeout)
+            if job_status is None:
+                # The result of the job is indeterminate.  Zuul will
+                # run it again.
+                return result
+            post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
+            if not post_status:
+                result = 'POST_FAILURE'
+            elif job_status:
+                result = 'SUCCESS'
+            else:
+                result = 'FAILURE'
+            if self._aborted_job and not self._watchdog_timeout:
+                # A Null result will cause zuul to relaunch the job if
+                # it needs to.
+                result = None
+        return result
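The result mapping at the end of `runJob` can be read as a small pure function. The sketch below restates it outside the class. The name `job_result` and its keyword arguments are hypothetical stand-ins for `self._aborted_job` and `self._watchdog_timeout`.

```python
def job_result(job_status, post_status, aborted=False, watchdog_timeout=False):
    """Sketch of runJob's result mapping (hypothetical helper, not Zuul API)."""
    if job_status is None:
        # Indeterminate main playbook: Zuul will run the job again.
        return None
    if not post_status:
        result = 'POST_FAILURE'
    elif job_status:
        result = 'SUCCESS'
    else:
        result = 'FAILURE'
    if aborted and not watchdog_timeout:
        # An explicit abort (not a timeout) yields None so Zuul can relaunch.
        result = None
    return result
```

A watchdog timeout still reports a real result, because only a deliberate abort should trigger a relaunch.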
+    def getHostList(self):
+        return [('node', dict(
+  , ansible_user=self.username))]
+    def _substituteVariables(self, text, variables):
+        def lookup(match):
+            return variables.get(, '')
+        return re.sub(r'\$([A-Za-z0-9_]+)', lookup, text)
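This is a standalone sketch of the `$NAME` substitution that `_substituteVariables` performs. It assumes the lookup returns the value of the first captured group (`match.group(1)`), which is how `re.sub` callbacks normally read a match. The parameter names in the usage line are made up.

```python
import re


def substitute_variables(text, variables):
    # Replace each $NAME with variables['NAME']; unknown names become ''.
    def lookup(match):
        return variables.get(match.group(1), '')
    return re.sub(r'\$([A-Za-z0-9_]+)', lookup, text)


print(substitute_variables('logs/$LOG_PATH/docs', {'LOG_PATH': '12/345/1'}))
# -> logs/12/345/1/docs
```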
+    def _getRsyncOptions(self, source, parameters):
+        # Treat the publisher source as a filter; ant and rsync behave
+        # similarly in this respect, except for leading directories.
+        source = self._substituteVariables(source, parameters)
+        # If the source starts with ** then we want to match any
+        # number of directories, so don't anchor the include filter.
+        # If it does not start with **, then the intent is likely to
+        # at least start by matching an immediate file or subdirectory
+        # (even if later we have a ** in the middle), so in this case,
+        # anchor it to the root of the transfer (the workspace).
+        if not source.startswith('**'):
+            source = os.path.join('/', source)
+        # These options mean: include the thing we want, include any
+        # directories (so that we continue to search for the thing we
+        # want no matter how deep it is), exclude anything that
+        # doesn't match the thing we want or is a directory, then get
+        # rid of empty directories left over at the end.
+        rsync_opts = ['--include="%s"' % source,
+                      '--include="*/"',
+                      '--exclude="*"',
+                      '--prune-empty-dirs']
+        return rsync_opts
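The filter built by `_getRsyncOptions` can be reproduced without the surrounding class. This sketch assumes variable substitution has already happened. It uses `posixpath` so that anchoring behaves the same on any host. The sample patterns are illustrative.

```python
import posixpath


def rsync_filter_options(source):
    # Patterns that do not begin with '**' are anchored to the transfer root.
    if not source.startswith('**'):
        source = posixpath.join('/', source)
    return ['--include="%s"' % source,  # the files we want
            '--include="*/"',           # keep descending into directories
            '--exclude="*"',            # drop everything else
            '--prune-empty-dirs']       # remove directories left empty
```

For example, `docs/build/**` becomes the anchored include `/docs/build/**`, while `**/*.log` is left unanchored and matches at any depth.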
+    def _makeSCPTask(self, jobdir, publisher, parameters):
+        tasks = []
+        for scpfile in publisher['scp']['files']:
+            scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+            os.chmod(scproot, 0o755)
+            site = publisher['scp']['site']
+            if scpfile.get('copy-console'):
+                # Include the local ansible directory in the console
+                # upload.  This uploads the playbook and ansible logs.
+                copyargs = dict(src=jobdir.ansible_root + '/',
+                                dest=os.path.join(scproot, '_zuul_ansible'))
+                task = dict(copy=copyargs,
+                            delegate_to='')
+                # This is a local copy and should not fail, so does
+                # not need a retry stanza.
+                tasks.append(task)
+                # Fetch the console log from the remote host.
+                src = '/tmp/console.html'
+                rsync_opts = []
+            else:
+                src = parameters['WORKSPACE']
+                if not src.endswith('/'):
+                    src = src + '/'
+                rsync_opts = self._getRsyncOptions(scpfile['source'],
+                                                   parameters)
+            syncargs = dict(src=src,
+                            dest=scproot,
+                            copy_links='yes',
+                            mode='pull')
+            if rsync_opts:
+                syncargs['rsync_opts'] = rsync_opts
+            task = dict(synchronize=syncargs)
+            if not scpfile.get('copy-after-failure'):
+                task['when'] = 'success'
+            task.update(self.retry_args)
+            tasks.append(task)
+            task = self._makeSCPTaskLocalAction(
+                site, scpfile, scproot, parameters)
+            task.update(self.retry_args)
+            tasks.append(task)
+        return tasks
+    def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
+        if site not in self.sites:
+            raise Exception("Undefined SCP site: %s" % (site,))
+        site = self.sites[site]
+        dest = scpfile['target'].lstrip('/')
+        dest = self._substituteVariables(dest, parameters)
+        dest = os.path.join(site['root'], dest)
+        dest = os.path.normpath(dest)
+        if not dest.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (dest,))
+        rsync_cmd = [
+            '/usr/bin/rsync', '--delay-updates', '-F',
+            '--compress', '-rt', '--safe-links',
+            '--rsync-path="mkdir -p {dest} && rsync"',
+            '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
+            '-o StrictHostKeyChecking=no -q"',
+            '--out-format="<<CHANGED>>%i %n%L"',
+            '{source}', '"{user}@{host}:{dest}"'
+        ]
+        if scpfile.get('keep-hierarchy'):
+            source = '"%s/"' % scproot
+        else:
+            source = '`/usr/bin/find "%s" -type f`' % scproot
+        shellargs = ' '.join(rsync_cmd).format(
+            source=source,
+            dest=dest,
+            private_key_file=self.private_key_file,
+            host=site['host'],
+            user=site['user'])
+        task = dict(shell=shellargs,
+                    delegate_to='')
+        if not scpfile.get('copy-after-failure'):
+            task['when'] = 'success'
+        return task
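Both `_makeSCPTaskLocalAction` and `_makeFTPTask` keep targets inside the site root by normalizing the joined path and checking `startswith(site['root'])`. A plain prefix test also accepts sibling directories, such as `/srv/logs-other` for a root of `/srv/logs`. The sketch below swaps in a component-wise comparison with `posixpath.commonpath`. It is an alternative, not what the diff does, and the paths are hypothetical.

```python
import posixpath


def resolve_target(site_root, target):
    root = posixpath.normpath(site_root)
    dest = posixpath.normpath(posixpath.join(root, target.lstrip('/')))
    # commonpath compares whole path components, so '/srv/logs-other'
    # is not mistaken for a child of '/srv/logs'.
    if posixpath.commonpath([root, dest]) != root:
        raise ValueError("Target path %s is not below site root" % dest)
    return dest
```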
+    def _makeFTPTask(self, jobdir, publisher, parameters):
+        tasks = []
+        ftp = publisher['ftp']
+        site = ftp['site']
+        if site not in self.sites:
+            raise Exception("Undefined FTP site: %s" % site)
+        site = self.sites[site]
+        ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+        ftpcontent = os.path.join(ftproot, 'content')
+        os.makedirs(ftpcontent)
+        ftpscript = os.path.join(ftproot, 'script')
+        src = parameters['WORKSPACE']
+        if not src.endswith('/'):
+            src = src + '/'
+        rsync_opts = self._getRsyncOptions(ftp['source'],
+                                           parameters)
+        syncargs = dict(src=src,
+                        dest=ftpcontent,
+                        copy_links='yes',
+                        mode='pull')
+        if rsync_opts:
+            syncargs['rsync_opts'] = rsync_opts
+        task = dict(synchronize=syncargs,
+                    when='success')
+        task.update(self.retry_args)
+        tasks.append(task)
+        task = dict(shell='lftp -f %s' % ftpscript,
+                    when='success',
+                    delegate_to='')
+        ftpsource = ftpcontent
+        if ftp.get('remove-prefix'):
+            ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
+        while ftpsource[-1] == '/':
+            ftpsource = ftpsource[:-1]
+        ftptarget = ftp['target'].lstrip('/')
+        ftptarget = self._substituteVariables(ftptarget, parameters)
+        ftptarget = os.path.join(site['root'], ftptarget)
+        ftptarget = os.path.normpath(ftptarget)
+        if not ftptarget.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (ftptarget,))
+        while ftptarget[-1] == '/':
+            ftptarget = ftptarget[:-1]
+        with open(ftpscript, 'w') as script:
+            script.write('open %s\n' % site['host'])
+            script.write('user %s %s\n' % (site['user'], site['pass']))
+            script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
+        task.update(self.retry_args)
+        tasks.append(task)
+        return tasks
+    def _makeAFSTask(self, jobdir, publisher, parameters):
+        tasks = []
+        afs = publisher['afs']
+        site = afs['site']
+        if site not in self.sites:
+            raise Exception("Undefined AFS site: %s" % site)
+        site = self.sites[site]
+        # It is possible that this could be done in one rsync step;
+        # however, the current rsync from the host is complicated (so
+        # that we can match the behavior of ant), and then rsync to
+        # afs is complicated and involves a pre-processing step in
+        # both locations (so that we can exclude directories).  Each
+        # is well understood individually so it is easier to compose
+        # them in series than combine them together.  A better,
+        # longer-lived solution (with better testing) would do just
+        # that.
+        afsroot = tempfile.mkdtemp(dir=jobdir.staging_root)
+        afscontent = os.path.join(afsroot, 'content')
+        src = parameters['WORKSPACE']
+        if not src.endswith('/'):
+            src = src + '/'
+        rsync_opts = self._getRsyncOptions(afs['source'],
+                                           parameters)
+        syncargs = dict(src=src,
+                        dest=afscontent,
+                        copy_links='yes',
+                        mode='pull')
+        if rsync_opts:
+            syncargs['rsync_opts'] = rsync_opts
+        task = dict(synchronize=syncargs,
+                    when='success')
+        task.update(self.retry_args)
+        tasks.append(task)
+        afstarget = afs['target']
+        afstarget = self._substituteVariables(afstarget, parameters)
+        afstarget = os.path.join(site['root'], afstarget)
+        afstarget = os.path.normpath(afstarget)
+        if not afstarget.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (afstarget,))
+        src_markers_file = os.path.join(afsroot, 'src-markers')
+        dst_markers_file = os.path.join(afsroot, 'dst-markers')
+        exclude_file = os.path.join(afsroot, 'exclude')
+        filter_file = os.path.join(afsroot, 'filter')
+        find_pipe = [
+            "/usr/bin/find {path} -name .root-marker -printf '%P\n'",
+            "/usr/bin/xargs -I{{}} dirname {{}}",
+            "/usr/bin/sort > {file}"]
+        find_pipe = ' | '.join(find_pipe)
+        # Find the list of root markers in the just-completed build
+        # (usually there will only be one, but some builds produce
+        # content at the root *and* at a tag location).
+        task = dict(shell=find_pipe.format(path=afscontent,
+                                           file=src_markers_file),
+                    when='success',
+                    delegate_to='')
+        tasks.append(task)
+        # Find the list of root markers that already exist in the
+        # published site.
+        task = dict(shell=find_pipe.format(path=afstarget,
+                                           file=dst_markers_file),
+                    when='success',
+                    delegate_to='')
+        tasks.append(task)
+        # Create a file that contains the set of directories with root
+        # markers in the published site that do not have root markers
+        # in the built site.
+        exclude_command = "/usr/bin/comm -23 {dst} {src} > {exclude}".format(
+            src=src_markers_file,
+            dst=dst_markers_file,
+            exclude=exclude_file)
+        task = dict(shell=exclude_command,
+                    when='success',
+                    delegate_to='')
+        tasks.append(task)
+        # Create a filter list for rsync so that we copy exactly the
+        # directories we want to without deleting any existing
+        # directories in the published site that were placed there by
+        # previous builds.
+        # The first group of items in the filter list are the
+        # directories in the current build with root markers, except
+        # for the root of the build.  This is so that if, later, the
+        # build root ends up as an exclude, we still copy the
+        # directories in this build underneath it (since these
+        # includes will have matched first).  We can't include the
+        # build root itself here, even if we do want to synchronize
+        # it, since that would defeat later excludes.  In other words,
+        # if the build produces a root marker in "/subdir" but not in
+        # "/", this section is needed so that "/subdir" is copied at
+        # all, since "/" will be excluded later.
+        command = ("/bin/grep -v '^/$' {src} | "
+                   "/bin/sed -e 's/^/+ /' > {filter}".format(
+                       src=src_markers_file,
+                       filter=filter_file))
+        task = dict(shell=command,
+                    when='success',
+                    delegate_to='')
+        tasks.append(task)
+        # The second group is the set of directories that are in the
+        # published site but not in the built site.  This is so that
+        # if the built site does contain a marker at root (meaning
+        # that there is content that should be copied into the root)
+        # that we don't delete everything else previously built
+        # underneath the root.
+        command = ("/bin/grep -v '^/$' {exclude} | "
+                   "/bin/sed -e 's/^/- /' >> {filter}".format(
+                       exclude=exclude_file,
+                       filter=filter_file))
+        task = dict(shell=command,
+                    when='success',
+                    delegate_to='')
+        tasks.append(task)
+        # The last entry in the filter file is for the build root.  If
+        # there is no marker in the build root, then we need to
+        # exclude it from the rsync, so we add it here.  It needs to
+        # be in the form of '/*' so that it matches all of the files
+        # in the build root.  If there is a marker at the build root,
+        # then we should omit the '/*' exclusion so that it is
+        # implicitly included.
+        command = "grep '^/$' {exclude} && echo '- /*' >> {filter}".format(
+            exclude=exclude_file,
+            filter=filter_file)
+        task = dict(shell=command,
+                    when='success',
+                    delegate_to='')
+        tasks.append(task)
+        # Perform the rsync with the filter list.
+        rsync_cmd = [
+            '/usr/bin/k5start', '-t', '-k', '{keytab}', '--',
+            '/usr/bin/rsync', '-rtp', '--safe-links', '--delete-after',
+            "--filter='merge {filter}'", '{src}/', '{dst}/',
+        ]
+        shellargs = ' '.join(rsync_cmd).format(
+            src=afscontent,
+            dst=afstarget,
+            filter=filter_file,
+            keytab=site['keytab'])
+        task = dict(shell=shellargs,
+                    when='success',
+                    delegate_to='')
+        tasks.append(task)
+        return tasks
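The shell pipeline above (`find`, `comm -23`, `grep`, `sed`) builds an rsync filter file from two sets of root-marker directories. The same rules, as described in the comments, can be expressed over Python sets. This sketch assumes that marker directories are already normalized, with `/` as the build root. The function name is hypothetical.

```python
def build_afs_filter(src_markers, dst_markers):
    """Compute rsync filter rules from root-marker directories.

    src_markers: directories with a .root-marker in the new build.
    dst_markers: directories with a .root-marker already published.
    """
    src = set(src_markers)
    # Equivalent of `comm -23 dst src`: published markers absent from the build.
    exclude = sorted(set(dst_markers) - src)
    # Include every built marker directory except the build root itself.
    rules = ['+ %s' % d for d in sorted(src) if d != '/']
    # Protect previously published directories the build does not contain.
    rules += ['- %s' % d for d in exclude if d != '/']
    # Exclude the root's contents only when the published site has a root
    # marker but the new build does not.
    if '/' in exclude:
        rules.append('- /*')
    return rules
```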
+    def _makeBuilderTask(self, jobdir, builder, parameters):
+        tasks = []
+        script_fn = '' % str(uuid.uuid4().hex)
+        script_path = os.path.join(jobdir.script_root, script_fn)
+        with open(script_path, 'w') as script:
+            data = builder['shell']
+            if not data.startswith('#!'):
+                data = '#!/bin/bash -x\n %s' % (data,)
+            script.write(data)
+        remote_path = os.path.join('/tmp', script_fn)
+        copy = dict(src=script_path,
+                    dest=remote_path,
+                    mode=0o555)
+        task = dict(copy=copy)
+        tasks.append(task)
+        runner = dict(command=remote_path,
+                      cwd=parameters['WORKSPACE'],
+                      parameters=parameters)
+        task = dict(zuul_runner=runner)
+        task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} '
+                        'second timeout')
+        task['when'] = '{{ elapsed_time < timeout | int }}'
+        task['async'] = '{{ timeout | int - elapsed_time }}'
+        task['poll'] = 5
+        tasks.append(task)
+        filetask = dict(path=remote_path,
+                        state='absent')
+        task = dict(file=filetask)
+        tasks.append(task)
+        return tasks
+    def _transformPublishers(self, jjb_job):
+        early_publishers = []
+        late_publishers = []
+        old_publishers = jjb_job.get('publishers', [])
+        for publisher in old_publishers:
+            early_scpfiles = []
+            late_scpfiles = []
+            if 'scp' not in publisher:
+                early_publishers.append(publisher)
+                continue
+            copy_console = False
+            for scpfile in publisher['scp']['files']:
+                if scpfile.get('copy-console'):
+                    scpfile['keep-hierarchy'] = True
+                    late_scpfiles.append(scpfile)
+                    copy_console = True
+                else:
+                    early_scpfiles.append(scpfile)
+            publisher['scp']['files'] = early_scpfiles + late_scpfiles
+            if copy_console:
+                late_publishers.append(publisher)
+            else:
+                early_publishers.append(publisher)
+        publishers = early_publishers + late_publishers
+        if old_publishers != publishers:
+            self.log.debug("Transformed job publishers")
+        return early_publishers, late_publishers
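`_transformPublishers` moves any SCP publisher that uploads the console log into the late group. Within each such publisher, it also places the console files last. The sketch below works on plain dictionaries and uses hypothetical publisher data.

```python
def split_publishers(publishers):
    early, late = [], []
    for publisher in publishers:
        if 'scp' not in publisher:
            early.append(publisher)
            continue
        files = publisher['scp']['files']
        console = [f for f in files if f.get('copy-console')]
        others = [f for f in files if not f.get('copy-console')]
        for f in console:
            f['keep-hierarchy'] = True  # console uploads keep their layout
        publisher['scp']['files'] = others + console
        (late if console else early).append(publisher)
    return early, late
```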
+    def prepareAnsibleFiles(self, jobdir, gearman_job, args):
+        job_name =':')[1]
+        jjb_job =[job_name]
+        parameters = args.copy()
+        parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
+        with open(jobdir.inventory, 'w') as inventory:
+            for host_name, host_vars in self.getHostList():
+                inventory.write(host_name)
+                for k, v in host_vars.items():
+                    inventory.write(' %s=%s' % (k, v))
+                inventory.write('\n')
+        timeout = None
+        timeout_var = None
+        for wrapper in jjb_job.get('wrappers', []):
+            if isinstance(wrapper, dict):
+                build_timeout = wrapper.get('timeout')
+                if isinstance(build_timeout, dict):
+                    timeout_var = build_timeout.get('timeout-var')
+                    timeout = build_timeout.get('timeout')
+                    if timeout is not None:
+                        timeout = int(timeout) * 60
+        if not timeout:
+            timeout = ANSIBLE_DEFAULT_TIMEOUT
+        if timeout_var:
+            parameters[timeout_var] = str(timeout * 1000)
+        with open(jobdir.playbook, 'w') as playbook:
+            pre_tasks = []
+            tasks = []
+            main_block = []
+            error_block = []
+            variables = []
+            shellargs = "ssh-keyscan {{ ansible_host }} > %s" % (
+                jobdir.known_hosts)
+            pre_tasks.append(dict(shell=shellargs,
+                             delegate_to=''))
+            tasks.append(dict(block=main_block,
+                              rescue=error_block))
+            task = dict(file=dict(path='/tmp/console.html', state='absent'))
+            main_block.append(task)
+            task = dict(zuul_console=dict(path='/tmp/console.html',
+                                          port=19885))
+            main_block.append(task)
+            task = dict(file=dict(path=parameters['WORKSPACE'],
+                                  state='directory'))
+            main_block.append(task)
+            msg = [
+                "Launched by %s" % self.manager_name,
+                "Building remotely on %s in workspace %s" % (
+          , parameters['WORKSPACE'])]
+            task = dict(zuul_log=dict(msg=msg))
+            main_block.append(task)
+            for builder in jjb_job.get('builders', []):
+                if 'shell' in builder:
+                    main_block.extend(
+                        self._makeBuilderTask(jobdir, builder, parameters))
+            task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"))
+            main_block.append(task)
+            task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
+            error_block.append(task)
+            error_block.append(dict(fail=dict(msg='FAILURE')))
+            variables.append(dict(timeout=timeout))
+            play = dict(hosts='node', name='Job body', vars=variables,
+                        pre_tasks=pre_tasks, tasks=tasks)
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+        early_publishers, late_publishers = self._transformPublishers(jjb_job)
+        with open(jobdir.post_playbook, 'w') as playbook:
+            blocks = []
+            for publishers in [early_publishers, late_publishers]:
+                block = []
+                for publisher in publishers:
+                    if 'scp' in publisher:
+                        block.extend(self._makeSCPTask(jobdir, publisher,
+                                                       parameters))
+                    if 'ftp' in publisher:
+                        block.extend(self._makeFTPTask(jobdir, publisher,
+                                                       parameters))
+                    if 'afs' in publisher:
+                        block.extend(self._makeAFSTask(jobdir, publisher,
+                                                       parameters))
+                blocks.append(block)
+            # The 'always' section contains the log publishing tasks,
+            # the 'block' contains all the other publishers.  This way
+            # we run the log publisher regardless of whether the rest
+            # of the publishers succeed.
+            tasks = []
+            tasks.append(dict(block=blocks[0],
+                              always=blocks[1]))
+            play = dict(hosts='node', name='Publishers',
+                        tasks=tasks)
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+        with open(jobdir.config, 'w') as config:
+            config.write('[defaults]\n')
+            config.write('hostfile = %s\n' % jobdir.inventory)
+            config.write('keep_remote_files = True\n')
+            config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
+            config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
+            config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
+            config.write('log_path = %s\n' % jobdir.ansible_log)
+            config.write('gathering = explicit\n')
+            config.write('callback_plugins = %s\n' % self.callback_dir)
+            config.write('library = %s\n' % self.library_dir)
+            # bump the timeout because busy nodes may take more than
+            # 10s to respond
+            config.write('timeout = 30\n')
+            config.write('[ssh_connection]\n')
+            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
+                "-o UserKnownHostsFile=%s" % jobdir.known_hosts
+            config.write('ssh_args = %s\n' % ssh_args)
+        return timeout
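The timeout handling near the top of `prepareAnsibleFiles` converts a JJB `timeout` wrapper, given in minutes, to seconds. When `timeout-var` is set, it also exports the value in milliseconds. The function below restates that logic. The default is passed in as a parameter because `ANSIBLE_DEFAULT_TIMEOUT` is not shown here, and the `3600` in the test is an arbitrary example.

```python
def extract_timeout(wrappers, parameters, default_timeout):
    """Return the job timeout in seconds (sketch of prepareAnsibleFiles)."""
    timeout = None
    timeout_var = None
    for wrapper in wrappers:
        if isinstance(wrapper, dict):
            build_timeout = wrapper.get('timeout')
            if isinstance(build_timeout, dict):
                timeout_var = build_timeout.get('timeout-var')
                timeout = build_timeout.get('timeout')
                if timeout is not None:
                    timeout = int(timeout) * 60  # minutes -> seconds
    if not timeout:
        timeout = default_timeout
    if timeout_var:
        parameters[timeout_var] = str(timeout * 1000)  # seconds -> ms
    return timeout
```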
+    def _ansibleTimeout(self, proc, msg):
+        self._watchdog_timeout = True
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+    def runAnsiblePlaybook(self, jobdir, timeout):
+        # Set LOGNAME env variable so Ansible log_path log reports
+        # the correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+        if self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+        cmd = ['ansible-playbook', jobdir.playbook, verbose]
+        self.log.debug("Ansible command: %s" % (cmd,))
+        self.ansible_job_proc = subprocess.Popen(
+            cmd,
+            cwd=jobdir.ansible_root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
+        )
+        ret = None
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (self.ansible_job_proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_job_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = self.ansible_job_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
+        self.ansible_job_proc = None
+        if self._watchdog_timeout:
+            return False
+        if ret == 3:
+            # AnsibleHostUnreachable: We had a network issue connecting to
+            # our zuul-worker.
+            return None
+        elif ret == -9:
+            # Received abort request.
+            return None
+        return ret == 0
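The exit-code handling at the end of `runAnsiblePlaybook` produces three outcomes. It returns `True` or `False` for a definite result, and `None` when Zuul should retry. The sketch below restates that mapping. It relies on `subprocess` reporting a process killed by a signal as the negative signal number, so `-9` means SIGKILL. The helper name is hypothetical.

```python
def playbook_status(returncode, watchdog_timed_out=False):
    if watchdog_timed_out:
        return False  # a timeout counts as a failure
    if returncode == 3:
        return None   # host unreachable: indeterminate, retry
    if returncode == -9:
        return None   # killed by SIGKILL after an abort request
    return returncode == 0
```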
+    def runAnsiblePostPlaybook(self, jobdir, success):
+        # Set LOGNAME env variable so Ansible log_path log reports
+        # the correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+        if self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+        cmd = ['ansible-playbook', jobdir.post_playbook,
+               '-e', 'success=%s' % success, verbose]
+        self.log.debug("Ansible post command: %s" % (cmd,))
+        self.ansible_post_proc = subprocess.Popen(
+            cmd,
+            cwd=jobdir.ansible_root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
+        )
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_post_proc,
+                             "Ansible post timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_post_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible post output: %s" % (line,))
+            ret = self.ansible_post_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible post exit code: %s" % (ret,))
+        self.ansible_post_proc = None
+        return ret == 0
+class JJB(jenkins_jobs.builder.Builder):
+    def __init__(self):
+        self.global_config = None
+        self._plugins_list = []
+    def expandComponent(self, component_type, component, template_data):
+        component_list_type = component_type + 's'
+        new_components = []
+        if isinstance(component, dict):
+            name, component_data = next(iter(component.items()))
+            if template_data:
+                component_data = jenkins_jobs.formatter.deep_format(
+                    component_data, template_data, True)
+        else:
+            name = component
+            component_data = {}
+        new_component =, {}).get(name)
+        if new_component:
+            for new_sub_component in new_component[component_list_type]:
+                new_components.extend(
+                    self.expandComponent(component_type,
+                                         new_sub_component, component_data))
+        else:
+            new_components.append({name: component_data})
+        return new_components
+    def expandMacros(self, job):
+        for component_type in ['builder', 'publisher', 'wrapper']:
+            component_list_type = component_type + 's'
+            new_components = []
+            for new_component in job.get(component_list_type, []):
+                new_components.extend(self.expandComponent(component_type,
+                                                           new_component, {}))
+            job[component_list_type] = new_components
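`expandComponent` recursively replaces a macro reference with the components the macro contains. The sketch below keeps the recursion but omits template formatting (`deep_format`). The `macros` dictionary layout (component type, then macro name) is an assumption made for illustration. Like the original, the sketch does no cycle detection.

```python
def expand_component(component_type, component, macros):
    if isinstance(component, dict):
        name, data = next(iter(component.items()))
    else:
        name, data = component, {}
    macro = macros.get(component_type, {}).get(name)
    if macro is None:
        # Not a macro: keep it as a single {name: data} component.
        return [{name: data}]
    expanded = []
    for sub in macro[component_type + 's']:
        # Macros may reference other macros, so recurse.
        expanded.extend(expand_component(component_type, sub, macros))
    return expanded
```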
diff --git a/zuul/launcher/ b/zuul/launcher/
index bc533a9..9e29d7f 100644
--- a/zuul/launcher/
+++ b/zuul/launcher/
@@ -17,39 +17,77 @@
 import logging
 import os
 import shutil
+import signal
 import socket
 import subprocess
 import tempfile
 import threading
+import time
 import traceback
 import gear
 import yaml
 import zuul.merger
+import zuul.ansible.library
+import zuul.ansible.plugins.callback_plugins
+from zuul.lib import commandsocket
+class Watchdog(object):
+    def __init__(self, timeout, function, args):
+        self.timeout = timeout
+        self.function = function
+        self.args = args
+        self.thread = threading.Thread(target=self._run)
+        self.thread.daemon = True
+        self.timed_out = None
+    def _run(self):
+        while self._running and time.time() < self.end:
+            time.sleep(10)
+        # timed_out is True only if stop() was not called first.
+        self.timed_out = self._running
+        if self.timed_out:
+            self.function(*self.args)
+    def start(self):
+        self._running = True
+        self.end = time.time() + self.timeout
+        self.thread.start()
+    def stop(self):
+        self._running = False
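The `Watchdog` above polls every 10 seconds. An equivalent sketch can instead wait on a `threading.Event`, which wakes immediately when stopped and fires the callback once the timeout elapses. This is an alternative structure, not the class in the diff.

```python
import threading


class EventWatchdog(object):
    def __init__(self, timeout, function, args=()):
        self.timeout = timeout
        self.function = function
        self.args = args
        self.timed_out = False
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def _run(self):
        # Event.wait returns True as soon as stop() sets the event,
        # or False once the timeout elapses without it being set.
        if not self._stopped.wait(self.timeout):
            self.timed_out = True
            self.function(*self.args)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stopped.set()

    def join(self, timeout=None):
        self._thread.join(timeout)
```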
 # TODOv3(mordred): put git repos in a hierarchy that includes source
 # hostname, eg:  Also, configure
 # sources to have an alias, so that the source
 # repos end up in
 class JobDir(object):
-    def __init__(self):
+    def __init__(self, keep=False):
+        self.keep = keep
         self.root = tempfile.mkdtemp()
         self.git_root = os.path.join(self.root, 'git')
         self.ansible_root = os.path.join(self.root, 'ansible')
+        self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
         self.inventory = os.path.join(self.ansible_root, 'inventory')
         self.playbook = os.path.join(self.ansible_root, 'playbook')
+        self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
         self.config = os.path.join(self.ansible_root, 'ansible.cfg')
+        self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
     def __enter__(self):
         return self
     def __exit__(self, etype, value, tb):
-        shutil.rmtree(self.root)
+        if not self.keep:
+            shutil.rmtree(self.root)
 class UpdateTask(object):
@@ -112,12 +150,21 @@
 class LaunchServer(object):
     log = logging.getLogger("zuul.LaunchServer")
-    def __init__(self, config, connections={}):
+    def __init__(self, config, connections={}, keep_jobdir=False):
         self.config = config
+        self.keep_jobdir = keep_jobdir
         # TODOv3(mordred): make the launcher name more unique --
         # perhaps hostname+pid.
         self.hostname = socket.gethostname()
         self.zuul_url = config.get('merger', 'zuul_url')
+        self.command_map = dict(
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            graceful=self.graceful,
+            verbose=self.verboseOn,
+            unverbose=self.verboseOff,
+        )
         if self.config.has_option('merger', 'git_dir'):
             self.merge_root = self.config.get('merger', 'git_dir')
@@ -133,17 +180,48 @@
             self.merge_name = self.config.get('merger', 'git_user_name')
             self.merge_name = None
+        if self.config.has_option('launcher', 'private_key_file'):
+            self.private_key_file = config.get('launcher', 'private_key_file')
+        else:
+            self.private_key_file = '~/.ssh/id_rsa'
         self.connections = connections
         self.merger = self._getMerger(self.merge_root)
         self.update_queue = DeduplicateQueue()
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
+        ansible_dir = os.path.join(state_dir, 'ansible')
+        plugins_dir = os.path.join(ansible_dir, 'plugins')
+        self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
+        if not os.path.exists(self.callback_dir):
+            os.makedirs(self.callback_dir)
+        self.library_dir = os.path.join(ansible_dir, 'library')
+        if not os.path.exists(self.library_dir):
+            os.makedirs(self.library_dir)
+        callback_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.plugins.callback_plugins.__file__))
+        for fn in os.listdir(callback_path):
+            shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
+        library_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.library.__file__))
+        for fn in os.listdir(library_path):
+            shutil.copy(os.path.join(library_path, fn), self.library_dir)
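The block above creates the Ansible plugin and library directories under `state_dir` and copies every entry from the `zuul.ansible` package directories into them. Here is a minimal sketch of that install step, using throwaway temporary directories in place of the package and state directories. The helper name `install_files` and the file name `example_callback.py` are hypothetical. `shutil.copy` expects a file, and a package directory can contain subdirectories such as `__pycache__`. For that reason, this sketch copies regular files only. `os.makedirs(..., exist_ok=True)` replaces the separate `os.path.exists()` check.

```python
import os
import shutil
import tempfile


def install_files(src_dir, dest_dir):
    """Copy the regular files in src_dir into dest_dir (created if missing)."""
    # exist_ok=True makes the call a no-op when the directory already exists.
    os.makedirs(dest_dir, exist_ok=True)
    copied = []
    for fn in sorted(os.listdir(src_dir)):
        src = os.path.join(src_dir, fn)
        # Skip subdirectories such as __pycache__; shutil.copy expects a file.
        if not os.path.isfile(src):
            continue
        shutil.copy(src, dest_dir)
        copied.append(fn)
    return copied


# Demo: temporary directories stand in for the package and the state dir.
src = tempfile.mkdtemp()
state_dir = tempfile.mkdtemp()
for name in ('example_callback.py', '__init__.py'):
    with open(os.path.join(src, name), 'w') as f:
        f.write('# plugin\n')
os.mkdir(os.path.join(src, '__pycache__'))
callback_dir = os.path.join(state_dir, 'ansible', 'plugins', 'callback_plugins')
print(install_files(src, callback_dir))  # ['__init__.py', 'example_callback.py']
```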
     def _getMerger(self, root):
         return zuul.merger.merger.Merger(root, self.connections,
                                          self.merge_email, self.merge_name)
     def start(self):
         self._running = True
+        self._command_running = True
         server = self.config.get('gearman', 'server')
         if self.config.has_option('gearman', 'port'):
             port = self.config.get('gearman', 'port')
@@ -155,6 +233,13 @@
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand)
+        self.command_thread.daemon = True
+        self.command_thread.start()
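`commandsocket.CommandSocket` itself is not part of this diff. The following sketch shows one way such a channel could work. It assumes a Unix domain socket at the `launcher.socket` path, one command per connection, and a blocking `get()` that returns the next command. `MiniCommandSocket` and `send_command` are hypothetical names, not Zuul's implementation. The sketch is POSIX-only.

```python
import os
import queue
import socket
import tempfile
import threading


class MiniCommandSocket:
    """Hypothetical Unix-socket command channel: one command per connection."""

    def __init__(self, path):
        self.path = path
        self.queue = queue.Queue()
        self.running = False

    def start(self):
        if os.path.exists(self.path):
            os.unlink(self.path)  # remove a stale socket left by a crash
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.bind(self.path)
        self.sock.listen(1)
        self.running = True
        self.thread = threading.Thread(target=self._serve, daemon=True)
        self.thread.start()

    def _serve(self):
        while self.running:
            conn, _ = self.sock.accept()
            chunks = []
            with conn:
                # Read until the client closes its end of the connection.
                while True:
                    data = conn.recv(1024)
                    if not data:
                        break
                    chunks.append(data)
            command = b''.join(chunks).decode('utf-8').strip()
            if command:
                self.queue.put(command)

    def get(self):
        # Blocks until a command arrives.
        return self.queue.get()

    def stop(self):
        self.running = False
        # A throwaway connection wakes the blocking accept() so the loop exits.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.connect(self.path)
        self.thread.join()
        self.sock.close()
        os.unlink(self.path)


def send_command(path, command):
    """Client side: what an admin tool might do to send, e.g., 'pause'."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(path)
        s.sendall(command.encode('utf-8'))


path = os.path.join(tempfile.mkdtemp(), 'launcher.socket')
cs = MiniCommandSocket(path)
cs.start()
send_command(path, 'pause')
print(cs.get())  # pause
cs.stop()
```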
         self.log.debug("Starting worker")
         self.update_thread = threading.Thread(target=self._updateLoop)
         self.update_thread.daemon = True
@@ -173,12 +258,42 @@
         self._running = False
+        self._command_running = False
+        self.command_socket.stop()
+    def pause(self):
+        # TODOv3: implement
+        pass
+    def unpause(self):
+        # TODOv3: implement
+        pass
+    def graceful(self):
+        # TODOv3: implement
+        pass
+    def verboseOn(self):
+        # TODOv3: implement
+        pass
+    def verboseOff(self):
+        # TODOv3: implement
+        pass
     def join(self):
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get()
+                self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
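`runCommand` loops while `_command_running` is set. It looks up each command in `command_map` and logs any exception instead of letting the thread die. This is a minimal sketch of that loop. A `queue.Queue` stands in for `command_socket.get()`, and the `'_stop'` sentinel used to wake the blocking `get()` is an assumption of this sketch. `CommandLoop` is hypothetical.

```python
import logging
import queue
import threading

log = logging.getLogger("example.commands")


class CommandLoop:
    """Hypothetical command loop; a Queue stands in for the command socket."""

    def __init__(self):
        self.commands = queue.Queue()
        self.handled = []
        self.running = False
        self.command_map = dict(pause=lambda: self.handled.append('pause'))

    def start(self):
        self.running = True
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def stop(self):
        # Enqueue a sentinel; commands queued earlier are still handled (FIFO).
        self.commands.put('_stop')
        self.thread.join()

    def run(self):
        while self.running:
            try:
                command = self.commands.get()  # blocks, like command_socket.get()
                if command == '_stop':
                    self.running = False
                    continue
                self.command_map[command]()
            except Exception:
                # An unknown command (KeyError) is logged; the thread survives.
                log.exception("Exception while processing command")


loop = CommandLoop()
loop.start()
loop.commands.put('bogus')  # logged, loop keeps running
loop.commands.put('pause')
loop.stop()
print(loop.handled)  # ['pause']
```

This sketch clears the flag inside the loop, when the loop reads the sentinel. `stop()` does not clear it directly, which avoids a race: if `stop()` cleared the flag first, commands already queued could be silently dropped.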
     def _updateLoop(self):
         while self._running:
@@ -271,7 +386,6 @@
             job.sendWorkStatus(0, 100)
             result = self.runAnsible(jobdir, job)
             result = dict(result=result)
@@ -305,23 +419,93 @@
         with open(jobdir.config, 'w') as config:
             config.write('hostfile = %s\n' % jobdir.inventory)
+            config.write('keep_remote_files = True\n')
+            config.write('local_tmp = %s/.ansible/local_tmp\n' % jobdir.root)
+            config.write('remote_tmp = %s/.ansible/remote_tmp\n' % jobdir.root)
+            config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
+            config.write('log_path = %s\n' % jobdir.ansible_log)
+            config.write('gathering = explicit\n')
+            config.write('callback_plugins = %s\n' % self.callback_dir)
+            config.write('library = %s\n' % self.library_dir)
+            # bump the timeout because busy nodes may take more than
+            # 10s to respond
+            config.write('timeout = 30\n')
+            config.write('[ssh_connection]\n')
+            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
+                "-o UserKnownHostsFile=%s" % jobdir.known_hosts
+            config.write('ssh_args = %s\n' % ssh_args)
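The launcher writes `ansible.cfg` line by line. The sketch below builds the same settings with `configparser`. It assumes the options before `[ssh_connection]` belong to a `[defaults]` section whose header is written earlier (not visible in this hunk). The function name and all paths are hypothetical. `interpolation=None` keeps `configparser` from treating `%` in a path as interpolation syntax.

```python
import configparser
import io


def render_ansible_cfg(root, inventory, known_hosts, private_key_file,
                       callback_dir, library_dir, ansible_log):
    """Build an ansible.cfg with the settings written in the diff."""
    cfg = configparser.ConfigParser(interpolation=None)
    cfg['defaults'] = {
        'hostfile': inventory,
        'keep_remote_files': 'True',
        'local_tmp': '%s/.ansible/local_tmp' % root,
        'remote_tmp': '%s/.ansible/remote_tmp' % root,
        'private_key_file': private_key_file,
        'retry_files_enabled': 'False',
        'log_path': ansible_log,
        'gathering': 'explicit',
        'callback_plugins': callback_dir,
        'library': library_dir,
        # Busy nodes may take more than 10s to respond.
        'timeout': '30',
    }
    cfg['ssh_connection'] = {
        'ssh_args': ('-o ControlMaster=auto -o ControlPersist=60s '
                     '-o UserKnownHostsFile=%s' % known_hosts),
    }
    out = io.StringIO()
    cfg.write(out)
    return out.getvalue()


text = render_ansible_cfg('/tmp/job', '/tmp/job/inventory',
                          '/tmp/job/known_hosts', '~/.ssh/id_rsa',
                          '/var/lib/zuul/ansible/plugins/callback_plugins',
                          '/var/lib/zuul/ansible/library',
                          '/tmp/job/ansible.log')
print(text)
```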
+    def _ansibleTimeout(self, proc, msg):
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+    def abortRunningProc(self, proc):
+        aborted = False
+        self.log.debug("Abort: sending kill signal to job "
+                       "process group")
+        try:
+            pgid = os.getpgid(
+            os.killpg(pgid, signal.SIGKILL)
+            aborted = True
+        except Exception:
+            self.log.exception("Exception while killing "
+                               "ansible process:")
+        return aborted
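`runAnsible` starts `ansible-playbook` with `preexec_fn=os.setsid`, which makes it the leader of a new session and process group. `abortRunningProc` then kills that whole group with `os.killpg`, so any child processes the playbook spawned die as well. This is a POSIX-only sketch of the same idea, with a sleeping Python child standing in for `ansible-playbook`. It uses `start_new_session=True`, which calls `setsid()` in the child just like the `preexec_fn` in the diff. The helper names are hypothetical.

```python
import os
import signal
import subprocess
import sys


def start_in_new_group(argv):
    # start_new_session=True runs setsid() in the child, the same effect as
    # preexec_fn=os.setsid but without running Python code after fork().
    return subprocess.Popen(argv, start_new_session=True)


def abort_group(proc):
    """Send SIGKILL to every process in proc's process group."""
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        return True
    except ProcessLookupError:
        # The process was already reaped, so there is nothing to kill.
        return False


proc = start_in_new_group([sys.executable, '-c', 'import time; time.sleep(60)'])
print(abort_group(proc))  # True
print(proc.wait())        # -9: a negative returncode -N means "killed by signal N"
```

The `-9` return code is why `runAnsible` treats `ret == -9` as an abort rather than a test failure.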
     def runAnsible(self, jobdir, job):
         # Job is included here for the benefit of the test framework.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+        if False:  # TODOv3: self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+        cmd = ['ansible-playbook', jobdir.playbook, verbose]
+        self.log.debug("Ansible command: %s" % (cmd,))
+        # TODOv3: verbose
         proc = subprocess.Popen(
-            ['ansible-playbook', jobdir.playbook],
+            cmd,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
-        (out, err) = proc.communicate()
-        ret = proc.wait()
-        print(out)
-        print(err)
+        ret = None
+        # TODOv3: get this from the job
+        timeout = 60
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
+        if watchdog.timed_out:
+            return 'TIMED_OUT'
+        if ret == 3:
+            # AnsibleHostUnreachable: We had a network issue connecting to
+            # our zuul-worker.
+            return None
+        elif ret == -9:
+            # Received abort request.
+            return None
         if ret == 0:
             return 'SUCCESS'
-        else:
-            return 'FAILURE'
+        return 'FAILURE'
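The `Watchdog` class used in `runAnsible` is not shown in this diff. The following sketch implements a hypothetical minimal version with the same call shape: a constructor taking `(timeout, function, args)`, plus `start()`, `stop()`, and `timed_out`. It wires the watchdog into the same stream-then-classify flow. Output is read line by line and truncated to 1024 bytes, then the result is mapped: a timeout comes first, exit codes 3 and -9 give `None`, 0 gives `'SUCCESS'`, and anything else gives `'FAILURE'`. The sketch is POSIX-only, and `run_with_watchdog` and `kill_group` are hypothetical names.

```python
import os
import signal
import subprocess
import sys
import threading


class Watchdog:
    """Hypothetical minimal watchdog matching the calls runAnsible makes."""

    def __init__(self, timeout, function, args):
        self.timed_out = False
        self._function = function
        self._args = args
        self._timer = threading.Timer(timeout, self._fire)
        self._timer.daemon = True

    def _fire(self):
        self.timed_out = True
        self._function(*self._args)

    def start(self):
        self._timer.start()

    def stop(self):
        self._timer.cancel()


def kill_group(proc):
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
    except ProcessLookupError:
        pass  # the process exited on its own just before the timer fired


def run_with_watchdog(argv, timeout):
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, start_new_session=True)
    watchdog = Watchdog(timeout, kill_group, (proc,))
    watchdog.start()
    lines = []
    try:
        # readline() returns b'' at EOF, which ends the iteration.
        for line in iter(proc.stdout.readline, b''):
            lines.append(line[:1024].rstrip())  # truncate very long lines
        ret = proc.wait()
    finally:
        watchdog.stop()
        proc.stdout.close()
    # Same precedence as the diff: timeout first, then special exit codes.
    if watchdog.timed_out:
        return 'TIMED_OUT', lines
    if ret in (3, -9):  # unreachable host, or killed by an abort request
        return None, lines
    return ('SUCCESS' if ret == 0 else 'FAILURE'), lines


print(run_with_watchdog([sys.executable, '-c', 'print("ok")'], timeout=10))
# ('SUCCESS', [b'ok'])
```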
     def cat(self, job):
         args = json.loads(job.arguments)
diff --git a/zuul/manager/ b/zuul/manager/
index d3b6b0d..3d28327 100644
--- a/zuul/manager/
+++ b/zuul/manager/
@@ -29,8 +29,6 @@
         # creates a new change queue for every change
         if existing:
             return DynamicChangeQueueContextManager(existing)
-        if change.project not in self.pipeline.getProjects():
-            self.pipeline.addProject(change.project)
         change_queue = model.ChangeQueue(self.pipeline)
diff --git a/zuul/merger/ b/zuul/merger/
index a3bccc0..692dd83 100644
--- a/zuul/merger/
+++ b/zuul/merger/
@@ -91,8 +91,12 @@
             repo.create_head(ref.remote_head, ref, force=True)
-        # Reset to remote HEAD (usually origin/master)
-        repo.head.reference = origin.refs['HEAD']
+        # Try to reset to the remote HEAD (usually origin/master).
+        # If that fails, pick the first reference.
+        try:
+            repo.head.reference = origin.refs['HEAD']
+        except IndexError:
+            repo.head.reference = origin.refs[0]
         repo.git.clean('-x', '-f', '-d')
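The new code prefers the remote `HEAD` and falls back to the first remote reference when the lookup raises `IndexError`, which is what GitPython's ref list raises for a missing name. The sketch below reproduces that fallback without GitPython. `RefList` is a hypothetical stand-in that mimics the by-name lookup behavior, with refs reduced to plain strings. `pick_head` is also hypothetical.

```python
class RefList(list):
    """Hypothetical stand-in for a list of remote refs: lookup by name
    raises IndexError when no ref has that name."""

    def __getitem__(self, key):
        if isinstance(key, str):
            for ref in self:
                if ref == key:
                    return ref
            raise IndexError("No item found with id %r" % key)
        return list.__getitem__(self, key)


def pick_head(refs):
    # Prefer the remote HEAD (usually origin/master); a remote without a
    # HEAD symbolic ref falls back to the first reference listed.
    try:
        return refs['HEAD']
    except IndexError:
        return refs[0]


print(pick_head(RefList(['HEAD', 'master'])))            # HEAD
print(pick_head(RefList(['feature/zuulv3', 'master'])))  # feature/zuulv3
```

A remote with no refs at all still raises `IndexError` from `refs[0]`. The diff's code behaves the same way.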
@@ -178,7 +182,14 @@
         repo = self.createRepoObject()
         self.log.debug("Updating repository %s" % self.local_path)
         origin = repo.remotes.origin
-        origin.update()
+        if repo.git.version_info[:2] < (1, 9):
+            # Before 1.9, 'git fetch --tags' did not include the
+            # behavior covered by 'git fetch', so we run both
+            # commands in that case.  Starting with 1.9, 'git fetch
+            # --tags' is all that is necessary.  See
+            #
+            origin.fetch()
+        origin.fetch(tags=True)
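Before git 1.9, `git fetch --tags` fetched only tags and skipped the branches a plain fetch would update. That is why older versions need both calls. This sketch expresses the version gate as a pure function that returns the commands to run for a given version tuple, such as the one `repo.git.version_info` provides. The function name and the explicit `origin` remote argument are assumptions made for illustration.

```python
def fetch_commands(version_info):
    """Return the git fetch invocations for a git version tuple."""
    commands = []
    # Tuples compare element by element: (1, 8) < (1, 9) < (2, 0).
    if version_info[:2] < (1, 9):
        # Older git: 'fetch --tags' replaced the default refspec, so run a
        # plain fetch as well to update branches.
        commands.append(['git', 'fetch', 'origin'])
    commands.append(['git', 'fetch', '--tags', 'origin'])
    return commands


print(fetch_commands((1, 8, 5, 6)))  # two commands
print(fetch_commands((2, 7, 4)))     # one command
```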
     def getFiles(self, files, branch=None, commit=None):
         ret = {}
diff --git a/zuul/ b/zuul/
index 0aa1ad5..53c4646 100644
--- a/zuul/
+++ b/zuul/
@@ -1770,9 +1770,13 @@
     def createJobTree(self, item):
         project_config = self.project_configs[]
-        project_tree = project_config.pipelines[].job_tree
         ret = JobTree(None)
-        self._createJobTree(item.change, project_tree.job_trees, ret)
+        # NOTE(pabelanger): A foreign project may not have a configured
+        # pipeline; in that case, return an empty JobTree.
+        if in project_config.pipelines:
+            project_tree = \
+                project_config.pipelines[].job_tree
+            self._createJobTree(item.change, project_tree.job_trees, ret)
         return ret
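`createJobTree` now checks for membership before indexing `project_config.pipelines`, so a foreign project with no entry for the current pipeline gets an empty root instead of raising `KeyError`. This is a minimal sketch of that guard. The simplified `JobTree` dataclass and the `{pipeline name: [job names]}` mapping are hypothetical. They do not reproduce Zuul's model classes.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class JobTree:
    """Hypothetical, simplified job tree: a job name and its child trees."""
    job: Optional[str] = None
    children: List['JobTree'] = field(default_factory=list)


def create_job_tree(project_pipelines, pipeline_name):
    """project_pipelines: hypothetical {pipeline name: [job names]} mapping."""
    ret = JobTree(None)
    # A foreign project may have no entry for this pipeline; return the
    # empty root instead of raising KeyError.
    if pipeline_name in project_pipelines:
        for name in project_pipelines[pipeline_name]:
            ret.children.append(JobTree(name))
    return ret


cfg = {'check': ['pep8', 'py27']}
print([c.job for c in create_job_tree(cfg, 'check').children])  # ['pep8', 'py27']
print(create_job_tree(cfg, 'gate').children)                    # []
```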
diff --git a/zuul/source/ b/zuul/source/
index 8b85a46..8b03135 100644
--- a/zuul/source/
+++ b/zuul/source/
@@ -32,8 +32,8 @@
     def postConfig(self):
-    def getChange(self, event):
-        return self.connection.getChange(event)
+    def getChange(self, event, refresh=False):
+        return self.connection.getChange(event, refresh)
     def getProject(self, name):
         return self.connection.getProject(name)
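`Source.getChange` now forwards a `refresh` flag to the connection. The diff does not show what the connection does with it. One plausible reading is that `refresh=True` bypasses a cached change and queries again, and this sketch assumes that interpretation. `CachingConnection`, `_query`, and the change key format are hypothetical. The default `refresh=False` keeps existing callers of `getChange(event)` working unchanged.

```python
class CachingConnection:
    """Hypothetical connection that caches changes by key."""

    def __init__(self):
        self.cache = {}
        self.queries = 0

    def _query(self, key):
        # Stand-in for a request to the code review system.
        self.queries += 1
        return {'key': key, 'version': self.queries}

    def getChange(self, key, refresh=False):
        # refresh=True forces a new query even if the change is cached.
        if refresh or key not in self.cache:
            self.cache[key] = self._query(key)
        return self.cache[key]


class Source:
    """Thin wrapper that forwards refresh, as the diff's Source does."""

    def __init__(self, connection):
        self.connection = connection

    def getChange(self, event, refresh=False):
        return self.connection.getChange(event, refresh)


conn = CachingConnection()
src = Source(conn)
src.getChange('123,1')
src.getChange('123,1')
src.getChange('123,1', refresh=True)
print(conn.queries)  # 2
```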