Merge "Register connections when testing configuration"
diff --git a/.testr.conf b/.testr.conf
index 5433c07..222ce97 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
 [DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
 test_id_option=--load-list $IDFILE
 test_list_option=--list
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c61cea8..f368cb9 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -6,7 +6,7 @@
    https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
 
 .. _`Turbo-Hipster`:
-   http://git.openstack.org/cgit/stackforge/turbo-hipster/
+   https://git.openstack.org/cgit/openstack/turbo-hipster/
 
 .. _`Turbo-Hipster Documentation`:
    http://turbo-hipster.rtfd.org/
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index d8d72e6..be9570c 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -49,6 +49,11 @@
   Port on which the Gearman server is listening.
   ``port=4730``
 
+**check_job_registration**
+  Check to see if job is registered with Gearman or not. When True
+  a build result of NOT_REGISTERED will be returned if job is not found.
+  ``check_job_registration=True``
+
 gearman_server
 """"""""""""""
 
@@ -394,11 +399,12 @@
   approval matching all specified requirements.
 
     *username*
-    If present, an approval from this username is required.
+    If present, an approval from this username is required.  It is
+    treated as a regular expression.
 
     *email*
     If present, an approval with this email address is required.  It
-    is treated as a regular expression as above.
+    is treated as a regular expression.
 
     *email-filter* (deprecated)
     A deprecated alternate spelling of *email*.  Only one of *email* or
@@ -997,9 +1003,8 @@
 
 If you send signal 1 (SIGHUP) to the zuul-server process, Zuul will
 stop executing new jobs, wait until all executing jobs are finished,
-reload its configuration, and resume.  Any values in any of the
-configuration files may be changed, except the location of Zuul's PID
-file (a change to that will be ignored until Zuul is restarted).
+reload its layout.yaml, and resume. Changes to any connections or
+the PID file will be ignored until Zuul is restarted.
 
 If you send a SIGUSR1 to the zuul-server process, Zuul will stop
 executing new jobs, wait until all executing jobs are finished,
diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js
index c63700a..9df44ce 100644
--- a/etc/status/public_html/jquery.zuul.js
+++ b/etc/status/public_html/jquery.zuul.js
@@ -490,10 +490,12 @@
                 $header_div.append($heading);
 
                 if (typeof pipeline.description === 'string') {
+                    var descr = $('<small />')
+                    $.each( pipeline.description.split(/\r?\n\r?\n/), function(index, descr_part){
+                        descr.append($('<p />').text(descr_part));
+                    });
                     $header_div.append(
-                        $('<p />').append(
-                            $('<small />').text(pipeline.description)
-                        )
+                        $('<p />').append(descr)
                     );
                 }
                 return $header_div;
diff --git a/other-requirements.txt b/other-requirements.txt
new file mode 100644
index 0000000..1ade655
--- /dev/null
+++ b/other-requirements.txt
@@ -0,0 +1,4 @@
+mysql-client [test]
+mysql-server [test]
+postgresql [test]
+postgresql-client [test]
diff --git a/requirements.txt b/requirements.txt
index 8388f0b..77ac0a5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,7 @@
 PyYAML>=3.1.0
 Paste
 WebOb>=1.2.3
-paramiko>=1.8.0
+paramiko>=1.8.0,<2.0.0
 GitPython>=0.3.3
 ordereddict
 python-daemon>=2.0.4,<2.1.0
diff --git a/setup.cfg b/setup.cfg
index 620e1ac..7ddeb84 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -25,6 +25,7 @@
     zuul-merger = zuul.cmd.merger:main
     zuul = zuul.cmd.client:main
     zuul-cloner = zuul.cmd.cloner:main
+    zuul-launcher = zuul.cmd.launcher:main
 
 [build_sphinx]
 source-dir = doc/source
diff --git a/tests/base.py b/tests/base.py
index 405caa0..2d7f918 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -22,22 +22,22 @@
 import os
 import pprint
 from six.moves import queue as Queue
+from six.moves import urllib
 import random
 import re
 import select
 import shutil
+from six.moves import reload_module
 import socket
 import string
 import subprocess
 import swiftclient
 import threading
 import time
-import urllib2
 
 import git
 import gear
 import fixtures
-import six.moves.urllib.parse as urlparse
 import statsd
 import testtools
 from git import GitCommandError
@@ -479,7 +479,7 @@
         self.url = url
 
     def read(self):
-        res = urlparse.urlparse(self.url)
+        res = urllib.parse.urlparse(self.url)
         path = res.path
         project = '/'.join(path.split('/')[2:-2])
         ret = '001e# service=git-upload-pack\n'
@@ -862,6 +862,28 @@
                 format='%(asctime)s %(name)-32s '
                 '%(levelname)-8s %(message)s'))
 
+            # NOTE(notmorgan): Extract logging overrides for specific libraries
+            # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+            # each. This is used to limit the output during test runs from
+            # libraries that zuul depends on such as gear.
+            log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+            if log_defaults_from_env:
+                for default in log_defaults_from_env.split(','):
+                    try:
+                        name, level_str = default.split('=', 1)
+                        level = getattr(logging, level_str, logging.DEBUG)
+                        self.useFixture(fixtures.FakeLogger(
+                            name=name,
+                            level=level,
+                            format='%(asctime)s %(name)-32s '
+                                   '%(levelname)-8s %(message)s'))
+                    except ValueError:
+                        # NOTE(notmorgan): Invalid format of the log default,
+                        # skip and don't try and apply a logger for the
+                        # specified module
+                        pass
+
 
 class ZuulTestCase(BaseTestCase):
 
@@ -876,11 +898,13 @@
         self.test_root = os.path.join(tmp_root, "zuul-test")
         self.upstream_root = os.path.join(self.test_root, "upstream")
         self.git_root = os.path.join(self.test_root, "git")
+        self.state_root = os.path.join(self.test_root, "lib")
 
         if os.path.exists(self.test_root):
             shutil.rmtree(self.test_root)
         os.makedirs(self.test_root)
         os.makedirs(self.upstream_root)
+        os.makedirs(self.state_root)
 
         # Make per test copy of Configuration.
         self.setup_config()
@@ -888,6 +912,7 @@
                         os.path.join(FIXTURE_DIR,
                                      self.config.get('zuul', 'layout_config')))
         self.config.set('merger', 'git_dir', self.git_root)
+        self.config.set('zuul', 'state_dir', self.state_root)
 
         # For each project in config:
         self.init_repo("org/project")
@@ -914,8 +939,8 @@
         os.environ['STATSD_PORT'] = str(self.statsd.port)
         self.statsd.start()
         # the statsd client object is configured in the statsd module import
-        reload(statsd)
-        reload(zuul.scheduler)
+        reload_module(statsd)
+        reload_module(zuul.scheduler)
 
         self.gearman_server = FakeGearmanServer()
 
@@ -944,12 +969,12 @@
         self.sched.registerConnections(self.connections)
 
         def URLOpenerFactory(*args, **kw):
-            if isinstance(args[0], urllib2.Request):
+            if isinstance(args[0], urllib.request.Request):
                 return old_urlopen(*args, **kw)
             return FakeURLOpener(self.upstream_root, *args, **kw)
 
-        old_urlopen = urllib2.urlopen
-        urllib2.urlopen = URLOpenerFactory
+        old_urlopen = urllib.request.urlopen
+        urllib.request.urlopen = URLOpenerFactory
 
         self.merge_server = zuul.merger.server.MergeServer(self.config,
                                                            self.connections)
@@ -1132,6 +1157,17 @@
         zuul.merger.merger.reset_repo_to_head(repo)
         repo.git.clean('-x', '-f', '-d')
 
+    def create_commit(self, project):
+        path = os.path.join(self.upstream_root, project)
+        repo = git.Repo(path)
+        repo.head.reference = repo.heads['master']
+        file_name = os.path.join(path, 'README')
+        with open(file_name, 'a') as f:
+            f.write('creating fake commit\n')
+        repo.index.add([file_name])
+        commit = repo.index.commit('Creating a fake commit')
+        return commit.hexsha
+
     def ref_has_change(self, ref, change):
         path = os.path.join(self.git_root, change.project)
         repo = git.Repo(path)
@@ -1293,9 +1329,11 @@
         start = time.time()
         while True:
             if time.time() - start > 10:
-                print 'queue status:',
-                print ' '.join(self.eventQueuesEmpty())
-                print self.areAllBuildsWaiting()
+                self.log.debug("Queue status:")
+                for queue in self.event_queues:
+                    self.log.debug("  %s: %s" % (queue, queue.empty()))
+                self.log.debug("All builds waiting: %s" %
+                               (self.areAllBuildsWaiting(),))
                 raise Exception("Timeout waiting for Zuul to settle")
             # Make sure no new events show up while we're checking
             self.worker.lock.acquire()
@@ -1333,8 +1371,8 @@
         for pipeline in self.sched.layout.pipelines.values():
             for queue in pipeline.queues:
                 if len(queue.queue) != 0:
-                    print 'pipeline %s queue %s contents %s' % (
-                        pipeline.name, queue.name, queue.queue)
+                    print('pipeline %s queue %s contents %s' % (
+                        pipeline.name, queue.name, queue.queue))
                 self.assertEqual(len(queue.queue), 0,
                                  "Pipelines queues should be empty")
 
diff --git a/tests/fixtures/layout-requirement-username.yaml b/tests/fixtures/layout-requirement-username.yaml
index 7a549f0..f9e6477 100644
--- a/tests/fixtures/layout-requirement-username.yaml
+++ b/tests/fixtures/layout-requirement-username.yaml
@@ -3,7 +3,7 @@
     manager: IndependentPipelineManager
     require:
       approval:
-        - username: jenkins
+        - username: ^(jenkins|zuul)$
     trigger:
       gerrit:
         - event: comment-added
diff --git a/tests/test_cloner.py b/tests/test_cloner.py
index 137c157..e3576bd 100644
--- a/tests/test_cloner.py
+++ b/tests/test_cloner.py
@@ -566,3 +566,57 @@
         self.worker.hold_jobs_in_build = False
         self.worker.release()
         self.waitUntilSettled()
+
+    def test_post_checkout(self):
+        project = "org/project"
+        path = os.path.join(self.upstream_root, project)
+        repo = git.Repo(path)
+        repo.head.reference = repo.heads['master']
+        commits = []
+        for i in range(0, 3):
+            commits.append(self.create_commit(project))
+        newRev = commits[1]
+
+        cloner = zuul.lib.cloner.Cloner(
+            git_base_url=self.upstream_root,
+            projects=[project],
+            workspace=self.workspace_root,
+            zuul_branch=None,
+            zuul_ref='master',
+            zuul_url=self.git_root,
+            zuul_project=project,
+            zuul_newrev=newRev,
+        )
+        cloner.execute()
+        repos = self.getWorkspaceRepos([project])
+        cloned_sha = repos[project].rev_parse('HEAD').hexsha
+        self.assertEqual(newRev, cloned_sha)
+
+    def test_post_and_master_checkout(self):
+        project = "org/project1"
+        master_project = "org/project2"
+        path = os.path.join(self.upstream_root, project)
+        repo = git.Repo(path)
+        repo.head.reference = repo.heads['master']
+        commits = []
+        for i in range(0, 3):
+            commits.append(self.create_commit(project))
+        newRev = commits[1]
+
+        cloner = zuul.lib.cloner.Cloner(
+            git_base_url=self.upstream_root,
+            projects=[project, master_project],
+            workspace=self.workspace_root,
+            zuul_branch=None,
+            zuul_ref='master',
+            zuul_url=self.git_root,
+            zuul_project=project,
+            zuul_newrev=newRev
+        )
+        cloner.execute()
+        repos = self.getWorkspaceRepos([project, master_project])
+        cloned_sha = repos[project].rev_parse('HEAD').hexsha
+        self.assertEqual(newRev, cloned_sha)
+        self.assertEqual(
+            repos[master_project].rev_parse('HEAD').hexsha,
+            repos[master_project].rev_parse('master').hexsha)
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3dc3234..46a8c7c 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -14,7 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import ConfigParser
+from six.moves import configparser as ConfigParser
 import os
 import re
 
@@ -33,13 +33,13 @@
 class TestLayoutValidator(testtools.TestCase):
     def test_layouts(self):
         """Test layout file validation"""
-        print
+        print()
         errors = []
         for fn in os.listdir(os.path.join(FIXTURE_DIR, 'layouts')):
             m = LAYOUT_RE.match(fn)
             if not m:
                 continue
-            print fn
+            print(fn)
 
             # Load any .conf file by the same name but .conf extension.
             config_file = ("%s.conf" %
@@ -69,7 +69,7 @@
                                     fn)
                 except voluptuous.Invalid as e:
                     error = str(e)
-                    print '  ', error
+                    print('  ', error)
                     if error in errors:
                         raise Exception("Error has already been tested: %s" %
                                         error)
diff --git a/tests/test_model.py b/tests/test_model.py
index 2711618..ac19383 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -12,6 +12,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import os
+import random
+
+import fixtures
+
 from zuul import change_matcher as cm
 from zuul import model
 
@@ -62,3 +67,76 @@
         metajob = model.Job('^job')
         job.copy(metajob)
         self._assert_job_booleans_are_not_none(job)
+
+
+class TestJobTimeData(BaseTestCase):
+    def setUp(self):
+        super(TestJobTimeData, self).setUp()
+        self.tmp_root = self.useFixture(fixtures.TempDir(
+            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+        ).path
+
+    def test_empty_timedata(self):
+        path = os.path.join(self.tmp_root, 'job-name')
+        self.assertFalse(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+
+    def test_save_reload(self):
+        path = os.path.join(self.tmp_root, 'job-name')
+        self.assertFalse(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        success_times = []
+        failure_times = []
+        results = []
+        for x in range(10):
+            success_times.append(int(random.random() * 1000))
+            failure_times.append(int(random.random() * 1000))
+            results.append(0)
+            results.append(1)
+        random.shuffle(results)
+        s = f = 0
+        for result in results:
+            if result:
+                td.add(failure_times[f], 'FAILURE')
+                f += 1
+            else:
+                td.add(success_times[s], 'SUCCESS')
+                s += 1
+        self.assertEqual(td.success_times, success_times)
+        self.assertEqual(td.failure_times, failure_times)
+        self.assertEqual(td.results, results[10:])
+        td.save()
+        self.assertTrue(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        td.load()
+        self.assertEqual(td.success_times, success_times)
+        self.assertEqual(td.failure_times, failure_times)
+        self.assertEqual(td.results, results[10:])
+
+
+class TestTimeDataBase(BaseTestCase):
+    def setUp(self):
+        super(TestTimeDataBase, self).setUp()
+        self.tmp_root = self.useFixture(fixtures.TempDir(
+            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+        ).path
+        self.db = model.TimeDataBase(self.tmp_root)
+
+    def test_timedatabase(self):
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 0)
+        self.db.update('job-name', 50, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 50)
+        self.db.update('job-name', 100, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 75)
+        for x in range(10):
+            self.db.update('job-name', 100, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 100)
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index b585fea..ea512a2 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -20,11 +20,10 @@
 import re
 import shutil
 import time
-import urllib
-import urllib2
 import yaml
 
 import git
+from six.moves import urllib
 import testtools
 
 import zuul.change_matcher
@@ -34,7 +33,6 @@
 import zuul.reporter.smtp
 
 from tests.base import (
-    BaseTestCase,
     ZuulTestCase,
     repack_repo,
 )
@@ -44,40 +42,6 @@
                     '%(levelname)-8s %(message)s')
 
 
-class TestSchedulerConfigParsing(BaseTestCase):
-
-    def test_parse_skip_if(self):
-        job_yaml = """
-jobs:
-  - name: job_name
-    skip-if:
-      - project: ^project_name$
-        branch: ^stable/icehouse$
-        all-files-match-any:
-          - ^filename$
-      - project: ^project2_name$
-        all-files-match-any:
-          - ^filename2$
-    """.strip()
-        data = yaml.load(job_yaml)
-        config_job = data.get('jobs')[0]
-        sched = zuul.scheduler.Scheduler({})
-        cm = zuul.change_matcher
-        expected = cm.MatchAny([
-            cm.MatchAll([
-                cm.ProjectMatcher('^project_name$'),
-                cm.BranchMatcher('^stable/icehouse$'),
-                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
-            ]),
-            cm.MatchAll([
-                cm.ProjectMatcher('^project2_name$'),
-                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
-            ]),
-        ])
-        matcher = sched._parseSkipIf(config_job)
-        self.assertEqual(expected, matcher)
-
-
 class TestScheduler(ZuulTestCase):
 
     def test_jobs_launched(self):
@@ -495,6 +459,46 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def _test_time_database(self, iteration):
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        time.sleep(2)
+
+        data = json.loads(self.sched.formatStatusJSON())
+        found_job = None
+        for pipeline in data['pipelines']:
+            if pipeline['name'] != 'gate':
+                continue
+            for queue in pipeline['change_queues']:
+                for head in queue['heads']:
+                    for item in head:
+                        for job in item['jobs']:
+                            if job['name'] == 'project-merge':
+                                found_job = job
+                                break
+
+        self.assertIsNotNone(found_job)
+        if iteration == 1:
+            self.assertIsNotNone(found_job['estimated_time'])
+            self.assertIsNone(found_job['remaining_time'])
+        else:
+            self.assertIsNotNone(found_job['estimated_time'])
+            self.assertTrue(found_job['estimated_time'] >= 2)
+            self.assertIsNotNone(found_job['remaining_time'])
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+    def test_time_database(self):
+        "Test the time database"
+
+        self._test_time_database(1)
+        self._test_time_database(2)
+
     def test_two_failed_changes_at_head(self):
         "Test that changes are reparented correctly if 2 fail at head"
 
@@ -600,6 +604,36 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def test_parse_skip_if(self):
+        job_yaml = """
+jobs:
+  - name: job_name
+    skip-if:
+      - project: ^project_name$
+        branch: ^stable/icehouse$
+        all-files-match-any:
+          - ^filename$
+      - project: ^project2_name$
+        all-files-match-any:
+          - ^filename2$
+    """.strip()
+        data = yaml.load(job_yaml)
+        config_job = data.get('jobs')[0]
+        cm = zuul.change_matcher
+        expected = cm.MatchAny([
+            cm.MatchAll([
+                cm.ProjectMatcher('^project_name$'),
+                cm.BranchMatcher('^stable/icehouse$'),
+                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
+            ]),
+            cm.MatchAll([
+                cm.ProjectMatcher('^project2_name$'),
+                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
+            ]),
+        ])
+        matcher = self.sched._parseSkipIf(config_job)
+        self.assertEqual(expected, matcher)
+
     def test_patch_order(self):
         "Test that dependent patches are tested in the right order"
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -1449,7 +1483,7 @@
         self.worker.build_history = []
 
         path = os.path.join(self.git_root, "org/project")
-        print repack_repo(path)
+        print(repack_repo(path))
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
@@ -1474,9 +1508,9 @@
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         A.addPatchset(large=True)
         path = os.path.join(self.upstream_root, "org/project1")
-        print repack_repo(path)
+        print(repack_repo(path))
         path = os.path.join(self.git_root, "org/project1")
-        print repack_repo(path)
+        print(repack_repo(path))
 
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
@@ -2235,10 +2269,13 @@
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
         self.waitUntilSettled()
 
+        self.worker.release('project-merge')
+        self.waitUntilSettled()
+
         port = self.webapp.server.socket.getsockname()[1]
 
-        req = urllib2.Request("http://localhost:%s/status.json" % port)
-        f = urllib2.urlopen(req)
+        req = urllib.request.Request("http://localhost:%s/status.json" % port)
+        f = urllib.request.urlopen(req)
         headers = f.info()
         self.assertIn('Content-Length', headers)
         self.assertIn('Content-Type', headers)
@@ -2255,7 +2292,7 @@
         self.waitUntilSettled()
 
         data = json.loads(data)
-        status_jobs = set()
+        status_jobs = []
         for p in data['pipelines']:
             for q in p['change_queues']:
                 if p['name'] in ['gate', 'conflict']:
@@ -2267,10 +2304,24 @@
                         self.assertTrue(change['active'])
                         self.assertEqual(change['id'], '1,1')
                         for job in change['jobs']:
-                            status_jobs.add(job['name'])
-        self.assertIn('project-merge', status_jobs)
-        self.assertIn('project-test1', status_jobs)
-        self.assertIn('project-test2', status_jobs)
+                            status_jobs.append(job)
+        self.assertEqual('project-merge', status_jobs[0]['name'])
+        self.assertEqual('https://server/job/project-merge/0/',
+                         status_jobs[0]['url'])
+        self.assertEqual('http://logs.example.com/1/1/gate/project-merge/0',
+                         status_jobs[0]['report_url'])
+
+        self.assertEqual('project-test1', status_jobs[1]['name'])
+        self.assertEqual('https://server/job/project-test1/1/',
+                         status_jobs[1]['url'])
+        self.assertEqual('http://logs.example.com/1/1/gate/project-test1/1',
+                         status_jobs[1]['report_url'])
+
+        self.assertEqual('project-test2', status_jobs[2]['name'])
+        self.assertEqual('https://server/job/project-test2/2/',
+                         status_jobs[2]['url'])
+        self.assertEqual('http://logs.example.com/1/1/gate/project-test2/2',
+                         status_jobs[2]['report_url'])
 
     def test_merging_queues(self):
         "Test that transitively-connected change queues are merged"
@@ -2829,7 +2880,8 @@
 
         port = self.webapp.server.socket.getsockname()[1]
 
-        f = urllib.urlopen("http://localhost:%s/status.json" % port)
+        req = urllib.request.Request("http://localhost:%s/status.json" % port)
+        f = urllib.request.urlopen(req)
         data = f.read()
 
         self.worker.hold_jobs_in_build = False
@@ -4215,6 +4267,45 @@
         self.waitUntilSettled()
         self.assertEqual(self.history[-1].changes, '3,2 2,1 1,2')
 
+    def test_crd_cycle_join(self):
+        "Test an updated change creates a cycle"
+        A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        # Create B->A
+        B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+        B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+            B.subject, A.data['id'])
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        # Update A to add A->B (a cycle).
+        A.addPatchset()
+        A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+            A.subject, B.data['id'])
+        # Normally we would submit the patchset-created event for
+        # processing here, however, we have no way of noting whether
+        # the dependency cycle detection correctly raised an
+        # exception, so instead, we reach into the source driver and
+        # call the method that would ultimately be called by the event
+        # processing.
+
+        source = self.sched.layout.pipelines['gate'].source
+        with testtools.ExpectedException(
+            Exception, "Dependency cycle detected"):
+            source._getChange(u'1', u'2', True)
+        self.log.debug("Got expected dependency cycle exception")
+
+        # Now if we update B to remove the depends-on, everything
+        # should be okay.  B; A->B
+
+        B.addPatchset()
+        B.data['commitMessage'] = '%s\n' % (B.subject,)
+        source._getChange(u'1', u'2', True)
+        source._getChange(u'2', u'2', True)
+
     def test_disable_at(self):
         "Test a pipeline will only report to the disabled trigger when failing"
 
diff --git a/tests/test_webapp.py b/tests/test_webapp.py
index b127c51..94f097a 100644
--- a/tests/test_webapp.py
+++ b/tests/test_webapp.py
@@ -16,7 +16,8 @@
 # under the License.
 
 import json
-import urllib2
+
+from six.moves import urllib
 
 from tests.base import ZuulTestCase
 
@@ -44,41 +45,41 @@
     def test_webapp_status(self):
         "Test that we can filter to only certain changes in the webapp."
 
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertIn('pipelines', data)
 
     def test_webapp_status_compat(self):
         # testing compat with status.json
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status.json" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertIn('pipelines', data)
 
     def test_webapp_bad_url(self):
         # do we 404 correctly
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/foo" % self.port)
-        self.assertRaises(urllib2.HTTPError, urllib2.urlopen, req)
+        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
 
     def test_webapp_find_change(self):
         # can we filter by change id
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/change/1,1" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertEqual(1, len(data), data)
         self.assertEqual("org/project", data[0]['project'])
 
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/change/2,1" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertEqual(1, len(data), data)
diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py
index 9dbf504..8b854c7 100755
--- a/tools/zuul-changes.py
+++ b/tools/zuul-changes.py
@@ -35,7 +35,7 @@
                 if not change['live']:
                     continue
                 cid, cps = change['id'].split(',')
-                print (
+                print(
                     "zuul enqueue --trigger gerrit --pipeline %s "
                     "--project %s --change %s,%s" % (
                         options.pipeline_name,
diff --git a/tox.ini b/tox.ini
index 79ea939..06ccbcd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,7 @@
          STATSD_PORT=8125
          VIRTUAL_ENV={envdir}
          OS_TEST_TIMEOUT=30
+         OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
 passenv = ZUUL_TEST_ROOT
 usedevelop = True
 install_command = pip install {opts} {packages}
@@ -17,7 +18,17 @@
 commands =
   python setup.py testr --slowest --testr-args='{posargs}'
 
+[testenv:bindep]
+# Do not install any requirements. We want this to be fast and work even if
+# system dependencies are missing, since it's used to tell you what system
+# dependencies are missing! This also means that bindep must be installed
+# separately, outside of the requirements files.
+deps = bindep
+commands = bindep test
+
 [testenv:pep8]
+# streamer is python3 only, so we need to run flake8 in python3
+basepython = python3
 commands = flake8 {posargs}
 
 [testenv:cover]
diff --git a/zuul/ansible/__init__.py b/zuul/ansible/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/__init__.py
diff --git a/zuul/ansible/library/__init__.py b/zuul/ansible/library/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/library/__init__.py
diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py
new file mode 100644
index 0000000..78f3249
--- /dev/null
+++ b/zuul/ansible/library/zuul_console.py
@@ -0,0 +1,199 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software.  If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import socket
+import threading
+
+
+def daemonize():
+    # A really basic daemonize method that should work well enough for
+    # now in this circumstance. Based on the public domain code at:
+    # http://web.archive.org/web/20131017130434/http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
+
+    pid = os.fork()
+    if pid > 0:
+        return True
+
+    os.chdir('/')
+    os.setsid()
+    os.umask(0)
+
+    pid = os.fork()
+    if pid > 0:
+        sys.exit(0)
+
+    sys.stdout.flush()
+    sys.stderr.flush()
+    i = open('/dev/null', 'r')
+    o = open('/dev/null', 'a+')
+    e = open('/dev/null', 'a+', 0)
+    os.dup2(i.fileno(), sys.stdin.fileno())
+    os.dup2(o.fileno(), sys.stdout.fileno())
+    os.dup2(e.fileno(), sys.stderr.fileno())
+    return False
+
+
+class Console(object):
+    def __init__(self, path):
+        self.path = path
+        self.file = open(path)
+        self.stat = os.stat(path)
+        self.size = self.stat.st_size
+
+
+class Server(object):
+    def __init__(self, path, port):
+        self.path = path
+        s = None
+        for res in socket.getaddrinfo(None, port, socket.AF_UNSPEC,
+                                      socket.SOCK_STREAM, 0,
+                                      socket.AI_PASSIVE):
+            af, socktype, proto, canonname, sa = res
+            try:
+                s = socket.socket(af, socktype, proto)
+                s.setsockopt(socket.SOL_SOCKET,
+                             socket.SO_REUSEADDR, 1)
+            except socket.error:
+                s = None
+                continue
+            try:
+                s.bind(sa)
+                s.listen(1)
+            except socket.error:
+                s.close()
+                s = None
+                continue
+            break
+        if s is None:
+            sys.exit(1)
+        self.socket = s
+
+    def accept(self):
+        conn, addr = self.socket.accept()
+        return conn
+
+    def run(self):
+        while True:
+            conn = self.accept()
+            t = threading.Thread(target=self.handleOneConnection, args=(conn,))
+            t.daemon = True
+            t.start()
+
+    def chunkConsole(self, conn):
+        """Send the log's current contents to conn and return the open
+        Console, or None if the log file cannot be opened."""
+        try:
+            console = Console(self.path)
+        except Exception:
+            return
+        # sendall(): a bare send() may transmit only part of the chunk.
+        for chunk in iter(lambda: console.file.read(4096), ''):
+            conn.sendall(chunk)
+        return console
+
+    def followConsole(self, console, conn):
+        """Tail the log to conn; return True if the log was replaced or
+        truncated (reopen it), False if the client disconnected."""
+        while True:
+            # Send all unread data; sendall() since send() may be partial
+            while True:
+                chunk = console.file.read(4096)
+                if chunk:
+                    conn.sendall(chunk)
+                else:
+                    break
+
+            # At this point, we are waiting for more data to be written
+            time.sleep(0.5)
+
+            # Discard anything the remote end has sent
+            r, w, e = select.select([conn], [], [conn], 0)
+            if conn in e:
+                return False
+            if conn in r:
+                ret = conn.recv(1024)
+                # EOF on input means the client has disconnected.
+                if not ret:
+                    return False
+
+            # See if the file has been truncated
+            try:
+                st = os.stat(console.path)
+                if (st.st_ino != console.stat.st_ino or
+                    st.st_size < console.size):
+                    return True
+            except Exception:
+                return True
+            console.size = st.st_size
+
+    def handleOneConnection(self, conn):
+        """Serve one client: send the log, then tail it until disconnect."""
+        # FIXME: this won't notice disconnects until it tries to send
+        console = None
+        try:
+            while True:
+                if console is not None:
+                    try:
+                        console.file.close()
+                    except Exception:
+                        pass
+                # Wait for the log file to appear, then send what it has.
+                while True:
+                    console = self.chunkConsole(conn)
+                    if console:
+                        break
+                    time.sleep(0.5)
+                # True means the log was replaced/truncated: reopen it.
+                if not self.followConsole(console, conn):
+                    return
+        finally:
+            try:
+                conn.close()
+            except Exception:
+                pass
+
+
+def test():
+    s = Server('/tmp/console.html', 8088)
+    s.run()
+
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict(
+            path=dict(default='/tmp/console.html'),
+            port=dict(default=8088, type='int'),
+        )
+    )
+
+    p = module.params
+    path = p['path']
+    port = p['port']
+
+    if daemonize():
+        module.exit_json()
+
+    s = Server(path, port)
+    s.run()
+
+from ansible.module_utils.basic import *  # noqa
+
+if __name__ == '__main__':
+    main()
+# test()
diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py
new file mode 100644
index 0000000..4b377d9
--- /dev/null
+++ b/zuul/ansible/library/zuul_log.py
@@ -0,0 +1,58 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+# Copyright (c) 2016 Red Hat
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software.  If not, see <http://www.gnu.org/licenses/>.
+
+import datetime
+
+
+class Console(object):
+    def __enter__(self):
+        self.logfile = open('/tmp/console.html', 'a', 0)
+        return self
+
+    def __exit__(self, etype, value, tb):
+        self.logfile.close()
+
+    def addLine(self, ln):
+        ts = datetime.datetime.now()
+        outln = '%s | %s' % (str(ts), ln)
+        self.logfile.write(outln)
+
+
+def log(msg):
+    if not isinstance(msg, list):
+        msg = [msg]
+    with Console() as console:
+        for line in msg:
+            console.addLine("[Zuul] %s\n" % line)
+
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict(
+            msg=dict(required=True, type='raw'),
+        )
+    )
+
+    p = module.params
+    log(p['msg'])
+    module.exit_json(changed=True)
+
+from ansible.module_utils.basic import *  # noqa
+
+if __name__ == '__main__':
+    main()
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
new file mode 100644
index 0000000..7689fb3
--- /dev/null
+++ b/zuul/ansible/library/zuul_runner.py
@@ -0,0 +1,131 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software.  If not, see <http://www.gnu.org/licenses/>.
+
+import datetime
+import getpass
+import os
+import subprocess
+import threading
+
+
+class Console(object):
+    """Context manager that appends timestamped lines to the console log."""
+    def __enter__(self):
+        self.logfile = open('/tmp/console.html', 'a', 0)
+        return self
+
+    def __exit__(self, etype, value, tb):
+        self.logfile.close()
+
+    def addLine(self, ln):
+        # Note this format with delimiter is "inspired" by the old Jenkins
+        # format but with microsecond resolution instead of millisecond.
+        # It is kept so log parsing/formatting remains consistent.
+        ts = datetime.datetime.now()
+        outln = '%s | %s' % (ts, ln)
+        self.logfile.write(outln)
+
+
+def get_env():
+    """Return a base environment built from HOME, USER and PAM env files."""
+    env = {}
+    env['HOME'] = os.path.expanduser('~')
+    env['USER'] = getpass.getuser()
+
+    # Known locations for PAM mod_env sources
+    for fn in ['/etc/environment', '/etc/default/locale']:
+        if os.path.exists(fn):
+            with open(fn) as f:
+                for line in f:
+                    line = line.strip()
+                    # Skip blank lines, comments and non-assignments.
+                    if not line or line[0] == '#' or '=' not in line:
+                        continue
+                    # Split on the first '=' only; values may contain '='.
+                    k, v = line.split('=', 1)
+                    for q in ["'", '"']:
+                        if v.startswith(q):  # safe when the value is empty
+                            v = v.strip(q)
+                    env[k] = v
+    return env
+
+
+def follow(fd):
+    newline_warning = False
+    with Console() as console:
+        while True:
+            line = fd.readline()
+            if not line:
+                break
+            if not line.endswith('\n'):
+                line += '\n'
+                newline_warning = True
+            console.addLine(line)
+        if newline_warning:
+            console.addLine('[Zuul] No trailing newline\n')
+
+
+def run(cwd, cmd, args):
+    """Run cmd in a login bash shell under cwd and return its exit code."""
+    env = get_env()
+    env.update(args)
+    proc = subprocess.Popen(
+        ['/bin/bash', '-l', '-c', cmd],
+        cwd=cwd,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        env=env,
+    )
+
+    t = threading.Thread(target=follow, args=(proc.stdout,))
+    t.daemon = True
+    t.start()
+
+    ret = proc.wait()
+    # Give the log-writing thread up to 10 seconds to catch up; if it is
+    # still alive, a spawned child is likely holding stdout/stderr open.
+    t.join(10)
+    with Console() as console:
+        if t.isAlive():
+            # addLine() appends no newline, so terminate the line here.
+            console.addLine("[Zuul] standard output/error still open "
+                            "after child exited\n")
+        console.addLine("[Zuul] Task exit code: %s\n" % ret)
+    return ret
+
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict(
+            command=dict(required=True, default=None),
+            cwd=dict(required=True, default=None),
+            parameters=dict(default={}, type='dict')
+        )
+    )
+
+    p = module.params
+    env = p['parameters'].copy()
+    ret = run(p['cwd'], p['command'], env)
+    if ret == 0:
+        module.exit_json(changed=True, rc=ret)
+    else:
+        module.fail_json(msg="Exit code %s" % ret, rc=ret)
+
+from ansible.module_utils.basic import *  # noqa
+
+if __name__ == '__main__':
+    main()
diff --git a/zuul/ansible/plugins/__init__.py b/zuul/ansible/plugins/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/plugins/__init__.py
diff --git a/zuul/ansible/plugins/callback_plugins/__init__.py b/zuul/ansible/plugins/callback_plugins/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/plugins/callback_plugins/__init__.py
diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py
new file mode 100644
index 0000000..1cfd10d
--- /dev/null
+++ b/zuul/ansible/plugins/callback_plugins/timeout.py
@@ -0,0 +1,52 @@
+# Copyright 2016 IBM Corp.
+#
+# This file is part of Zuul
+#
+# This file is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This file is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this file.  If not, see <http://www.gnu.org/licenses/>.
+
+import time
+
+from ansible.executor.task_result import TaskResult
+from ansible.plugins.callback import CallbackBase
+
+
+class CallbackModule(CallbackBase):
+    def __init__(self, *args, **kw):
+        super(CallbackModule, self).__init__(*args, **kw)
+        self._elapsed_time = 0.0
+        self._task_start_time = None
+        self._play = None
+
+    def v2_playbook_on_play_start(self, play):
+        self._play = play
+
+    def playbook_on_task_start(self, name, is_conditional):
+        self._task_start_time = time.time()
+
+    def v2_on_any(self, *args, **kw):
+        result = None
+        if args and isinstance(args[0], TaskResult):
+            result = args[0]
+        if not result:
+            return
+
+        if self._task_start_time is not None:
+            task_time = time.time() - self._task_start_time
+            self._elapsed_time += task_time
+        if self._play and result._host:
+            manager = self._play.get_variable_manager()
+            facts = dict(elapsed_time=int(self._elapsed_time))
+
+            manager.set_nonpersistent_facts(result._host, facts)
+        self._task_start_time = None
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 2902c50..5ffd431 100644
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -14,8 +14,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import six
 from six.moves import configparser as ConfigParser
-import cStringIO
 import extras
 import logging
 import logging.config
@@ -47,7 +47,7 @@
             yappi.start()
         else:
             yappi.stop()
-            yappi_out = cStringIO.StringIO()
+            yappi_out = six.BytesIO()
             yappi.get_func_stats().print_all(out=yappi_out)
             yappi.get_thread_stats().print_all(out=yappi_out)
             log.debug(yappi_out.getvalue())
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 59ac419..1ce2828 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -154,7 +154,7 @@
         running_items = client.get_running_jobs()
 
         if len(running_items) == 0:
-            print "No jobs currently running"
+            print("No jobs currently running")
             return True
 
         all_fields = self._show_running_jobs_columns()
@@ -181,7 +181,7 @@
                         v += all_fields[f]['append']
                     values.append(v)
                 table.add_row(values)
-        print table
+        print(table)
         return True
 
     def _epoch_to_relative_time(self, epoch):
diff --git a/zuul/cmd/cloner.py b/zuul/cmd/cloner.py
index c616aa1..4f8b9f4 100755
--- a/zuul/cmd/cloner.py
+++ b/zuul/cmd/cloner.py
@@ -27,6 +27,8 @@
     'branch',
     'ref',
     'url',
+    'project',
+    'newrev',
 )
 
 
@@ -98,6 +100,10 @@
             parser.error("Specifying a Zuul ref requires a Zuul url. "
                          "Define Zuul arguments either via environment "
                          "variables or using options above.")
+        if 'zuul_newrev' in zuul_args and 'zuul_project' not in zuul_args:
+            parser.error("ZUUL_NEWREV has been specified without "
+                         "ZUUL_PROJECT. Please define a ZUUL_PROJECT or do "
+                         "not set ZUUL_NEWREV.")
 
         self.args = args
 
@@ -145,6 +151,8 @@
             clone_map_file=self.args.clone_map_file,
             project_branches=project_branches,
             cache_dir=self.args.cache_dir,
+            zuul_newrev=self.args.zuul_newrev,
+            zuul_project=self.args.zuul_project,
         )
         cloner.execute()
 
diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py
new file mode 100644
index 0000000..49643ae
--- /dev/null
+++ b/zuul/cmd/launcher.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+# Copyright 2012 Hewlett-Packard Development Company, L.P.
+# Copyright 2013-2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import daemon
+import extras
+
+# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
+# instead it depends on lockfile-0.9.1 which uses pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+import logging
+import os
+import socket
+import sys
+import signal
+
+import zuul.cmd
+import zuul.launcher.ansiblelaunchserver
+
+# No zuul imports that pull in paramiko here; it must not be
+# imported until after the daemonization.
+# https://github.com/paramiko/paramiko/issues/59
+# Similar situation with gear and statsd.
+
+
+class Launcher(zuul.cmd.ZuulApp):
+
+    def parse_arguments(self):
+        parser = argparse.ArgumentParser(description='Zuul launch worker.')
+        parser.add_argument('-c', dest='config',
+                            help='specify the config file')
+        parser.add_argument('-d', dest='nodaemon', action='store_true',
+                            help='do not run as a daemon')
+        parser.add_argument('--version', dest='version', action='version',
+                            version=self._get_version(),
+                            help='show zuul version')
+        parser.add_argument('--keep-jobdir', dest='keep_jobdir',
+                            action='store_true',
+                            help='keep local jobdirs after run completes')
+        parser.add_argument('command',
+                            choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
+                            nargs='?')
+
+        self.args = parser.parse_args()
+
+    def send_command(self, cmd):
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(path)
+        s.sendall('%s\n' % cmd)
+
+    def exit_handler(self):
+        self.launcher.stop()
+        self.launcher.join()
+
+    def main(self, daemon=True):
+        # See comment at top of file about zuul imports
+
+        self.setup_logging('launcher', 'log_config')
+
+        self.log = logging.getLogger("zuul.Launcher")
+
+        LaunchServer = zuul.launcher.ansiblelaunchserver.LaunchServer
+        self.launcher = LaunchServer(self.config,
+                                     keep_jobdir=self.args.keep_jobdir)
+        self.launcher.start()
+
+        signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
+        if daemon:
+            self.launcher.join()
+        else:
+            while True:
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    print("Ctrl + C: asking launcher to exit nicely...\n")
+                    self.exit_handler()
+                    sys.exit(0)
+
+
+def main():
+    server = Launcher()
+    server.parse_arguments()
+    server.read_config()
+
+    if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
+        server.send_command(server.args.command)
+        sys.exit(0)
+
+    server.configure_connections()
+
+    if server.config.has_option('launcher', 'pidfile'):
+        pid_fn = os.path.expanduser(server.config.get('launcher', 'pidfile'))
+    else:
+        pid_fn = '/var/run/zuul-launcher/zuul-launcher.pid'
+    pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+    if server.args.nodaemon:
+        server.main(False)
+    else:
+        with daemon.DaemonContext(pidfile=pid):
+            server.main(True)
+
+
+if __name__ == "__main__":
+    sys.path.insert(0, '.')
+    main()
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
index df215fd..797a990 100644
--- a/zuul/cmd/merger.py
+++ b/zuul/cmd/merger.py
@@ -68,7 +68,7 @@
             try:
                 signal.pause()
             except KeyboardInterrupt:
-                print "Ctrl + C: asking merger to exit nicely...\n"
+                print("Ctrl + C: asking merger to exit nicely...\n")
                 self.exit_handler(signal.SIGINT, None)
 
 
@@ -89,9 +89,7 @@
         f.close()
         os.unlink(test_fn)
     except Exception:
-        print
-        print "Unable to write to state directory: %s" % state_dir
-        print
+        print("\nUnable to write to state directory: %s\n" % state_dir)
         raise
 
     if server.config.has_option('merger', 'pidfile'):
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 850fecb..187e59d 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -61,12 +61,9 @@
     def reconfigure_handler(self, signum, frame):
         signal.signal(signal.SIGHUP, signal.SIG_IGN)
         self.log.debug("Reconfiguration triggered")
-        self.sched.stopConnections()
         self.read_config()
         self.setup_logging('zuul', 'log_config')
         try:
-            self.configure_connections()
-            self.sched.registerConnections(self.connections)
             self.sched.reconfigure(self.config)
         except Exception:
             self.log.exception("Reconfiguration failed:")
@@ -89,7 +86,8 @@
         import zuul.trigger.gerrit
 
         logging.basicConfig(level=logging.DEBUG)
-        self.sched = zuul.scheduler.Scheduler(self.config)
+        self.sched = zuul.scheduler.Scheduler(self.config,
+                                              testonly=True)
         self.configure_connections()
         self.sched.registerConnections(self.connections, load=False)
         layout = self.sched.testConfig(self.config.get('zuul',
@@ -110,7 +108,7 @@
                 jobs.add(v)
         for job in sorted(layout.jobs):
             if job not in jobs:
-                print "Job %s not defined" % job
+                print("Job %s not defined" % job)
                 failure = True
         return failure
 
@@ -120,18 +118,18 @@
         if child_pid == 0:
             os.close(pipe_write)
             self.setup_logging('gearman_server', 'log_config')
-            import gear
+            import zuul.lib.gearserver
             statsd_host = os.environ.get('STATSD_HOST')
             statsd_port = int(os.environ.get('STATSD_PORT', 8125))
             if self.config.has_option('gearman_server', 'listen_address'):
                 host = self.config.get('gearman_server', 'listen_address')
             else:
                 host = None
-            gear.Server(4730,
-                        host=host,
-                        statsd_host=statsd_host,
-                        statsd_port=statsd_port,
-                        statsd_prefix='zuul.geard')
+            zuul.lib.gearserver.GearServer(4730,
+                                           host=host,
+                                           statsd_host=statsd_host,
+                                           statsd_port=statsd_port,
+                                           statsd_prefix='zuul.geard')
 
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
@@ -199,7 +197,7 @@
             try:
                 signal.pause()
             except KeyboardInterrupt:
-                print "Ctrl + C: asking scheduler to exit nicely...\n"
+                print("Ctrl + C: asking scheduler to exit nicely...\n")
                 self.exit_handler(signal.SIGINT, None)
 
 
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index f8e5add..62891cd 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -18,11 +18,11 @@
 import json
 import time
 from six.moves import queue as Queue
+from six.moves import urllib
 import paramiko
 import logging
 import pprint
 import voluptuous as v
-import urllib2
 
 from zuul.connection import BaseConnection
 from zuul.model import TriggerEvent
@@ -32,7 +32,7 @@
     """Move events from Gerrit to the scheduler."""
 
     log = logging.getLogger("zuul.GerritEventConnector")
-    delay = 5.0
+    delay = 10.0
 
     def __init__(self, connection):
         super(GerritEventConnector, self).__init__()
@@ -94,7 +94,7 @@
         try:
             event.account = data.get(accountfield_from_type[event.type])
         except KeyError:
-            self.log.error("Received unrecognized event type '%s' from Gerrit.\
+            self.log.warning("Received unrecognized event type '%s' from Gerrit.\
                     Can not get account information." % event.type)
             event.account = None
 
@@ -132,6 +132,7 @@
 
 class GerritWatcher(threading.Thread):
     log = logging.getLogger("gerrit.GerritWatcher")
+    poll_timeout = 500
 
     def __init__(self, gerrit_connection, username, hostname, port=29418,
                  keyfile=None):
@@ -154,7 +155,7 @@
         poll = select.poll()
         poll.register(stdout.channel)
         while not self._stopped:
-            ret = poll.poll()
+            ret = poll.poll(self.poll_timeout)
             for (fd, event) in ret:
                 if fd == stdout.channel.fileno():
                     if event == select.POLLIN:
@@ -387,10 +388,10 @@
         url = "%s/p/%s/info/refs?service=git-upload-pack" % (
             self.baseurl, project)
         try:
-            data = urllib2.urlopen(url).read()
+            data = urllib.request.urlopen(url).read()
         except:
             self.log.error("Cannot get references from %s" % url)
-            raise  # keeps urllib2 error informations
+            raise  # keeps urllib error informations
         ret = {}
         read_headers = False
         read_advertisement = False
diff --git a/zuul/exceptions.py b/zuul/exceptions.py
index 2bd2c6b..40a1e40 100644
--- a/zuul/exceptions.py
+++ b/zuul/exceptions.py
@@ -22,5 +22,14 @@
         super(ChangeNotFound, self).__init__(message)
 
 
+class RevNotFound(Exception):
+    def __init__(self, project, rev):
+        self.project = project
+        self.revision = rev
+        message = ("Failed to checkout project '%s' at revision '%s'"
+                   % (self.project, self.revision))
+        super(RevNotFound, self).__init__(message)
+
+
 class MergeFailure(Exception):
     pass
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
new file mode 100644
index 0000000..95fc2fa
--- /dev/null
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -0,0 +1,1384 @@
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import logging
+import os
+import re
+import shutil
+import signal
+import socket
+import subprocess
+import tempfile
+import threading
+import time
+import traceback
+import Queue
+import uuid
+
+import gear
+import yaml
+import jenkins_jobs.builder
+import jenkins_jobs.formatter
+import zmq
+
+import zuul.ansible.library
+import zuul.ansible.plugins.callback_plugins
+from zuul.lib import commandsocket
+
+ANSIBLE_WATCHDOG_GRACE = 5 * 60  # presumably seconds (5 min); confirm
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60  # presumably seconds (2 h); confirm
+ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60  # presumably seconds (10 min)
+
+
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
+            'verbose', 'unverbose']  # same names as LaunchServer.command_map
+
+
+def boolify(x):
+    # Text such as '0'/'1' is parsed as an integer first; json.loads on
+    # Python 2 yields unicode, so both text types must be recognised.
+    return bool(int(x)) if isinstance(x, (str, type(u''))) else bool(x)
+
+
+class LaunchGearWorker(gear.Worker):
+    # gear.Worker that delays NOOP handling as node workers accumulate.
+    def __init__(self, *args, **kw):
+        self.__launch_server = kw.pop('launch_server')
+        super(LaunchGearWorker, self).__init__(*args, **kw)
+
+    def handleNoop(self, packet):
+        # Sleep (worker count squared) milliseconds before handling NOOP.
+        time.sleep(len(self.__launch_server.node_workers) ** 2 / 1000.0)
+        return super(LaunchGearWorker, self).handleNoop(packet)
+
+
+class NodeGearWorker(gear.Worker):
+    MASS_DO = 101
+
+    def sendMassDo(self, functions):
+        # Send every function name in one NUL-separated MASS_DO request,
+        # holding the broadcast lock while the packet is built and sent.
+        names = [gear.convert_to_bytes(name) for name in functions]
+        payload = b'\x00'.join(names)
+        with self.broadcast_lock:
+            packet = gear.Packet(gear.constants.REQ, self.MASS_DO, payload)
+            self.broadcast(packet)
+
+
+class Watchdog(object):
+    # Runs function(*args) on a daemon thread once timeout seconds have
+    # passed since start(), unless stop() was called in the meantime.
+    def __init__(self, timeout, function, args):
+        self.timeout, self.function, self.args = timeout, function, args
+        self.thread = threading.Thread(target=self._run)
+        self.thread.daemon = True
+
+    def _run(self):
+        while self._running and time.time() < self.end:
+            time.sleep(10)  # deadline is re-checked every 10 seconds
+        if self._running:
+            self.function(*self.args)
+
+    def start(self):
+        self.end = time.time() + self.timeout
+        self._running = True
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
+
+
+class JobDir(object):
+    # Temporary tree for one job: <root>/ansible/scripts and <root>/staging.
+    def __init__(self, keep=False):
+        self.keep = keep
+        root = self.root = tempfile.mkdtemp()
+        ansible = self.ansible_root = os.path.join(root, 'ansible')
+        self.script_root = os.path.join(ansible, 'scripts')
+        self.staging_root = os.path.join(root, 'staging')
+        for path in (ansible, self.script_root, self.staging_root):
+            os.makedirs(path)
+        self.known_hosts = os.path.join(ansible, 'known_hosts')
+        self.inventory = os.path.join(ansible, 'inventory')
+        self.playbook = os.path.join(ansible, 'playbook')
+        self.post_playbook = os.path.join(ansible, 'post_playbook')
+        self.config = os.path.join(ansible, 'ansible.cfg')
+        self.ansible_log = os.path.join(ansible, 'ansible_log.txt')
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, etype, value, tb):
+        if not self.keep:  # keep=True leaves the tree on disk
+            shutil.rmtree(self.root)
+
+
+class LaunchServer(object):
+    log = logging.getLogger("zuul.LaunchServer")
+    site_section_re = re.compile('site "(.*?)"')
+    node_section_re = re.compile('node "(.*?)"')
+
+    def __init__(self, config, keep_jobdir=False):
+        self.config = config
+        self.options = dict(
+            verbose=False
+        )
+        self.keep_jobdir = keep_jobdir
+        self.hostname = socket.gethostname()
+        self.registered_functions = set()
+        self.node_workers = {}  # worker name -> NodeWorker
+        self.jobs = {}  # job name -> expanded job definition
+        self.builds = {}  # build unique id -> worker name
+        self.zmq_send_queue = Queue.Queue()
+        self.termination_queue = Queue.Queue()
+        self.sites = {}  # site name -> host/user/pass/root settings
+        self.static_nodes = {}  # node name -> worker arguments
+        self.command_map = dict(
+            reconfigure=self.reconfigure,
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            release=self.release,
+            graceful=self.graceful,
+            verbose=self.verboseOn,
+            unverbose=self.verboseOff,
+        )
+
+        if config.has_option('launcher', 'accept_nodes'):
+            self.accept_nodes = config.getboolean('launcher',
+                                                  'accept_nodes')
+        else:
+            self.accept_nodes = True
+        self.config_accept_nodes = self.accept_nodes  # restored by unpause()
+
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
+        ansible_dir = os.path.join(state_dir, 'ansible')
+        plugins_dir = os.path.join(ansible_dir, 'plugins')
+        self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
+        if not os.path.exists(self.callback_dir):
+            os.makedirs(self.callback_dir)
+        self.library_dir = os.path.join(ansible_dir, 'library')
+        if not os.path.exists(self.library_dir):
+            os.makedirs(self.library_dir)
+
+        callback_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.plugins.callback_plugins.__file__))
+        for fn in os.listdir(callback_path):  # copy package files to state dir
+            shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
+
+        library_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.library.__file__))
+        for fn in os.listdir(library_path):  # copy package files to state dir
+            shutil.copy(os.path.join(library_path, fn), self.library_dir)
+
+        for section in config.sections():
+            m = self.site_section_re.match(section)  # section: site "<name>"
+            if m:
+                sitename = m.group(1)
+                d = {}
+                d['host'] = config.get(section, 'host')
+                d['user'] = config.get(section, 'user')
+                if config.has_option(section, 'pass'):
+                    d['pass'] = config.get(section, 'pass')
+                else:
+                    d['pass'] = ''
+                if config.has_option(section, 'root'):
+                    d['root'] = config.get(section, 'root')
+                else:
+                    d['root'] = '/'
+                self.sites[sitename] = d
+                continue
+            m = self.node_section_re.match(section)  # section: node "<name>"
+            if m:
+                nodename = m.group(1)
+                d = {}
+                d['name'] = nodename
+                d['host'] = config.get(section, 'host')
+                if config.has_option(section, 'description'):
+                    d['description'] = config.get(section, 'description')
+                else:
+                    d['description'] = ''
+                if config.has_option(section, 'labels'):  # items not stripped
+                    d['labels'] = config.get(section, 'labels').split(',')
+                else:
+                    d['labels'] = []
+                self.static_nodes[nodename] = d
+                continue
+
+    def start(self):  # bind ZMQ, register with gearman, start threads
+        self._gearman_running = True
+        self._zmq_running = True
+        self._reaper_running = True
+        self._command_running = True
+
+        # Setup ZMQ
+        self.zcontext = zmq.Context()
+        self.zsocket = self.zcontext.socket(zmq.PUB)
+        self.zsocket.bind("tcp://*:8888")  # port is fixed, not configurable
+
+        # Setup Gearman
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = LaunchGearWorker('Zuul Launch Server',
+                                       launch_server=self)
+        self.worker.addServer(server, port)
+        self.log.debug("Waiting for server")
+        self.worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+
+        # Start command socket
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand)
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
+        # Load JJB config
+        self.loadJobs()
+
+        # Start ZMQ worker thread
+        self.log.debug("Starting ZMQ processor")
+        self.zmq_thread = threading.Thread(target=self.runZMQ)
+        self.zmq_thread.daemon = True
+        self.zmq_thread.start()
+
+        # Start node worker reaper thread
+        self.log.debug("Starting reaper")
+        self.reaper_thread = threading.Thread(target=self.runReaper)
+        self.reaper_thread.daemon = True
+        self.reaper_thread.start()
+
+        # Start Gearman worker thread
+        self.log.debug("Starting worker")
+        self.gearman_thread = threading.Thread(target=self.run)
+        self.gearman_thread.daemon = True
+        self.gearman_thread.start()
+
+        # Start static workers
+        for node in self.static_nodes.values():
+            self.log.debug("Creating static node with arguments: %s" % (node,))
+            self._launchWorker(node)
+
+    def loadJobs(self):  # (re)load JJB jobs into self.jobs in place
+        self.log.debug("Loading jobs")
+        builder = JJB()
+        path = self.config.get('launcher', 'jenkins_jobs')
+        builder.load_files([path])
+        builder.parser.expandYaml()
+        unseen = set(self.jobs.keys())  # names to drop unless reloaded
+        for job in builder.parser.jobs:
+            builder.expandMacros(job)
+            self.jobs[job['name']] = job
+            unseen.discard(job['name'])
+        for name in unseen:
+            del self.jobs[name]
+
+    def register(self):
+        # Sync gearman registrations with accept_nodes, sending only the
+        # functions that need to be added or removed.
+        wanted = set()
+        if self.accept_nodes:
+            wanted.add("node_assign:zuul")
+        for template in ("stop:%s", "set_description:%s", "node_revoke:%s"):
+            wanted.add(template % self.hostname)
+        for name in wanted - self.registered_functions:
+            self.worker.registerFunction(name)
+        for name in self.registered_functions - wanted:
+            self.worker.unRegisterFunction(name)
+        self.registered_functions = wanted
+
+    def reconfigure(self):  # reload jobs, then tell live workers
+        self.log.debug("Reconfiguring")
+        self.loadJobs()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='reconfigure'))
+            except Exception:
+                self.log.exception("Exception sending reconfigure command "
+                                   "to worker:")
+        self.log.debug("Reconfiguration complete")
+
+    def pause(self):  # stop accepting nodes and pause live workers
+        self.log.debug("Pausing")
+        self.accept_nodes = False
+        self.register()  # drops the node_assign registration
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='pause'))
+            except Exception:
+                self.log.exception("Exception sending pause command "
+                                   "to worker:")
+        self.log.debug("Paused")
+
+    def unpause(self):  # restore configured accept_nodes; unpause workers
+        self.log.debug("Unpausing")
+        self.accept_nodes = self.config_accept_nodes
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='unpause'))
+            except Exception:
+                self.log.exception("Exception sending unpause command "
+                                   "to worker:")
+        self.log.debug("Unpaused")
+
+    def release(self):  # ask live non-static workers to release
+        self.log.debug("Releasing idle nodes")
+        for node in self.node_workers.values():
+            if node.name in self.static_nodes:  # static nodes are kept
+                continue
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='release'))
+            except Exception:
+                self.log.exception("Exception sending release command "
+                                   "to worker:")
+        self.log.debug("Finished releasing idle nodes")
+
+    def graceful(self):  # pause, release, wait for builds, then stop
+        # Note: this is run in the command processing thread; no more
+        # external commands will be processed after this.
+        self.log.debug("Gracefully stopping")
+        self.pause()
+        self.release()
+        self.log.debug("Waiting for all builds to finish")
+        while self.builds:  # polled every 5 seconds, no upper bound
+            time.sleep(5)
+        self.log.debug("All builds are finished")
+        self.stop()
+
+    def stop(self):  # shut down gearman, workers, ZMQ and command socket
+        self.log.debug("Stopping")
+        # First, stop accepting new jobs
+        self._gearman_running = False
+        self._reaper_running = False
+        self.worker.shutdown()
+        # Then stop all of the workers
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.stop()
+            except Exception:
+                self.log.exception("Exception sending stop command to worker:")
+        # Stop ZMQ afterwards so that the send queue is flushed
+        self._zmq_running = False
+        self.zmq_send_queue.put(None)  # sentinel that wakes runZMQ
+        self.zmq_send_queue.join()
+        # Stop command processing
+        self._command_running = False
+        self.command_socket.stop()
+        # Join the gearman thread which was stopped earlier.
+        self.gearman_thread.join()
+        # The command thread is joined in the join() method of this
+        # class, which is called by the command shell.
+        self.log.debug("Stopped")
+
+    def verboseOn(self):  # options dict is shared with node workers
+        self.log.debug("Enabling verbose mode")
+        self.options['verbose'] = True
+
+    def verboseOff(self):  # options dict is shared with node workers
+        self.log.debug("Disabling verbose mode")
+        self.options['verbose'] = False
+
+    def join(self):  # block until the command thread exits
+        self.command_thread.join()
+
+    def runCommand(self):  # command-thread loop
+        while self._command_running:
+            try:
+                command = self.command_socket.get()
+                self.command_map[command]()  # KeyError for unknown names
+            except Exception:
+                self.log.exception("Exception while processing command")
+
+    def runZMQ(self):  # publish queued events until stopped and drained
+        while self._zmq_running or not self.zmq_send_queue.empty():
+            try:
+                item = self.zmq_send_queue.get()
+                self.log.debug("Got ZMQ event %s" % (item,))
+                if item is None:  # wake-up sentinel queued by stop()
+                    continue
+                self.zsocket.send(item)
+            except Exception:
+                self.log.exception("Exception while processing ZMQ events")
+            finally:
+                self.zmq_send_queue.task_done()  # lets stop()'s join() return
+
+    def run(self):  # gearman loop: dispatch jobs by name prefix
+        while self._gearman_running:
+            try:
+                job = self.worker.getJob()
+                try:
+                    if job.name.startswith('node_assign:'):
+                        self.log.debug("Got node_assign job: %s" % job.unique)
+                        self.assignNode(job)
+                    elif job.name.startswith('stop:'):
+                        self.log.debug("Got stop job: %s" % job.unique)
+                        self.stopJob(job)
+                    elif job.name.startswith('set_description:'):
+                        self.log.debug("Got set_description job: %s" %
+                                       job.unique)
+                        job.sendWorkComplete()  # acknowledged, nothing stored
+                    elif job.name.startswith('node_revoke:'):
+                        self.log.debug("Got node_revoke job: %s" % job.unique)
+                        self.revokeNode(job)
+                    else:
+                        self.log.error("Unable to handle job %s" % job.name)
+                        job.sendWorkFail()
+                except Exception:
+                    self.log.exception("Exception while running job")
+                    job.sendWorkException(traceback.format_exc())
+            except gear.InterruptedError:  # ends the thread
+                return
+            except Exception:
+                self.log.exception("Exception while getting job")
+
+    def assignNode(self, job):  # start a worker for the node, then ack
+        args = json.loads(job.arguments)
+        self.log.debug("Assigned node with arguments: %s" % (args,))
+        self._launchWorker(args)
+        data = dict(manager=self.hostname)  # reply names this launcher
+        job.sendWorkData(json.dumps(data))
+        job.sendWorkComplete()
+
+    def _launchWorker(self, args):  # args: name/host/description/labels
+        worker = NodeWorker(self.config, self.jobs, self.builds,
+                            self.sites, args['name'], args['host'],
+                            args['description'], args['labels'],
+                            self.hostname, self.zmq_send_queue,
+                            self.termination_queue, self.keep_jobdir,
+                            self.callback_dir, self.library_dir,
+                            self.options)
+        self.node_workers[worker.name] = worker  # replaces any same-named one
+
+        worker.thread = threading.Thread(target=worker.run)
+        worker.thread.start()
+
+    def revokeNode(self, job):  # queue 'stop' for the named worker
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Revoke job with arguments: %s" % (args,))
+            name = args['name']
+            node = self.node_workers.get(name)
+            if not node:
+                self.log.debug("Unable to find worker %s" % (name,))
+                return
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='stop'))
+                else:
+                    self.log.debug("Node %s is not alive while revoking node" %
+                                   (node.name,))
+            except Exception:
+                self.log.exception("Exception sending stop command "
+                                   "to worker:")
+        finally:
+            job.sendWorkComplete()  # sent on every path, even on errors
+
+    def stopJob(self, job):  # queue 'abort' for the build's worker
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Stop job with arguments: %s" % (args,))
+            unique = args['number']  # key into self.builds
+            build_worker_name = self.builds.get(unique)
+            if not build_worker_name:
+                self.log.debug("Unable to find build for job %s" % (unique,))
+                return
+            node = self.node_workers.get(build_worker_name)
+            if not node:
+                self.log.debug("Unable to find worker for job %s" % (unique,))
+                return
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='abort'))
+                else:
+                    self.log.debug("Node %s is not alive while aborting job" %
+                                   (node.name,))
+            except Exception:
+                self.log.exception("Exception sending abort command "
+                                   "to worker:")
+        finally:
+            job.sendWorkComplete()  # sent on every path, even on errors
+
+    def runReaper(self):  # join and forget terminated workers
+        # We don't actually care if all the events are processed
+        while self._reaper_running:
+            try:
+                item = self.termination_queue.get()  # a worker name
+                self.log.debug("Got termination event %s" % (item,))
+                if item is None:
+                    continue
+                worker = self.node_workers[item]
+                self.log.debug("Joining %s" % (item,))
+                worker.thread.join()
+                self.log.debug("Joined %s" % (item,))
+                del self.node_workers[item]
+            except Exception:
+                self.log.exception("Exception while processing "
+                                   "termination events:")
+            finally:
+                self.termination_queue.task_done()
+
+
+class NodeWorker(object):
+    def __init__(self, config, jobs, builds, sites, name, host,
+                 description, labels, manager_name, zmq_send_queue,
+                 termination_queue, keep_jobdir, callback_dir,
+                 library_dir, options):
+        self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
+        self.log.debug("Creating node worker %s" % (name,))
+        self.config = config
+        self.jobs = jobs
+        self.builds = builds
+        self.sites = sites
+        self.name = name
+        self.host = host
+        self.description = description
+        if not isinstance(labels, list):  # a single label is wrapped
+            labels = [labels]
+        self.labels = labels
+        self.thread = None  # assigned by the code that creates the worker
+        self.registered_functions = set()
+        # If the unpaused Event is set, that means we should run jobs.
+        # If it is clear, then we are paused and should not run jobs.
+        self.unpaused = threading.Event()
+        self.unpaused.set()
+        self._running = True
+        self.queue = Queue.Queue()  # control messages: dict(action=...)
+        self.manager_name = manager_name
+        self.zmq_send_queue = zmq_send_queue
+        self.termination_queue = termination_queue
+        self.keep_jobdir = keep_jobdir
+        self.running_job_lock = threading.Lock()
+        self.pending_registration = False
+        self.registration_lock = threading.Lock()
+        self._get_job_lock = threading.Lock()
+        self._got_job = False
+        self._job_complete_event = threading.Event()
+        self._running_job = False
+        self._aborted_job = False
+        self._sent_complete_event = False
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
+        self.workspace_root = config.get('launcher', 'workspace_root')
+        if self.config.has_option('launcher', 'private_key_file'):
+            self.private_key_file = config.get('launcher', 'private_key_file')
+        else:
+            self.private_key_file = '~/.ssh/id_rsa'
+        if self.config.has_option('launcher', 'username'):
+            self.username = config.get('launcher', 'username')
+        else:
+            self.username = 'zuul'
+        self.callback_dir = callback_dir
+        self.library_dir = library_dir
+        self.options = options
+
+    def isAlive(self):
+        # Meant to be called from the manager.  True only when a thread
+        # has been attached to this worker and that thread is still
+        # running.
+        return bool(self.thread and self.thread.is_alive())
+
+    def run(self):  # worker thread body: connect, then serve the queue
+        self.log.debug("Node worker %s starting" % (self.name,))
+        server = self.config.get('gearman', 'server')
+        if self.config.has_option('gearman', 'port'):
+            port = self.config.get('gearman', 'port')
+        else:
+            port = 4730
+        self.worker = NodeGearWorker(self.name)
+        self.worker.addServer(server, port)
+        self.log.debug("Waiting for server")
+        self.worker.waitForServer()
+        self.log.debug("Registering")
+        self.register()
+
+        self.gearman_thread = threading.Thread(target=self.runGearman)
+        self.gearman_thread.daemon = True
+        self.gearman_thread.start()
+
+        self.log.debug("Started")
+
+        while self._running or not self.queue.empty():  # drain before exit
+            try:
+                self._runQueue()
+            except Exception:
+                self.log.exception("Exception in queue manager:")
+
+    def stop(self):
+        # If this is called locally, setting _running will be
+        # effective, if it's called remotely, it will not be, but it
+        # will be set by the queue thread.
+        self.log.debug("Submitting stop request")
+        self._running = False
+        self.unpaused.set()  # wake runGearman if it is waiting while paused
+        self.queue.put(dict(action='stop'))
+        self.queue.join()  # returns once every queued item is task_done
+
+    def pause(self):  # clear the run flag and interrupt the job wait
+        self.unpaused.clear()
+        self.worker.stopWaitingForJobs()
+
+    def unpause(self):  # let runGearman fetch jobs again
+        self.unpaused.set()
+
+    def release(self):
+        # If this node is idle, stop it.
+        old_unpaused = self.unpaused.is_set()
+        if old_unpaused:
+            self.pause()  # interrupts getJob so _get_job_lock is freed
+        with self._get_job_lock:
+            if self._got_job:
+                self.log.debug("This worker is not idle")
+                if old_unpaused:  # restore the state we found
+                    self.unpause()
+                return
+        self.log.debug("Stopping due to release command")
+        self.queue.put(dict(action='stop'))
+
+    def _runQueue(self):  # handle exactly one queued control message
+        item = self.queue.get()
+        try:
+            if item['action'] == 'stop':
+                self.log.debug("Received stop request")
+                self._running = False
+                self.termination_queue.put(self.name)
+                if self.abortRunningJob():
+                    self._job_complete_event.wait()
+                else:
+                    self.sendFakeCompleteEvent()
+                self.worker.shutdown()
+            elif item['action'] == 'pause':
+                self.log.debug("Received pause request")
+                self.pause()
+            elif item['action'] == 'unpause':
+                self.log.debug("Received unpause request")
+                self.unpause()
+            elif item['action'] == 'release':
+                self.log.debug("Received release request")
+                self.release()
+            elif item['action'] == 'reconfigure':
+                self.log.debug("Received reconfigure request")
+                self.register()
+            elif item['action'] == 'abort':
+                self.log.debug("Received abort request")
+                self.abortRunningJob()
+        finally:
+            self.queue.task_done()  # always acknowledged, even on error
+
+    def runGearman(self):  # job-fetching thread loop
+        while self._running:
+            try:
+                self.unpaused.wait()  # blocks while paused
+                if self._running:  # may have been stopped while waiting
+                    self._runGearman()
+            except Exception:
+                self.log.exception("Exception in gearman manager:")
+            with self._get_job_lock:
+                self._got_job = False  # idle again; read by release()
+
+    def _runGearman(self):  # fetch one job and launch it
+        if self.pending_registration:  # deferred while a job was running
+            self.register()
+        with self._get_job_lock:
+            try:
+                job = self.worker.getJob()
+                self._got_job = True
+            except gear.InterruptedError:
+                return
+        self.log.debug("Node worker %s got job %s" % (self.name, job.name))
+        try:
+            if job.name not in self.registered_functions:
+                self.log.error("Unable to handle job %s" % job.name)
+                job.sendWorkFail()
+                return
+            self.launch(job)
+        except Exception:
+            self.log.exception("Exception while running job")
+            job.sendWorkException(traceback.format_exc())
+
+    def generateFunctionNames(self, job):
+        # Function names this node offers for a job.  This only supports
+        # "node: foo" and "node: foo || bar"; no shared label, no names.
+        shared = set()
+        wanted = job.get('node')
+        if wanted:
+            wanted = set(part.strip() for part in wanted.split('||'))
+            shared = wanted.intersection(self.labels)
+            if not shared:
+                return set()
+        name = job['name']
+        names = set(['build:%s' % (name,)])
+        names.update('build:%s:%s' % (name, tag) for tag in shared)
+        return names
+
+    def register(self):  # push the current job function set to gearman
+        if not self.registration_lock.acquire(False):  # non-blocking
+            self.log.debug("Registration already in progress")
+            return
+        try:
+            if self._running_job:
+                self.pending_registration = True  # retried in _runGearman
+                self.log.debug("Ignoring registration due to running job")
+                return
+            self.log.debug("Updating registration")
+            self.pending_registration = False
+            new_functions = set()
+            for job in self.jobs.values():
+                new_functions |= self.generateFunctionNames(job)
+            self.worker.sendMassDo(new_functions)
+            self.registered_functions = new_functions
+        finally:
+            self.registration_lock.release()
+
+    def abortRunningJob(self):  # returns True if a process was killed
+        self._aborted_job = True  # set even when nothing is running
+        return self.abortRunningProc(self.ansible_job_proc)
+
+    def abortRunningProc(self, proc):
+        # SIGKILL the process group of proc if a job is running.  Returns
+        # True only when the signal was sent without raising.
+        self.log.debug("Abort: acquiring job lock")
+        with self.running_job_lock:
+            if not self._running_job:
+                self.log.debug("Abort: no job is running")
+                return False
+            self.log.debug("Abort: a job is running")
+            if not proc:
+                return False
+            self.log.debug("Abort: sending kill signal to job "
+                           "process group")
+            try:
+                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
+            except Exception:
+                self.log.exception("Exception while killing "
+                                   "ansible process:")
+                return False
+        return True
+
+    def launch(self, job):  # run one gearman build job start to finish
+        self.log.info("Node worker %s launching job %s" %
+                      (self.name, job.name))
+
+        # Make sure we can parse what we need from the job first
+        args = json.loads(job.arguments)
+        offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
+        job_name = job.name.split(':')[1]
+
+        # Initialize the result so we have something regardless of
+        # whether the job actually runs
+        result = None
+        self._sent_complete_event = False
+        self._aborted_job = False
+
+        try:
+            self.sendStartEvent(job_name, args)
+        except Exception:
+            self.log.exception("Exception while sending job start event")
+
+        try:
+            result = self.runJob(job, args)
+        except Exception:
+            self.log.exception("Exception while launching job thread")
+
+        self._running_job = False
+
+        try:
+            data = json.dumps(dict(result=result))
+            job.sendWorkComplete(data)
+        except Exception:
+            self.log.exception("Exception while sending job completion packet")
+
+        try:
+            self.sendCompleteEvent(job_name, result, args)
+        except Exception:
+            self.log.exception("Exception while sending job completion event")
+
+        try:
+            self.builds.pop(job.unique, None)  # absent if runJob bailed early
+        except Exception:
+            self.log.exception("Exception while clearing build record")
+
+        self._job_complete_event.set()  # unblocks a pending stop request
+        if offline and self._running:
+            self.stop()
+
+    def sendStartEvent(self, name, parameters):
+        # Queue an "onStarted <json>" message for the ZMQ sender thread.
+        # The JSON carries the job name plus a "build" object naming this
+        # node, its manager host and the job parameters.
+        payload = dict(node_name=self.name, host_name=self.manager_name,
+                       parameters=parameters)
+        event = dict(name=name, build=payload)
+        message = "onStarted %s" % json.dumps(event)
+
+        self.log.debug("Sending over ZMQ: %s" % (message,))
+        self.zmq_send_queue.put(message)
+
+    def sendCompleteEvent(self, name, status, parameters):
+        # Queue an "onFinalized <json>" message for the ZMQ sender thread
+        # and remember that a completion event has been sent.  The JSON
+        # "build" object adds the status to the start-event fields.
+        payload = dict(status=status, node_name=self.name,
+                       host_name=self.manager_name, parameters=parameters)
+
+        event = dict(name=name, build=payload)
+        message = "onFinalized %s" % json.dumps(event)
+
+        self.log.debug("Sending over ZMQ: %s" % (message,))
+        self.zmq_send_queue.put(message)
+        self._sent_complete_event = True
+
+    def sendFakeCompleteEvent(self):
+        # Send a placeholder completion event unless a real one has
+        # already gone out.
+        if not self._sent_complete_event:
+            self.sendCompleteEvent('zuul:launcher-shutdown', 'SUCCESS', {})
+
+    def runJob(self, job, args):
+        """Run the job's playbooks and return the build result.
+
+        Returns 'SUCCESS', 'FAILURE' or 'POST_FAILURE'; or None when
+        this worker is no longer running, the job playbook's outcome
+        is indeterminate, or the job was aborted.
+        """
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
+        with self.running_job_lock:
+            if not self._running:
+                return None
+            self._running_job = True
+            self._job_complete_event.clear()
+
+        self.log.debug("Job %s: beginning" % (job.unique,))
+        self.builds[job.unique] = self.name
+        with JobDir(self.keep_jobdir) as jobdir:
+            self.log.debug("Job %s: job root at %s" %
+                           (job.unique, jobdir.root))
+            timeout = self.prepareAnsibleFiles(jobdir, job, args)
+
+            work_data = dict(manager=self.manager_name,
+                             number=job.unique,
+                             url='telnet://%s:8088' % self.host)
+            job.sendWorkData(json.dumps(work_data))
+            job.sendWorkStatus(0, 100)
+
+            job_status = self.runAnsiblePlaybook(jobdir, timeout)
+            if job_status is None:
+                # The result of the job is indeterminate.  Zuul will
+                # run it again.
+                return None
+
+            post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
+            if self._aborted_job:
+                # A null result will cause zuul to relaunch the job if
+                # it needs to.
+                return None
+
+            if not post_status:
+                return 'POST_FAILURE'
+            if job_status:
+                return 'SUCCESS'
+            return 'FAILURE'
+
+    def getHostList(self):
+        host_vars = {'ansible_host': self.host, 'ansible_user': self.username}
+        return [('node', host_vars)]  # (inventory name, host variables)
+
+    def _substituteVariables(self, text, variables):
+        """Expand $NAME references in text; unknown names become ''."""
+        return re.sub(r'\$([A-Za-z0-9_]+)',
+                      lambda m: variables.get(m.group(1), ''), text)
+
+    def _getRsyncOptions(self, source, parameters):
+        """Return rsync filter options selecting `source` in a workspace.
+
+        The publisher source is treated as a filter; ant and rsync
+        behave fairly similarly here, except for leading directories.
+        """
+        pattern = self._substituteVariables(source, parameters)
+        # A pattern starting with ** should match at any depth, so it
+        # is left unanchored.  Anything else most likely means to start
+        # at an immediate file or subdirectory (even with a ** later
+        # on), so anchor it to the transfer root (the workspace).
+        if not pattern.startswith('**'):
+            pattern = os.path.join('/', pattern)
+        # In order: include the thing we want; include every directory
+        # (so the search continues however deep it is); exclude
+        # everything else; finally drop directories left empty.
+        return [
+            '--include="%s"' % pattern,
+            '--include="*/"',
+            '--exclude="*"',
+            '--prune-empty-dirs',
+        ]
+
+    def _makeSCPTask(self, jobdir, publisher, parameters):
+        """Return the post-playbook tasks for one 'scp' publisher.
+
+        Each file entry is pulled into its own staging directory, then
+        handed to _makeSCPTaskLocalAction for the upload task.
+        """
+        scp = publisher['scp']
+        tasks = []
+        for scpfile in scp['files']:
+            staging = tempfile.mkdtemp(dir=jobdir.staging_root)
+            os.chmod(staging, 0o755)
+            site = scp['site']
+
+            rsync_opts = []
+            if scpfile.get('copy-console'):
+                # Include the local ansible directory in the console
+                # upload.  This uploads the playbook and ansible logs.
+                tasks.append(dict(
+                    copy=dict(src=jobdir.ansible_root + '/',
+                              dest=os.path.join(staging, '_zuul_ansible')),
+                    delegate_to='127.0.0.1'))
+                # Fetch the console log from the remote host.
+                src = '/tmp/console.html'
+            else:
+                src = parameters['WORKSPACE']
+                if not src.endswith('/'):
+                    src += '/'
+                rsync_opts = self._getRsyncOptions(scpfile['source'],
+                                                   parameters)
+
+            pull = dict(synchronize=dict(src=src, dest=staging,
+                                         copy_links='yes', mode='pull'))
+            if rsync_opts:
+                pull['synchronize']['rsync_opts'] = rsync_opts
+            if not scpfile.get('copy-after-failure'):
+                pull['when'] = 'success'
+            tasks.append(pull)
+
+            tasks.append(self._makeSCPTaskLocalAction(
+                site, scpfile, staging, parameters))
+        return tasks
+
+    def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
+        """Return the local rsync task uploading `scproot` to `site`.
+
+        Raises Exception for an unknown site or a target outside its root.
+        """
+        if site not in self.sites:
+            raise Exception("Undefined SCP site: %s" % (site,))
+        site = self.sites[site]
+        dest = scpfile['target'].lstrip('/')
+        dest = self._substituteVariables(dest, parameters)
+        dest = os.path.normpath(os.path.join(site['root'], dest))
+        # Compare path components, not string prefixes: otherwise a
+        # sibling such as "<root>-other" would count as below "<root>".
+        rel = os.path.relpath(dest, site['root'])
+        if rel == os.pardir or rel.startswith(os.pardir + os.sep):
+            raise Exception("Target path %s is not below site root" %
+                            (dest,))
+
+        rsync_cmd = [
+            '/usr/bin/rsync', '--delay-updates', '-F', '--compress', '-rt',
+            '--safe-links', '--rsync-path="mkdir -p {dest} && rsync"',
+            '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
+            '-o StrictHostKeyChecking=no -q"',
+            '--out-format="<<CHANGED>>%i %n%L"',
+            '{source}', '"{user}@{host}:{dest}"'
+        ]
+        if scpfile.get('keep-hierarchy'):
+            source = '"%s/"' % scproot
+        else:
+            source = '`/usr/bin/find "%s" -type f`' % scproot
+        shellargs = ' '.join(rsync_cmd).format(
+            source=source, dest=dest, host=site['host'], user=site['user'],
+            private_key_file=self.private_key_file)
+        task = dict(shell=shellargs, delegate_to='127.0.0.1')
+        if not scpfile.get('copy-after-failure'):
+            task['when'] = 'success'
+        return task
+
+    def _makeFTPTask(self, jobdir, publisher, parameters):
+        """Return the tasks for one 'ftp' publisher (writes an lftp script).
+
+        Raises Exception for an unknown site or a target outside its root.
+        """
+        tasks = []
+        ftp = publisher['ftp']
+        site = ftp['site']
+        if site not in self.sites:
+            raise Exception("Undefined FTP site: %s" % site)
+        site = self.sites[site]
+
+        ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+        ftpcontent = os.path.join(ftproot, 'content')
+        os.makedirs(ftpcontent)
+        ftpscript = os.path.join(ftproot, 'script')
+
+        src = parameters['WORKSPACE']
+        if not src.endswith('/'):
+            src = src + '/'
+        rsync_opts = self._getRsyncOptions(ftp['source'], parameters)
+        syncargs = dict(src=src, dest=ftpcontent,
+                        copy_links='yes', mode='pull')
+        if rsync_opts:
+            syncargs['rsync_opts'] = rsync_opts
+        tasks.append(dict(synchronize=syncargs, when='success'))
+        task = dict(shell='lftp -f %s' % ftpscript, when='success',
+                    delegate_to='127.0.0.1')
+        ftpsource = ftpcontent
+        if ftp.get('remove-prefix'):
+            ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
+        while ftpsource[-1] == '/':
+            ftpsource = ftpsource[:-1]
+        ftptarget = ftp['target'].lstrip('/')
+        ftptarget = self._substituteVariables(ftptarget, parameters)
+        ftptarget = os.path.normpath(os.path.join(site['root'], ftptarget))
+        # Compare path components, not string prefixes: otherwise a
+        # sibling such as "<root>-other" would count as below "<root>".
+        rel = os.path.relpath(ftptarget, site['root'])
+        if rel == os.pardir or rel.startswith(os.pardir + os.sep):
+            raise Exception("Target path %s is not below site root" %
+                            (ftptarget,))
+        while ftptarget[-1] == '/':
+            ftptarget = ftptarget[:-1]
+        with open(ftpscript, 'w') as script:
+            script.write('open %s\n' % site['host'])
+            script.write('user %s %s\n' % (site['user'], site['pass']))
+            script.write('mirror -R %s %s\n' % (ftpsource, ftptarget))
+        tasks.append(task)
+        return tasks
+
+    def _makeBuilderTask(self, jobdir, builder, parameters):
+        """Return the tasks that run one shell builder on the node.
+
+        The script is written under jobdir.script_root, copied to /tmp
+        on the node, run via zuul_runner and finally removed again.
+        """
+        script_fn = '%s.sh' % str(uuid.uuid4().hex)
+        script_path = os.path.join(jobdir.script_root, script_fn)
+        with open(script_path, 'w') as script:
+            body = builder['shell']
+            if not body.startswith('#!'):
+                body = '#!/bin/bash -x\n %s' % (body,)
+            script.write(body)
+
+        remote_path = os.path.join('/tmp', script_fn)
+        copy_task = dict(copy=dict(src=script_path,
+                                   dest=remote_path,
+                                   mode=0o555))
+
+        # The run is skipped once elapsed_time has reached the timeout;
+        # otherwise it gets whatever time is left.
+        run_task = {
+            'zuul_runner': dict(command=remote_path,
+                                cwd=parameters['WORKSPACE'],
+                                parameters=parameters),
+            'name': ('zuul_runner with {{ timeout | int - elapsed_time }} '
+                     'second timeout'),
+            'when': '{{ elapsed_time < timeout | int }}',
+            'async': '{{ timeout | int - elapsed_time }}',
+            'poll': 5,
+        }
+
+        cleanup_task = dict(file=dict(path=remote_path, state='absent'))
+        return [copy_task, run_task, cleanup_task]
+
+    def _transformPublishers(self, jjb_job):
+        """Split the job's publishers into (early, late) lists.
+
+        A publisher with any copy-console scp file is late; those files
+        get keep-hierarchy set and move to the end of its file list.
+        """
+        early_publishers = []
+        late_publishers = []
+        old_publishers = jjb_job.get('publishers', [])
+        for publisher in old_publishers:
+            if 'scp' not in publisher:
+                early_publishers.append(publisher)
+                continue
+            console_files = []
+            other_files = []
+            for scpfile in publisher['scp']['files']:
+                if scpfile.get('copy-console'):
+                    scpfile['keep-hierarchy'] = True
+                    console_files.append(scpfile)
+                else:
+                    other_files.append(scpfile)
+            publisher['scp']['files'] = other_files + console_files
+            target = late_publishers if console_files else early_publishers
+            target.append(publisher)
+        if old_publishers != early_publishers + late_publishers:
+            self.log.debug("Transformed job publishers")
+        return early_publishers, late_publishers
+
+    def prepareAnsibleFiles(self, jobdir, gearman_job, args):
+        job_name = gearman_job.name.split(':')[1]
+        jjb_job = self.jobs[job_name]
+        # Writes jobdir's inventory, playbook, post playbook and config files.
+        parameters = args.copy()  # leave the caller's dict untouched
+        parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
+        # One inventory line per host: "<name> <key>=<value> ...".
+        with open(jobdir.inventory, 'w') as inventory:
+            for host_name, host_vars in self.getHostList():
+                inventory.write(host_name)
+                for k, v in host_vars.items():
+                    inventory.write(' %s=%s' % (k, v))
+                inventory.write('\n')
+        # Timeout comes from a 'timeout' wrapper if present, else the default.
+        timeout = None
+        timeout_var = None
+        for wrapper in jjb_job.get('wrappers', []):
+            if isinstance(wrapper, dict):
+                build_timeout = wrapper.get('timeout')
+                if isinstance(build_timeout, dict):
+                    timeout_var = build_timeout.get('timeout-var')
+                    timeout = build_timeout.get('timeout')
+                    if timeout is not None:
+                        timeout = int(timeout) * 60
+        if not timeout:
+            timeout = ANSIBLE_DEFAULT_TIMEOUT
+        if timeout_var:
+            parameters[timeout_var] = str(timeout * 1000)
+        # Job playbook: the main block of tasks, with a rescue block on error.
+        with open(jobdir.playbook, 'w') as playbook:
+            pre_tasks = []
+            tasks = []
+            main_block = []
+            error_block = []
+            variables = []
+            # Store the node's SSH host keys in the job's known_hosts file.
+            shellargs = "ssh-keyscan %s > %s" % (
+                self.host, jobdir.known_hosts)
+            pre_tasks.append(dict(shell=shellargs,
+                             delegate_to='127.0.0.1'))
+
+            tasks.append(dict(block=main_block,
+                              rescue=error_block))
+
+            task = dict(file=dict(path='/tmp/console.html', state='absent'))
+            main_block.append(task)
+
+            task = dict(zuul_console=dict(path='/tmp/console.html', port=8088))
+            main_block.append(task)
+
+            task = dict(file=dict(path=parameters['WORKSPACE'],
+                                  state='directory'))
+            main_block.append(task)
+
+            msg = [
+                "Launched by %s" % self.manager_name,
+                "Building remotely on %s in workspace %s" % (
+                    self.name, parameters['WORKSPACE'])]
+            task = dict(zuul_log=dict(msg=msg))
+            main_block.append(task)
+
+            for builder in jjb_job.get('builders', []):
+                if 'shell' in builder:
+                    main_block.extend(
+                        self._makeBuilderTask(jobdir, builder, parameters))
+            task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"))
+            main_block.append(task)
+
+            task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
+            error_block.append(task)
+            error_block.append(dict(fail=dict(msg='FAILURE')))
+
+            variables.append(dict(timeout=timeout))
+            play = dict(hosts='node', name='Job body', vars=variables,
+                        pre_tasks=pre_tasks, tasks=tasks)
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+        # Publishers are split so the late group can run as 'always' tasks.
+        early_publishers, late_publishers = self._transformPublishers(jjb_job)
+
+        with open(jobdir.post_playbook, 'w') as playbook:
+            blocks = []
+            for publishers in [early_publishers, late_publishers]:
+                block = []
+                for publisher in publishers:
+                    if 'scp' in publisher:
+                        block.extend(self._makeSCPTask(jobdir, publisher,
+                                                       parameters))
+                    if 'ftp' in publisher:
+                        block.extend(self._makeFTPTask(jobdir, publisher,
+                                                       parameters))
+                blocks.append(block)
+
+            # The 'always' section contains the log publishing tasks,
+            # the 'block' contains all the other publishers.  This way
+            # we run the log publisher regardless of whether the rest
+            # of the publishers succeed.
+            tasks = []
+            tasks.append(dict(block=blocks[0],
+                              always=blocks[1]))
+
+            play = dict(hosts='node', name='Publishers',
+                        tasks=tasks)
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+        # Per-job ansible settings (inventory, logging, plugins, ssh).
+        with open(jobdir.config, 'w') as config:
+            config.write('[defaults]\n')
+            config.write('hostfile = %s\n' % jobdir.inventory)
+            config.write('keep_remote_files = True\n')
+            config.write('local_tmp = %s/.ansible/tmp\n' % jobdir.root)
+            config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
+            config.write('log_path = %s\n' % jobdir.ansible_log)
+            config.write('gathering = explicit\n')
+            config.write('callback_plugins = %s\n' % self.callback_dir)
+            config.write('library = %s\n' % self.library_dir)
+
+            config.write('[ssh_connection]\n')
+            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
+                "-o UserKnownHostsFile=%s" % jobdir.known_hosts
+            config.write('ssh_args = %s\n' % ssh_args)
+        # The caller gets back the effective timeout.
+        return timeout
+
+    def _ansibleTimeout(self, proc, msg):  # passed to Watchdog as callback
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+
+    def runAnsiblePlaybook(self, jobdir, timeout):
+        """Run the job playbook; return True, False or None.
+
+        True means ansible exited 0 and False any other failure.  None
+        means the outcome is indeterminate: exit code 3 (host unreachable)
+        or -9 (the run was aborted).
+        """
+        # LOGNAME is set so that the Ansible log_path log reports the
+        # correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+
+        verbose = '-vvv' if self.options['verbose'] else '-v'
+
+        cmd = ['ansible-playbook', jobdir.playbook, verbose]
+        self.log.debug("Ansible command: %s" % (cmd,))
+
+        self.ansible_job_proc = subprocess.Popen(
+            cmd,
+            cwd=jobdir.ansible_root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
+        )
+        ret = None
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (self.ansible_job_proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_job_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = self.ansible_job_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
+        self.ansible_job_proc = None
+        if ret in (3, -9):
+            # 3 is AnsibleHostUnreachable (a network issue connecting to
+            # our zuul-worker); -9 means an abort request was received.
+            return None
+        return ret == 0
+
+    def runAnsiblePostPlaybook(self, jobdir, success):
+        """Run the post playbook; return True only if ansible exits 0.
+
+        `success` is passed to the playbook as the extra var "success".
+        """
+        # LOGNAME is set so that the Ansible log_path log reports the
+        # correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+
+        verbose = '-vvv' if self.options['verbose'] else '-v'
+        cmd = ['ansible-playbook', jobdir.post_playbook,
+               '-e', 'success=%s' % success, verbose]
+        self.log.debug("Ansible post command: %s" % (cmd,))
+
+        self.ansible_post_proc = subprocess.Popen(
+            cmd,
+            cwd=jobdir.ansible_root,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            preexec_fn=os.setsid,
+            env=env_copy,
+        )
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_post_proc,
+                             "Ansible post timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_post_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible post output: %s" % (line,))
+            ret = self.ansible_post_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible post exit code: %s" % (ret,))
+        self.ansible_post_proc = None
+        return ret == 0
+
+
+class JJB(jenkins_jobs.builder.Builder):
+    """Job builder subclass that expands macros in job definitions."""
+
+    def __init__(self):
+        self.global_config = None
+        self._plugins_list = []
+
+    def expandComponent(self, component_type, component, template_data):
+        """Return `component` as a list with macros expanded recursively."""
+        if isinstance(component, dict):
+            name, component_data = next(iter(component.items()))
+            if template_data:
+                component_data = jenkins_jobs.formatter.deep_format(
+                    component_data, template_data, True)
+        else:
+            name, component_data = component, {}
+
+        macro = self.parser.data.get(component_type, {}).get(name)
+        if not macro:
+            return [{name: component_data}]
+        expanded = []
+        for sub_component in macro[component_type + 's']:
+            expanded.extend(self.expandComponent(
+                component_type, sub_component, component_data))
+        return expanded
+
+    def expandMacros(self, job):
+        """Replace the job's builders, publishers and wrappers in place."""
+        for component_type in ['builder', 'publisher', 'wrapper']:
+            list_key = component_type + 's'
+            expanded = []
+            for component in job.get(list_key, []):
+                expanded.extend(
+                    self.expandComponent(component_type, component, {}))
+            job[list_key] = expanded
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 69fb71b..98307ee 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
 import json
 import logging
 import os
+import six
 import time
 import threading
 from uuid import uuid4
@@ -164,6 +165,11 @@
             port = config.get('gearman', 'port')
         else:
             port = 4730
+        if config.has_option('gearman', 'check_job_registration'):
+            self.job_registration = config.getboolean(
+                'gearman', 'check_job_registration')
+        else:
+            self.job_registration = True
 
         self.gearman = ZuulGearmanClient(self)
         self.gearman.addServer(server, port)
@@ -231,7 +237,7 @@
                 s_config = {}
                 s_config.update((k, v.format(item=item, job=job,
                                              change=item.change))
-                                if isinstance(v, basestring)
+                                if isinstance(v, six.string_types)
                                 else (k, v)
                                 for k, v in s.items())
 
@@ -351,7 +357,8 @@
         build.__gearman_job = gearman_job
         self.builds[uuid] = build
 
-        if not self.isJobRegistered(gearman_job.name):
+        if self.job_registration and not self.isJobRegistered(
+                gearman_job.name):
             self.log.error("Job %s is not registered with Gearman" %
                            gearman_job)
             self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
@@ -456,9 +463,6 @@
                 build.number = data.get('number')
                 build.__gearman_manager = data.get('manager')
                 self.sched.onBuildStarted(build)
-
-            if job.denominator:
-                build.estimated_time = float(job.denominator) / 1000
         else:
             self.log.error("Unable to find build %s" % job.unique)
 
@@ -505,7 +509,7 @@
             # us where the job is running.
             return False
 
-        if not self.isJobRegistered(name):
+        if self.job_registration and not self.isJobRegistered(name):
             return False
 
         desc_uuid = str(uuid4().hex)
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
 import os
 import re
 
+import six
+
+
 OrderedDict = extras.try_imports(['collections.OrderedDict',
                                   'ordereddict.OrderedDict'])
 
@@ -59,17 +62,17 @@
             raise Exception("Expansion error. Check error messages above")
 
         self.log.info("Mapping projects to workspace...")
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             dest = os.path.normpath(os.path.join(workspace, dest[0]))
             ret[project] = dest
             self.log.info("  %s -> %s", project, dest)
 
         self.log.debug("Checking overlap in destination directories...")
         check = defaultdict(list)
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             check[dest].append(project)
 
-        dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+        dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
         if dupes:
             raise Exception("Some projects share the same destination: %s",
                             dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index 257b95d..3c7f04d 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,7 +19,10 @@
 import re
 import yaml
 
+import six
+
 from git import GitCommandError
+from zuul import exceptions
 from zuul.lib.clonemapper import CloneMapper
 from zuul.merger.merger import Repo
 
@@ -29,7 +32,8 @@
 
     def __init__(self, git_base_url, projects, workspace, zuul_branch,
                  zuul_ref, zuul_url, branch=None, clone_map_file=None,
-                 project_branches=None, cache_dir=None):
+                 project_branches=None, cache_dir=None, zuul_newrev=None,
+                 zuul_project=None):
 
         self.clone_map = []
         self.dests = None
@@ -43,6 +47,10 @@
         self.zuul_ref = zuul_ref or ''
         self.zuul_url = zuul_url
         self.project_branches = project_branches or {}
+        self.project_revisions = {}
+
+        if zuul_newrev and zuul_project:
+            self.project_revisions[zuul_project] = zuul_newrev
 
         if clone_map_file:
             self.readCloneMap(clone_map_file)
@@ -62,7 +70,7 @@
         dests = mapper.expand(workspace=self.workspace)
 
         self.log.info("Preparing %s repositories", len(dests))
-        for project, dest in dests.iteritems():
+        for project, dest in six.iteritems(dests):
             self.prepareRepo(project, dest)
         self.log.info("Prepared all repositories")
 
@@ -103,7 +111,14 @@
             repo.fetchFrom(zuul_remote, ref)
             self.log.debug("Fetched ref %s from %s", ref, project)
             return True
-        except (ValueError, GitCommandError):
+        except ValueError:
+            self.log.debug("Project %s in Zuul does not have ref %s",
+                           project, ref)
+            return False
+        except GitCommandError as error:
+            # Bail out if fetch fails due to infrastructure reasons
+            if error.stderr.startswith('fatal: unable to access'):
+                raise
             self.log.debug("Project %s in Zuul does not have ref %s",
                            project, ref)
             return False
@@ -112,10 +127,15 @@
         """Clone a repository for project at dest and apply a reference
         suitable for testing. The reference lookup is attempted in this order:
 
-         1) Zuul reference for the indicated branch
-         2) Zuul reference for the master branch
-         3) The tip of the indicated branch
-         4) The tip of the master branch
+         1) The indicated revision for specific project
+         2) Zuul reference for the indicated branch
+         3) Zuul reference for the master branch
+         4) The tip of the indicated branch
+         5) The tip of the master branch
+
+        If an "indicated revision" is specified for this project, and we are
+        unable to meet this requirement, we stop attempting to check this
+        repo out and raise a zuul.exceptions.RevNotFound exception.
 
         The "indicated branch" is one of the following:
 
@@ -135,6 +155,10 @@
         # `git branch` is happy with.
         repo.reset()
 
+        indicated_revision = None
+        if project in self.project_revisions:
+            indicated_revision = self.project_revisions[project]
+
         indicated_branch = self.branch or self.zuul_branch
         if project in self.project_branches:
             indicated_branch = self.project_branches[project]
@@ -160,13 +184,26 @@
         else:
             fallback_zuul_ref = None
 
+        # If the user has requested an explicit revision to be checked out,
+        # we use it above all else, and if we cannot satisfy this requirement
+        # we raise an error and do not attempt to continue.
+        if indicated_revision:
+            self.log.info("Attempting to check out revision %s for "
+                          "project %s", indicated_revision, project)
+            try:
+                self.fetchFromZuul(repo, project, self.zuul_ref)
+                commit = repo.checkout(indicated_revision)
+            except (ValueError, GitCommandError):
+                raise exceptions.RevNotFound(project, indicated_revision)
+            self.log.info("Prepared '%s' repo at revision '%s'", project,
+                          indicated_revision)
         # If we have a non empty zuul_ref to use, use it. Otherwise we fall
         # back to checking out the branch.
-        if ((override_zuul_ref and
-            self.fetchFromZuul(repo, project, override_zuul_ref)) or
-            (fallback_zuul_ref and
-             fallback_zuul_ref != override_zuul_ref and
-            self.fetchFromZuul(repo, project, fallback_zuul_ref))):
+        elif ((override_zuul_ref and
+              self.fetchFromZuul(repo, project, override_zuul_ref)) or
+              (fallback_zuul_ref and
+               fallback_zuul_ref != override_zuul_ref and
+              self.fetchFromZuul(repo, project, fallback_zuul_ref))):
             # Work around a bug in GitPython which can not parse FETCH_HEAD
             gitcmd = git.Git(dest)
             fetch_head = gitcmd.rev_parse('FETCH_HEAD')
diff --git a/zuul/lib/commandsocket.py b/zuul/lib/commandsocket.py
new file mode 100644
index 0000000..1b7fed9
--- /dev/null
+++ b/zuul/lib/commandsocket.py
@@ -0,0 +1,106 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2016 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import socket
+import threading
+import Queue
+
+
+class CommandSocket(object):
+    """Receive newline-terminated commands over a UNIX stream socket.
+
+    A daemon thread accepts connections, reads one command from each
+    and puts it on a queue that consumers drain with get().
+    """
+
+    log = logging.getLogger("zuul.CommandSocket")
+
+    def __init__(self, path):
+        self.running = False
+        self.path = path
+        self.queue = Queue.Queue()
+
+    def start(self):
+        """Bind the socket (replacing any stale file) and start listening."""
+        self.running = True
+        if os.path.exists(self.path):
+            os.unlink(self.path)
+        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.socket.bind(self.path)
+        self.socket.listen(1)
+        self.socket_thread = threading.Thread(target=self._socketListener)
+        self.socket_thread.daemon = True
+        self.socket_thread.start()
+
+    def stop(self):
+        """Stop the listener thread and wake any consumer blocked in get()."""
+        # First, wake up our listener thread with a connection and
+        # tell it to stop running.
+        self.running = False
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            s.connect(self.path)
+            s.sendall('_stop\n')
+        finally:
+            s.close()
+        # The command '_stop' will be ignored by our listener, so
+        # directly inject it into the queue so that consumers of this
+        # class which are waiting in .get() are awakened.  They can
+        # either handle '_stop' or just ignore the unknown command and
+        # then check to see if they should continue to run before
+        # re-entering their loop.
+        self.queue.put('_stop')
+        self.socket_thread.join()
+        self.socket.close()
+
+    def _socketListener(self):
+        """Thread body: accept connections and queue one command from each."""
+        while self.running:
+            try:
+                s, addr = self.socket.accept()
+                try:
+                    self.log.debug("Accepted socket connection %s" % (s,))
+                    buf = ''
+                    while not buf.endswith('\n'):
+                        data = s.recv(1)
+                        if not data:
+                            # The peer closed the connection without
+                            # sending a newline; stop reading instead
+                            # of spinning on empty reads.
+                            break
+                        buf += data
+                finally:
+                    s.close()
+                buf = buf.strip()
+                self.log.debug("Received %s from socket" % (buf,))
+                # Because we use '_stop' internally to wake up a
+                # waiting thread, don't allow it to actually be
+                # injected externally.  Empty commands are dropped.
+                if buf and buf != '_stop':
+                    self.queue.put(buf)
+            except Exception:
+                self.log.exception("Exception in socket handler")
+
+    def get(self):
+        """Return the next command, blocking until one is available.
+
+        Raises Exception if called while the socket is not running.
+        """
+        if not self.running:
+            raise Exception("CommandSocket.get called while stopped")
+        return self.queue.get()
diff --git a/zuul/lib/gearserver.py b/zuul/lib/gearserver.py
new file mode 100644
index 0000000..9cddca3
--- /dev/null
+++ b/zuul/lib/gearserver.py
@@ -0,0 +1,35 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+
+MASS_DO = 101
+
+
+class GearServer(gear.Server):
+    """gear.Server that accepts MASS_DO packets listing function names."""
+    def handlePacket(self, packet):
+        if packet.ptype != MASS_DO:
+            return super(GearServer, self).handlePacket(packet)
+        self.log.info("Received packet from %s: %s" % (packet.connection,
+                                                       packet))
+        self.handleMassDo(packet)
+
+    def handleMassDo(self, packet):
+        conn = packet.connection
+        conn.functions = set()  # replaces any earlier registrations
+        for name in packet.data.split(b'\x00'):
+            self.log.debug("Adding function %s to %s" % (name, conn))
+            conn.functions.add(name)
+            self.functions.add(name)
diff --git a/zuul/lib/swift.py b/zuul/lib/swift.py
index 3c411d3..b5d3bc7 100644
--- a/zuul/lib/swift.py
+++ b/zuul/lib/swift.py
@@ -19,8 +19,8 @@
 import os
 import random
 import six
+from six.moves import urllib
 import string
-import urlparse
 
 
 class Swift(object):
@@ -156,7 +156,7 @@
         url = os.path.join(self.storage_url, settings['container'],
                            settings['file_path_prefix'],
                            destination_prefix)
-        u = urlparse.urlparse(url)
+        u = urllib.parse.urlparse(url)
 
         hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect,
                                             settings['max_file_size'],
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c6ae35d..3bc29e6 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -210,7 +210,7 @@
         fd.write('#!/bin/bash\n')
         fd.write('ssh -i %s $@\n' % key)
         fd.close()
-        os.chmod(name, 0755)
+        os.chmod(name, 0o755)
 
     def addProject(self, project, url):
         repo = None
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 30cd732..d56993c 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,7 +19,7 @@
 
 import gear
 
-import merger
+from zuul.merger import merger
 
 
 class MergeServer(object):
diff --git a/zuul/model.py b/zuul/model.py
index d2cf13b..46b0b98 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -13,7 +13,9 @@
 # under the License.
 
 import copy
+import os
 import re
+import struct
 import time
 from uuid import uuid4
 import extras
@@ -108,7 +110,11 @@
         return job_tree
 
     def getProjects(self):
-        return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+        # cmp is not in python3, applied idiom from
+        # http://python-future.org/compatible_idioms.html#cmp
+        return sorted(
+            self.job_trees.keys(),
+            key=lambda p: p.name)
 
     def addQueue(self, queue):
         self.queues.append(queue)
@@ -266,7 +272,7 @@
             items.extend(shared_queue.queue)
         return items
 
-    def formatStatusJSON(self):
+    def formatStatusJSON(self, url_pattern=None):
         j_pipeline = dict(name=self.name,
                           description=self.description)
         j_queues = []
@@ -283,7 +289,7 @@
                     if j_changes:
                         j_queue['heads'].append(j_changes)
                     j_changes = []
-                j_changes.append(e.formatJSON())
+                j_changes.append(e.formatJSON(url_pattern))
                 if (len(j_changes) > 1 and
                         (j_changes[-2]['remaining_time'] is not None) and
                         (j_changes[-1]['remaining_time'] is not None)):
@@ -415,7 +421,7 @@
             elif self.window_decrease_type == 'exponential':
                 self.window = max(
                     self.window_floor,
-                    self.window / self.window_decrease_factor)
+                    int(self.window / self.window_decrease_factor))
 
 
 class Project(object):
@@ -724,7 +730,34 @@
     def setReportedResult(self, result):
         self.current_build_set.result = result
 
-    def formatJSON(self):
+    def formatJobResult(self, job, url_pattern=None):
+        """Return a (result, url) tuple describing the build of *job*.
+
+        The job's success/failure message and URL pattern, when set,
+        replace the build result and *url_pattern*.  If no pattern
+        produces a URL, the build URL or else the job name is used.
+        """
+        build = self.current_build_set.getBuild(job.name)
+        result = build.result
+        pattern = url_pattern
+        if result == 'SUCCESS':
+            result = job.success_message or result
+            pattern = job.success_pattern or pattern
+        elif result == 'FAILURE':
+            result = job.failure_message or result
+            pattern = job.failure_pattern or pattern
+        url = None
+        if pattern:
+            try:
+                url = pattern.format(change=self.change, pipeline=self.pipeline,
+                                     job=job, build=build)
+            except Exception:
+                # FIXME: log this or something?  For now a pattern
+                # that fails to format just yields the fallback URL.
+                pass
+        return (result, url or build.url or job.name)
+
+    def formatJSON(self, url_pattern=None):
         changeish = self.change
         ret = {}
         ret['active'] = self.active
@@ -761,11 +794,13 @@
             elapsed = None
             remaining = None
             result = None
-            url = None
+            build_url = None
+            report_url = None
             worker = None
             if build:
                 result = build.result
-                url = build.url
+                build_url = build.url
+                (unused, report_url) = self.formatJobResult(job, url_pattern)
                 if build.start_time:
                     if build.end_time:
                         elapsed = int((build.end_time -
@@ -793,7 +828,8 @@
                 'name': job.name,
                 'elapsed_time': elapsed,
                 'remaining_time': remaining,
-                'url': url,
+                'url': build_url,
+                'report_url': report_url,
                 'result': result,
                 'voting': job.voting,
                 'uuid': build.uuid if build else None,
@@ -1043,7 +1079,7 @@
         for a in approvals:
             for k, v in a.items():
                 if k == 'username':
-                    pass
+                    a['username'] = re.compile(v)
                 elif k in ['email', 'email-filter']:
                     a['email'] = re.compile(v)
                 elif k == 'newer-than':
@@ -1062,7 +1098,7 @@
         by = approval.get('by', {})
         for k, v in rapproval.items():
             if k == 'username':
-                if (by.get('username', '') != v):
+                if (not v.search(by.get('username', ''))):
                         return False
             elif k == 'email':
                 if (not v.search(by.get('email', ''))):
@@ -1350,3 +1386,78 @@
                     job.copy(metajob)
             self.jobs[name] = job
         return job
+
+
+class JobTimeData(object):
+    """Last ten success/failure durations and results of one job."""
+    format = 'B10H10H10B'  # version, success times, failure times, results
+    version = 0
+
+    def __init__(self, path):
+        self.path = path
+        self.success_times = [0] * 10
+        self.failure_times = [0] * 10
+        self.results = [0] * 10
+
+    def load(self):
+        """Read the record at self.path, if that file exists."""
+        if not os.path.exists(self.path):
+            return
+        # The record is packed binary data, so never use text mode.
+        with open(self.path, 'rb') as f:
+            data = struct.unpack(self.format, f.read())
+        version = data[0]
+        if version != self.version:
+            raise Exception("Unknown data version")
+        self.success_times = list(data[1:11])
+        self.failure_times = list(data[11:21])
+        self.results = list(data[21:31])
+
+    def save(self):
+        """Write the record to a temp file, then rename it into place."""
+        tmpfile = self.path + '.tmp'
+        data = struct.pack(self.format, self.version,
+                           *(self.success_times + self.failure_times +
+                             self.results))
+        with open(tmpfile, 'wb') as f:
+            f.write(data)
+        os.rename(tmpfile, self.path)
+
+    def add(self, elapsed, result):
+        """Record a run; any result other than 'SUCCESS' is a failure."""
+        # Times are packed as unsigned 16-bit ('H'); clamp so that one
+        # out-of-range duration cannot make every later save() fail.
+        elapsed = max(0, min(int(elapsed), 0xFFFF))
+        succeeded = (result == 'SUCCESS')
+        times = self.success_times if succeeded else self.failure_times
+        times.append(elapsed)
+        times.pop(0)
+        self.results.append(0 if succeeded else 1)
+        self.results.pop(0)
+
+    def getEstimatedTime(self):
+        """Return the mean of the non-zero success times, or 0.0."""
+        times = [x for x in self.success_times if x]
+        return float(sum(times)) / len(times) if times else 0.0
+
+
+class TimeDataBase(object):
+    """Lazily loaded JobTimeData records, one file per job under root."""
+    def __init__(self, root):
+        self.root = root
+        self.jobs = {}
+
+    def _getTD(self, name):
+        if name not in self.jobs:
+            # Cache before loading, so a failed load is not retried.
+            self.jobs[name] = JobTimeData(os.path.join(self.root, name))
+            self.jobs[name].load()
+        return self.jobs[name]
+
+    def getEstimatedTime(self, name):
+        return self._getTD(name).getEstimatedTime()
+
+    def update(self, name, elapsed, result):
+        record = self._getTD(name)
+        record.add(elapsed, result)
+        record.save()
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
index fd79174..0569fbe 100644
--- a/zuul/reporter/__init__.py
+++ b/zuul/reporter/__init__.py
@@ -113,25 +113,7 @@
 
         for job in pipeline.getJobs(item):
             build = item.current_build_set.getBuild(job.name)
-            result = build.result
-            pattern = url_pattern
-            if result == 'SUCCESS':
-                if job.success_message:
-                    result = job.success_message
-                if job.success_pattern:
-                    pattern = job.success_pattern
-            elif result == 'FAILURE':
-                if job.failure_message:
-                    result = job.failure_message
-                if job.failure_pattern:
-                    pattern = job.failure_pattern
-            if pattern:
-                url = pattern.format(change=item.change,
-                                     pipeline=pipeline,
-                                     job=job,
-                                     build=build)
-            else:
-                url = build.url or job.name
+            (result, url) = item.formatJobResult(job, url_pattern)
             if not job.voting:
                 voting = ' (non-voting)'
             else:
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d54da9f..716dcfb 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -21,7 +21,7 @@
 import gear
 import six
 
-import model
+from zuul import model
 
 
 class RPCListener(object):
@@ -40,11 +40,11 @@
             port = 4730
         self.worker = gear.Worker('Zuul RPC Listener')
         self.worker.addServer(server, port)
+        self.worker.waitForServer()
+        self.register()
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
         self.thread.start()
-        self.worker.waitForServer()
-        self.register()
 
     def register(self):
         self.worker.registerFunction("zuul:enqueue")
@@ -66,8 +66,8 @@
         while self._running:
             try:
                 job = self.worker.getJob()
-                z, jobname = job.name.split(':')
                 self.log.debug("Received job %s" % job.name)
+                z, jobname = job.name.split(':')
                 attrname = 'handle_' + jobname
                 if hasattr(self, attrname):
                     f = getattr(self, attrname)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 93016ac..b974762 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -20,6 +20,7 @@
 import logging
 import os
 import pickle
+import six
 from six.moves import queue as Queue
 import re
 import sys
@@ -27,10 +28,10 @@
 import time
 import yaml
 
-import layoutvalidator
-import model
-from model import Pipeline, Project, ChangeQueue
-from model import ChangeishFilter, NullChange
+from zuul import layoutvalidator
+from zuul import model
+from zuul.model import Pipeline, Project, ChangeQueue
+from zuul.model import ChangeishFilter, NullChange
 from zuul import change_matcher, exceptions
 from zuul import version as zuul_version
 
@@ -125,12 +126,10 @@
     """An event that should be processed within the main queue run loop"""
     def __init__(self):
         self._wait_event = threading.Event()
-        self._exception = None
-        self._traceback = None
+        self._exc_info = None
 
-    def exception(self, e, tb):
-        self._exception = e
-        self._traceback = tb
+    def exception(self, exc_info):
+        self._exc_info = exc_info
         self._wait_event.set()
 
     def done(self):
@@ -138,8 +137,8 @@
 
     def wait(self, timeout=None):
         self._wait_event.wait(timeout)
-        if self._exception:
-            raise self._exception, None, self._traceback
+        if self._exc_info:
+            six.reraise(*self._exc_info)
         return self._wait_event.is_set()
 
 
@@ -236,7 +235,7 @@
 class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
 
-    def __init__(self, config):
+    def __init__(self, config, testonly=False):
         threading.Thread.__init__(self)
         self.daemon = True
         self.wake_event = threading.Event()
@@ -262,6 +261,10 @@
         self.management_event_queue = Queue.Queue()
         self.layout = model.Layout()
 
+        if not testonly:
+            time_dir = self._get_time_database_dir()
+            self.time_database = model.TimeDataBase(time_dir)
+
         self.zuul_version = zuul_version.version_info.release_string()
         self.last_reconfigured = None
 
@@ -411,7 +414,9 @@
                     base = os.path.dirname(os.path.realpath(config_path))
                     fn = os.path.join(base, fn)
                 fn = os.path.expanduser(fn)
-                execfile(fn, config_env)
+                with open(fn) as _f:
+                    code = compile(_f.read(), fn, 'exec')
+                    six.exec_(code, config_env)
 
         for conf_pipeline in data.get('pipelines', []):
             pipeline = Pipeline(conf_pipeline['name'])
@@ -743,6 +748,17 @@
             state_dir = '/var/lib/zuul'
         return os.path.join(state_dir, 'queue.pickle')
 
+    def _get_time_database_dir(self):
+        """Return the job-time database dir, creating it if needed."""
+        state_dir = '/var/lib/zuul'
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(self.config.get('zuul',
+                                                           'state_dir'))
+        d = os.path.join(state_dir, 'times')
+        if not os.path.exists(d):
+            os.makedirs(d)  # also creates a missing state_dir
+        return d
+
     def _save_queue(self):
         pickle_file = self._get_queue_pickle_file()
         events = []
@@ -1041,8 +1057,8 @@
             else:
                 self.log.error("Unable to handle event %s" % event)
             event.done()
-        except Exception as e:
-            event.exception(e, sys.exc_info()[2])
+        except Exception:
+            event.exception(sys.exc_info())
         self.management_event_queue.task_done()
 
     def process_result_queue(self):
@@ -1072,6 +1088,11 @@
             self.log.warning("Build %s is not associated with a pipeline" %
                              (build,))
             return
+        try:
+            build.estimated_time = float(self.time_database.getEstimatedTime(
+                build.job.name))
+        except Exception:
+            self.log.exception("Exception estimating build time:")
         pipeline.manager.onBuildStarted(event.build)
 
     def _doBuildCompletedEvent(self, event):
@@ -1085,6 +1106,13 @@
             self.log.warning("Build %s is not associated with a pipeline" %
                              (build,))
             return
+        if build.end_time and build.start_time and build.result:
+            duration = build.end_time - build.start_time
+            try:
+                self.time_database.update(
+                    build.job.name, duration, build.result)
+            except Exception:
+                self.log.exception("Exception recording build time:")
         pipeline.manager.onBuildCompleted(event.build)
 
     def _doMergeCompletedEvent(self, event):
@@ -1100,6 +1128,11 @@
         pipeline.manager.onMergeCompleted(event)
 
     def formatStatusJSON(self):
+        if self.config.has_option('zuul', 'url_pattern'):
+            url_pattern = self.config.get('zuul', 'url_pattern')
+        else:
+            url_pattern = None
+
         data = {}
 
         data['zuul_version'] = self.zuul_version
@@ -1125,7 +1158,7 @@
         pipelines = []
         data['pipelines'] = pipelines
         for pipeline in self.layout.pipelines.values():
-            pipelines.append(pipeline.formatStatusJSON())
+            pipelines.append(pipeline.formatStatusJSON(url_pattern))
         return json.dumps(data)
 
 
@@ -1453,7 +1486,6 @@
             return True
         if build_set.merge_state == build_set.PENDING:
             return False
-        build_set.merge_state = build_set.PENDING
         ref = build_set.ref
         if hasattr(item.change, 'refspec') and not ref:
             self.log.debug("Preparing ref for: %s" % item.change)
@@ -1471,6 +1503,8 @@
             self.sched.merger.updateRepo(item.change.project.name,
                                          url, build_set,
                                          self.pipeline.precedence)
+        # merge:merge has been emitted properly:
+        build_set.merge_state = build_set.PENDING
         return False
 
     def _launchJobs(self, item, jobs):
diff --git a/zuul/source/gerrit.py b/zuul/source/gerrit.py
index eb8705d..73cf726 100644
--- a/zuul/source/gerrit.py
+++ b/zuul/source/gerrit.py
@@ -20,6 +20,20 @@
 from zuul.source import BaseSource
 
 
+# Walk the change dependency tree to find a cycle
+def detect_cycle(change, history=None):
+    """Raise Exception if the needs_changes graph below *change*
+    revisits a change number already on the current path.
+    *history* (numbers visited so far) is copied, never mutated.
+    """
+    path = list(history or []) + [change.number]
+    for dep in change.needs_changes:
+        if dep.number in path:
+            raise Exception("Dependency cycle detected: %s in %s" % (
+                dep.number, path))
+        detect_cycle(dep, path)
+
+
 class GerritSource(BaseSource):
     name = 'gerrit'
     log = logging.getLogger("zuul.source.Gerrit")
@@ -60,6 +74,10 @@
         data = self.connection.query(change.number)
         change._data = data
         change.is_merged = self._isMerged(change)
+        if change.is_merged:
+            self.log.debug("Change %s is merged" % (change,))
+        else:
+            self.log.debug("Change %s is not merged" % (change,))
         if not head:
             return change.is_merged
         if not change.is_merged:
@@ -82,7 +100,6 @@
         status = data.get('status')
         if not status:
             return False
-        self.log.debug("Change %s status: %s" % (change, status))
         if status == 'MERGED':
             return True
         return False
@@ -177,7 +194,7 @@
                                    (record.get('number'),))
         return changes
 
-    def _getDependsOnFromCommit(self, message):
+    def _getDependsOnFromCommit(self, message, change):
         records = []
         seen = set()
         for match in self.depends_on_re.findall(message):
@@ -187,17 +204,19 @@
                 continue
             seen.add(match)
             query = "change:%s" % (match,)
-            self.log.debug("Running query %s to find needed changes" %
-                           (query,))
+            self.log.debug("Updating %s: Running query %s "
+                           "to find needed changes" %
+                           (change, query,))
             records.extend(self.connection.simpleQuery(query))
         return records
 
-    def _getNeededByFromCommit(self, change_id):
+    def _getNeededByFromCommit(self, change_id, change):
         records = []
         seen = set()
         query = 'message:%s' % change_id
-        self.log.debug("Running query %s to find changes needed-by" %
-                       (query,))
+        self.log.debug("Updating %s: Running query %s "
+                       "to find changes needed-by" %
+                       (change, query,))
         results = self.connection.simpleQuery(query)
         for result in results:
             for match in self.depends_on_re.findall(
@@ -207,15 +226,15 @@
                 key = (result['number'], result['currentPatchSet']['number'])
                 if key in seen:
                     continue
-                self.log.debug("Found change %s,%s needs %s from commit" %
-                               (key[0], key[1], change_id))
+                self.log.debug("Updating %s: Found change %s,%s "
+                               "needs %s from commit" %
+                               (change, key[0], key[1], change_id))
                 seen.add(key)
                 records.append(result)
         return records
 
     def _updateChange(self, change, history=None):
-        self.log.info("Updating information for %s,%s" %
-                      (change.number, change.patchset))
+        self.log.info("Updating %s" % (change,))
         data = self.connection.query(change.number)
         change._data = data
 
@@ -255,6 +274,7 @@
         if change.is_merged:
             # This change is merged, so we don't need to look any further
             # for dependencies.
+            self.log.debug("Updating %s: change is merged" % (change,))
             return change
 
         if history is None:
@@ -270,21 +290,35 @@
             if dep_num in history:
                 raise Exception("Dependency cycle detected: %s in %s" % (
                     dep_num, history))
-            self.log.debug("Getting git-dependent change %s,%s" %
-                           (dep_num, dep_ps))
+            self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
+                           (change, dep_num, dep_ps))
             dep = self._getChange(dep_num, dep_ps, history=history)
+            # Because we are not forcing a refresh in _getChange, it
+            # may return without executing this code, so if we are
+            # updating our change to add ourselves to a dependency
+            # cycle, we won't detect it.  By explicitly performing a
+            # walk of the dependency tree, we will.
+            detect_cycle(dep, history)
             if (not dep.is_merged) and dep not in needs_changes:
                 needs_changes.append(dep)
 
-        for record in self._getDependsOnFromCommit(data['commitMessage']):
+        for record in self._getDependsOnFromCommit(data['commitMessage'],
+                                                   change):
             dep_num = record['number']
             dep_ps = record['currentPatchSet']['number']
             if dep_num in history:
                 raise Exception("Dependency cycle detected: %s in %s" % (
                     dep_num, history))
-            self.log.debug("Getting commit-dependent change %s,%s" %
-                           (dep_num, dep_ps))
+            self.log.debug("Updating %s: Getting commit-dependent "
+                           "change %s,%s" %
+                           (change, dep_num, dep_ps))
             dep = self._getChange(dep_num, dep_ps, history=history)
+            # Because we are not forcing a refresh in _getChange, it
+            # may return without executing this code, so if we are
+            # updating our change to add ourselves to a dependency
+            # cycle, we won't detect it.  By explicitly performing a
+            # walk of the dependency tree, we will.
+            detect_cycle(dep, history)
             if (not dep.is_merged) and dep not in needs_changes:
                 needs_changes.append(dep)
         change.needs_changes = needs_changes
@@ -294,15 +328,17 @@
             for needed in data['neededBy']:
                 parts = needed['ref'].split('/')
                 dep_num, dep_ps = parts[3], parts[4]
+                self.log.debug("Updating %s: Getting git-needed change %s,%s" %
+                               (change, dep_num, dep_ps))
                 dep = self._getChange(dep_num, dep_ps)
                 if (not dep.is_merged) and dep.is_current_patchset:
                     needed_by_changes.append(dep)
 
-        for record in self._getNeededByFromCommit(data['id']):
+        for record in self._getNeededByFromCommit(data['id'], change):
             dep_num = record['number']
             dep_ps = record['currentPatchSet']['number']
-            self.log.debug("Getting commit-needed change %s,%s" %
-                           (dep_num, dep_ps))
+            self.log.debug("Updating %s: Getting commit-needed change %s,%s" %
+                           (change, dep_num, dep_ps))
             # Because a commit needed-by may be a cross-repo
             # dependency, cause that change to refresh so that it will
             # reference the latest patchset of its Depends-On (this
diff --git a/zuul/trigger/timer.py b/zuul/trigger/timer.py
index d42e3db..f81312e 100644
--- a/zuul/trigger/timer.py
+++ b/zuul/trigger/timer.py
@@ -40,8 +40,8 @@
             self.log.debug("Adding event %s" % event)
             self.sched.addEvent(event)
 
-    def _shutdown(self):
-        self.apsched.stop()
+    def stop(self):
+        self.apsched.shutdown()
 
     def getEventFilters(self, trigger_conf):
         def toList(item):