Merge "Support post jobs by supporting rev checkout"
diff --git a/.testr.conf b/.testr.conf
index 5433c07..222ce97 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
 [DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
 test_id_option=--load-list $IDFILE
 test_list_option=--list
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c61cea8..f368cb9 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -6,7 +6,7 @@
    https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
 
 .. _`Turbo-Hipster`:
-   http://git.openstack.org/cgit/stackforge/turbo-hipster/
+   https://git.openstack.org/cgit/openstack/turbo-hipster/
 
 .. _`Turbo-Hipster Documentation`:
    http://turbo-hipster.rtfd.org/
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 98e4bb8..be9570c 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -49,6 +49,11 @@
   Port on which the Gearman server is listening.
   ``port=4730``
 
+**check_job_registration**
+  Check to see if job is registered with Gearman or not. When True
+  a build result of NOT_REGISTERED will be returned if job is not found.
+  ``check_job_registration=True``
+
 gearman_server
 """"""""""""""
 
@@ -394,11 +399,12 @@
   approval matching all specified requirements.
 
     *username*
-    If present, an approval from this username is required.
+    If present, an approval from this username is required.  It is
+    treated as a regular expression.
 
     *email*
     If present, an approval with this email address is required.  It
-    is treated as a regular expression as above.
+    is treated as a regular expression.
 
     *email-filter* (deprecated)
     A deprecated alternate spelling of *email*.  Only one of *email* or
diff --git a/other-requirements.txt b/other-requirements.txt
new file mode 100644
index 0000000..1ade655
--- /dev/null
+++ b/other-requirements.txt
@@ -0,0 +1,4 @@
+mysql-client [test]
+mysql-server [test]
+postgresql [test]
+postgresql-client [test]
diff --git a/tests/base.py b/tests/base.py
index 7945a0b..2d7f918 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -22,22 +22,22 @@
 import os
 import pprint
 from six.moves import queue as Queue
+from six.moves import urllib
 import random
 import re
 import select
 import shutil
+from six.moves import reload_module
 import socket
 import string
 import subprocess
 import swiftclient
 import threading
 import time
-import urllib2
 
 import git
 import gear
 import fixtures
-import six.moves.urllib.parse as urlparse
 import statsd
 import testtools
 from git import GitCommandError
@@ -479,7 +479,7 @@
         self.url = url
 
     def read(self):
-        res = urlparse.urlparse(self.url)
+        res = urllib.parse.urlparse(self.url)
         path = res.path
         project = '/'.join(path.split('/')[2:-2])
         ret = '001e# service=git-upload-pack\n'
@@ -862,6 +862,28 @@
                 format='%(asctime)s %(name)-32s '
                 '%(levelname)-8s %(message)s'))
 
+            # NOTE(notmorgan): Extract logging overrides for specific libraries
+            # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+            # each. This is used to limit the output during test runs from
+            # libraries that zuul depends on such as gear.
+            log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+            if log_defaults_from_env:
+                for default in log_defaults_from_env.split(','):
+                    try:
+                        name, level_str = default.split('=', 1)
+                        level = getattr(logging, level_str, logging.DEBUG)
+                        self.useFixture(fixtures.FakeLogger(
+                            name=name,
+                            level=level,
+                            format='%(asctime)s %(name)-32s '
+                                   '%(levelname)-8s %(message)s'))
+                    except ValueError:
+                        # NOTE(notmorgan): Invalid format of the log default,
+                        # skip and don't try and apply a logger for the
+                        # specified module
+                        pass
+
 
 class ZuulTestCase(BaseTestCase):
 
@@ -876,11 +898,13 @@
         self.test_root = os.path.join(tmp_root, "zuul-test")
         self.upstream_root = os.path.join(self.test_root, "upstream")
         self.git_root = os.path.join(self.test_root, "git")
+        self.state_root = os.path.join(self.test_root, "lib")
 
         if os.path.exists(self.test_root):
             shutil.rmtree(self.test_root)
         os.makedirs(self.test_root)
         os.makedirs(self.upstream_root)
+        os.makedirs(self.state_root)
 
         # Make per test copy of Configuration.
         self.setup_config()
@@ -888,6 +912,7 @@
                         os.path.join(FIXTURE_DIR,
                                      self.config.get('zuul', 'layout_config')))
         self.config.set('merger', 'git_dir', self.git_root)
+        self.config.set('zuul', 'state_dir', self.state_root)
 
         # For each project in config:
         self.init_repo("org/project")
@@ -914,8 +939,8 @@
         os.environ['STATSD_PORT'] = str(self.statsd.port)
         self.statsd.start()
         # the statsd client object is configured in the statsd module import
-        reload(statsd)
-        reload(zuul.scheduler)
+        reload_module(statsd)
+        reload_module(zuul.scheduler)
 
         self.gearman_server = FakeGearmanServer()
 
@@ -944,12 +969,12 @@
         self.sched.registerConnections(self.connections)
 
         def URLOpenerFactory(*args, **kw):
-            if isinstance(args[0], urllib2.Request):
+            if isinstance(args[0], urllib.request.Request):
                 return old_urlopen(*args, **kw)
             return FakeURLOpener(self.upstream_root, *args, **kw)
 
-        old_urlopen = urllib2.urlopen
-        urllib2.urlopen = URLOpenerFactory
+        old_urlopen = urllib.request.urlopen
+        urllib.request.urlopen = URLOpenerFactory
 
         self.merge_server = zuul.merger.server.MergeServer(self.config,
                                                            self.connections)
@@ -1304,9 +1329,11 @@
         start = time.time()
         while True:
             if time.time() - start > 10:
-                print 'queue status:',
-                print ' '.join(self.eventQueuesEmpty())
-                print self.areAllBuildsWaiting()
+                self.log.debug("Queue status:")
+                for queue in self.event_queues:
+                    self.log.debug("  %s: %s" % (queue, queue.empty()))
+                self.log.debug("All builds waiting: %s" %
+                               (self.areAllBuildsWaiting(),))
                 raise Exception("Timeout waiting for Zuul to settle")
             # Make sure no new events show up while we're checking
             self.worker.lock.acquire()
@@ -1344,8 +1371,8 @@
         for pipeline in self.sched.layout.pipelines.values():
             for queue in pipeline.queues:
                 if len(queue.queue) != 0:
-                    print 'pipeline %s queue %s contents %s' % (
-                        pipeline.name, queue.name, queue.queue)
+                    print('pipeline %s queue %s contents %s' % (
+                        pipeline.name, queue.name, queue.queue))
                 self.assertEqual(len(queue.queue), 0,
                                  "Pipelines queues should be empty")
 
diff --git a/tests/fixtures/layout-requirement-username.yaml b/tests/fixtures/layout-requirement-username.yaml
index 7a549f0..f9e6477 100644
--- a/tests/fixtures/layout-requirement-username.yaml
+++ b/tests/fixtures/layout-requirement-username.yaml
@@ -3,7 +3,7 @@
     manager: IndependentPipelineManager
     require:
       approval:
-        - username: jenkins
+        - username: ^(jenkins|zuul)$
     trigger:
       gerrit:
         - event: comment-added
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3dc3234..46a8c7c 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -14,7 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import ConfigParser
+from six.moves import configparser as ConfigParser
 import os
 import re
 
@@ -33,13 +33,13 @@
 class TestLayoutValidator(testtools.TestCase):
     def test_layouts(self):
         """Test layout file validation"""
-        print
+        print()
         errors = []
         for fn in os.listdir(os.path.join(FIXTURE_DIR, 'layouts')):
             m = LAYOUT_RE.match(fn)
             if not m:
                 continue
-            print fn
+            print(fn)
 
             # Load any .conf file by the same name but .conf extension.
             config_file = ("%s.conf" %
@@ -69,7 +69,7 @@
                                     fn)
                 except voluptuous.Invalid as e:
                     error = str(e)
-                    print '  ', error
+                    print('  ', error)
                     if error in errors:
                         raise Exception("Error has already been tested: %s" %
                                         error)
diff --git a/tests/test_model.py b/tests/test_model.py
index 2711618..ac19383 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -12,6 +12,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import os
+import random
+
+import fixtures
+
 from zuul import change_matcher as cm
 from zuul import model
 
@@ -62,3 +67,76 @@
         metajob = model.Job('^job')
         job.copy(metajob)
         self._assert_job_booleans_are_not_none(job)
+
+
+class TestJobTimeData(BaseTestCase):
+    def setUp(self):
+        super(TestJobTimeData, self).setUp()
+        self.tmp_root = self.useFixture(fixtures.TempDir(
+            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+        ).path
+
+    def test_empty_timedata(self):
+        path = os.path.join(self.tmp_root, 'job-name')
+        self.assertFalse(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+
+    def test_save_reload(self):
+        path = os.path.join(self.tmp_root, 'job-name')
+        self.assertFalse(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        success_times = []
+        failure_times = []
+        results = []
+        for x in range(10):
+            success_times.append(int(random.random() * 1000))
+            failure_times.append(int(random.random() * 1000))
+            results.append(0)
+            results.append(1)
+        random.shuffle(results)
+        s = f = 0
+        for result in results:
+            if result:
+                td.add(failure_times[f], 'FAILURE')
+                f += 1
+            else:
+                td.add(success_times[s], 'SUCCESS')
+                s += 1
+        self.assertEqual(td.success_times, success_times)
+        self.assertEqual(td.failure_times, failure_times)
+        self.assertEqual(td.results, results[10:])
+        td.save()
+        self.assertTrue(os.path.exists(path))
+        self.assertFalse(os.path.exists(path + '.tmp'))
+        td = model.JobTimeData(path)
+        td.load()
+        self.assertEqual(td.success_times, success_times)
+        self.assertEqual(td.failure_times, failure_times)
+        self.assertEqual(td.results, results[10:])
+
+
+class TestTimeDataBase(BaseTestCase):
+    def setUp(self):
+        super(TestTimeDataBase, self).setUp()
+        self.tmp_root = self.useFixture(fixtures.TempDir(
+            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
+        ).path
+        self.db = model.TimeDataBase(self.tmp_root)
+
+    def test_timedatabase(self):
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 0)
+        self.db.update('job-name', 50, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 50)
+        self.db.update('job-name', 100, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 75)
+        for x in range(10):
+            self.db.update('job-name', 100, 'SUCCESS')
+        self.assertEqual(self.db.getEstimatedTime('job-name'), 100)
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index fe7c7cc..ea512a2 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -20,11 +20,10 @@
 import re
 import shutil
 import time
-import urllib
-import urllib2
 import yaml
 
 import git
+from six.moves import urllib
 import testtools
 
 import zuul.change_matcher
@@ -34,7 +33,6 @@
 import zuul.reporter.smtp
 
 from tests.base import (
-    BaseTestCase,
     ZuulTestCase,
     repack_repo,
 )
@@ -44,40 +42,6 @@
                     '%(levelname)-8s %(message)s')
 
 
-class TestSchedulerConfigParsing(BaseTestCase):
-
-    def test_parse_skip_if(self):
-        job_yaml = """
-jobs:
-  - name: job_name
-    skip-if:
-      - project: ^project_name$
-        branch: ^stable/icehouse$
-        all-files-match-any:
-          - ^filename$
-      - project: ^project2_name$
-        all-files-match-any:
-          - ^filename2$
-    """.strip()
-        data = yaml.load(job_yaml)
-        config_job = data.get('jobs')[0]
-        sched = zuul.scheduler.Scheduler({})
-        cm = zuul.change_matcher
-        expected = cm.MatchAny([
-            cm.MatchAll([
-                cm.ProjectMatcher('^project_name$'),
-                cm.BranchMatcher('^stable/icehouse$'),
-                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
-            ]),
-            cm.MatchAll([
-                cm.ProjectMatcher('^project2_name$'),
-                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
-            ]),
-        ])
-        matcher = sched._parseSkipIf(config_job)
-        self.assertEqual(expected, matcher)
-
-
 class TestScheduler(ZuulTestCase):
 
     def test_jobs_launched(self):
@@ -495,6 +459,46 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def _test_time_database(self, iteration):
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('CRVW', 2)
+        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+        self.waitUntilSettled()
+        time.sleep(2)
+
+        data = json.loads(self.sched.formatStatusJSON())
+        found_job = None
+        for pipeline in data['pipelines']:
+            if pipeline['name'] != 'gate':
+                continue
+            for queue in pipeline['change_queues']:
+                for head in queue['heads']:
+                    for item in head:
+                        for job in item['jobs']:
+                            if job['name'] == 'project-merge':
+                                found_job = job
+                                break
+
+        self.assertIsNotNone(found_job)
+        if iteration == 1:
+            self.assertIsNotNone(found_job['estimated_time'])
+            self.assertIsNone(found_job['remaining_time'])
+        else:
+            self.assertIsNotNone(found_job['estimated_time'])
+            self.assertTrue(found_job['estimated_time'] >= 2)
+            self.assertIsNotNone(found_job['remaining_time'])
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+        self.waitUntilSettled()
+
+    def test_time_database(self):
+        "Test the time database"
+
+        self._test_time_database(1)
+        self._test_time_database(2)
+
     def test_two_failed_changes_at_head(self):
         "Test that changes are reparented correctly if 2 fail at head"
 
@@ -600,6 +604,36 @@
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.reported, 2)
 
+    def test_parse_skip_if(self):
+        job_yaml = """
+jobs:
+  - name: job_name
+    skip-if:
+      - project: ^project_name$
+        branch: ^stable/icehouse$
+        all-files-match-any:
+          - ^filename$
+      - project: ^project2_name$
+        all-files-match-any:
+          - ^filename2$
+    """.strip()
+        data = yaml.load(job_yaml)
+        config_job = data.get('jobs')[0]
+        cm = zuul.change_matcher
+        expected = cm.MatchAny([
+            cm.MatchAll([
+                cm.ProjectMatcher('^project_name$'),
+                cm.BranchMatcher('^stable/icehouse$'),
+                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
+            ]),
+            cm.MatchAll([
+                cm.ProjectMatcher('^project2_name$'),
+                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
+            ]),
+        ])
+        matcher = self.sched._parseSkipIf(config_job)
+        self.assertEqual(expected, matcher)
+
     def test_patch_order(self):
         "Test that dependent patches are tested in the right order"
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -1449,7 +1483,7 @@
         self.worker.build_history = []
 
         path = os.path.join(self.git_root, "org/project")
-        print repack_repo(path)
+        print(repack_repo(path))
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
@@ -1474,9 +1508,9 @@
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         A.addPatchset(large=True)
         path = os.path.join(self.upstream_root, "org/project1")
-        print repack_repo(path)
+        print(repack_repo(path))
         path = os.path.join(self.git_root, "org/project1")
-        print repack_repo(path)
+        print(repack_repo(path))
 
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
@@ -2240,8 +2274,8 @@
 
         port = self.webapp.server.socket.getsockname()[1]
 
-        req = urllib2.Request("http://localhost:%s/status.json" % port)
-        f = urllib2.urlopen(req)
+        req = urllib.request.Request("http://localhost:%s/status.json" % port)
+        f = urllib.request.urlopen(req)
         headers = f.info()
         self.assertIn('Content-Length', headers)
         self.assertIn('Content-Type', headers)
@@ -2846,7 +2880,8 @@
 
         port = self.webapp.server.socket.getsockname()[1]
 
-        f = urllib.urlopen("http://localhost:%s/status.json" % port)
+        req = urllib.request.Request("http://localhost:%s/status.json" % port)
+        f = urllib.request.urlopen(req)
         data = f.read()
 
         self.worker.hold_jobs_in_build = False
diff --git a/tests/test_webapp.py b/tests/test_webapp.py
index b127c51..94f097a 100644
--- a/tests/test_webapp.py
+++ b/tests/test_webapp.py
@@ -16,7 +16,8 @@
 # under the License.
 
 import json
-import urllib2
+
+from six.moves import urllib
 
 from tests.base import ZuulTestCase
 
@@ -44,41 +45,41 @@
     def test_webapp_status(self):
         "Test that we can filter to only certain changes in the webapp."
 
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertIn('pipelines', data)
 
     def test_webapp_status_compat(self):
         # testing compat with status.json
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status.json" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertIn('pipelines', data)
 
     def test_webapp_bad_url(self):
         # do we 404 correctly
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/foo" % self.port)
-        self.assertRaises(urllib2.HTTPError, urllib2.urlopen, req)
+        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
 
     def test_webapp_find_change(self):
         # can we filter by change id
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/change/1,1" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertEqual(1, len(data), data)
         self.assertEqual("org/project", data[0]['project'])
 
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/change/2,1" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertEqual(1, len(data), data)
diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py
index 9dbf504..8b854c7 100755
--- a/tools/zuul-changes.py
+++ b/tools/zuul-changes.py
@@ -35,7 +35,7 @@
                 if not change['live']:
                     continue
                 cid, cps = change['id'].split(',')
-                print (
+                print(
                     "zuul enqueue --trigger gerrit --pipeline %s "
                     "--project %s --change %s,%s" % (
                         options.pipeline_name,
diff --git a/tox.ini b/tox.ini
index 79ea939..06ccbcd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,7 @@
          STATSD_PORT=8125
          VIRTUAL_ENV={envdir}
          OS_TEST_TIMEOUT=30
+         OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
 passenv = ZUUL_TEST_ROOT
 usedevelop = True
 install_command = pip install {opts} {packages}
@@ -17,7 +18,17 @@
 commands =
   python setup.py testr --slowest --testr-args='{posargs}'
 
+[testenv:bindep]
+# Do not install any requirements. We want this to be fast and work even if
+# system dependencies are missing, since it's used to tell you what system
+# dependencies are missing! This also means that bindep must be installed
+# separately, outside of the requirements files.
+deps = bindep
+commands = bindep test
+
 [testenv:pep8]
+# streamer is python3 only, so we need to run flake8 in python3
+basepython = python3
 commands = flake8 {posargs}
 
 [testenv:cover]
diff --git a/zuul/ansible/library/zuul_console.py b/zuul/ansible/library/zuul_console.py
index bb6ec7b..78f3249 100644
--- a/zuul/ansible/library/zuul_console.py
+++ b/zuul/ansible/library/zuul_console.py
@@ -109,12 +109,20 @@
 
     def followConsole(self, console, conn):
         while True:
-            r = [console.file, conn]
-            e = [console.file, conn]
-            r, w, e = select.select(r, [], e)
+            # As long as we have unread data, keep reading/sending
+            while True:
+                chunk = console.file.read(4096)
+                if chunk:
+                    conn.send(chunk)
+                else:
+                    break
 
-            if console.file in e:
-                return True
+            # At this point, we are waiting for more data to be written
+            time.sleep(0.5)
+
+            # Check to see if the remote end has sent any data, if so,
+            # discard
+            r, w, e = select.select([conn], [], [conn], 0)
             if conn in e:
                 return False
             if conn in r:
@@ -124,19 +132,15 @@
                 if not ret:
                     return False
 
-            if console.file in r:
-                line = console.file.readline()
-                if line:
-                    conn.send(line)
-                time.sleep(0.5)
-                try:
-                    st = os.stat(console.path)
-                    if (st.st_ino != console.stat.st_ino or
-                        st.st_size < console.size):
-                        return True
-                except Exception:
+            # See if the file has been truncated
+            try:
+                st = os.stat(console.path)
+                if (st.st_ino != console.stat.st_ino or
+                    st.st_size < console.size):
                     return True
-                console.size = st.st_size
+            except Exception:
+                return True
+            console.size = st.st_size
 
     def handleOneConnection(self, conn):
         # FIXME: this won't notice disconnects until it tries to send
@@ -166,14 +170,14 @@
 
 
 def test():
-    s = Server('/tmp/console.log', 8088)
+    s = Server('/tmp/console.html', 8088)
     s.run()
 
 
 def main():
     module = AnsibleModule(
         argument_spec=dict(
-            path=dict(default='/tmp/console.log'),
+            path=dict(default='/tmp/console.html'),
             port=dict(default=8088, type='int'),
         )
     )
diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py
new file mode 100644
index 0000000..4b377d9
--- /dev/null
+++ b/zuul/ansible/library/zuul_log.py
@@ -0,0 +1,58 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+# Copyright (c) 2016 Red Hat
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software.  If not, see <http://www.gnu.org/licenses/>.
+
+import datetime
+
+
+class Console(object):
+    def __enter__(self):
+        self.logfile = open('/tmp/console.html', 'a', 0)
+        return self
+
+    def __exit__(self, etype, value, tb):
+        self.logfile.close()
+
+    def addLine(self, ln):
+        ts = datetime.datetime.now()
+        outln = '%s | %s' % (str(ts), ln)
+        self.logfile.write(outln)
+
+
+def log(msg):
+    if not isinstance(msg, list):
+        msg = [msg]
+    with Console() as console:
+        for line in msg:
+            console.addLine("[Zuul] %s\n" % line)
+
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict(
+            msg=dict(required=True, type='raw'),
+        )
+    )
+
+    p = module.params
+    log(p['msg'])
+    module.exit_json(changed=True)
+
+from ansible.module_utils.basic import *  # noqa
+
+if __name__ == '__main__':
+    main()
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index 6fc8f2d..7689fb3 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -16,40 +16,95 @@
 # along with this software.  If not, see <http://www.gnu.org/licenses/>.
 
 import datetime
+import getpass
+import os
 import subprocess
+import threading
 
 
 class Console(object):
     def __enter__(self):
-        self.logfile = open('/tmp/console.log', 'w+')
+        self.logfile = open('/tmp/console.html', 'a', 0)
         return self
 
     def __exit__(self, etype, value, tb):
         self.logfile.close()
 
     def addLine(self, ln):
+        # Note this format with delimiter is "inspired" by the old
+        # Jenkins format but with microsecond resolution instead of
+        # millisecond.  It is kept so log parsing/formatting remains
+        # consistent.
         ts = datetime.datetime.now()
-        outln = '%s %s' % (str(ts), ln)
+        outln = '%s | %s' % (ts, ln)
         self.logfile.write(outln)
 
 
+def get_env():
+    env = {}
+    env['HOME'] = os.path.expanduser('~')
+    env['USER'] = getpass.getuser()
+
+    # Known locations for PAM mod_env sources
+    for fn in ['/etc/environment', '/etc/default/locale']:
+        if os.path.exists(fn):
+            with open(fn) as f:
+                for line in f:
+                    if not line:
+                        continue
+                    if line[0] == '#':
+                        continue
+                    if '=' not in line:
+                        continue
+                    k, v = line.strip().split('=')
+                    for q in ["'", '"']:
+                        if v[0] == q:
+                            v = v.strip(q)
+                    env[k] = v
+    return env
+
+
+def follow(fd):
+    newline_warning = False
+    with Console() as console:
+        while True:
+            line = fd.readline()
+            if not line:
+                break
+            if not line.endswith('\n'):
+                line += '\n'
+                newline_warning = True
+            console.addLine(line)
+        if newline_warning:
+            console.addLine('[Zuul] No trailing newline\n')
+
+
 def run(cwd, cmd, args):
+    env = get_env()
+    env.update(args)
     proc = subprocess.Popen(
-        [cmd],
+        ['/bin/bash', '-l', '-c', cmd],
         cwd=cwd,
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
-        env=args,
+        env=env,
     )
 
-    with Console() as console:
-        while True:
-            line = proc.stdout.readline()
-            if not line:
-                break
-            console.addLine(line)
+    t = threading.Thread(target=follow, args=(proc.stdout,))
+    t.daemon = True
+    t.start()
 
     ret = proc.wait()
+    # Give the thread that is writing the console log up to 10 seconds
+    # to catch up and exit.  If it hasn't done so by then, it is very
+    # likely stuck in readline() because it spawned a child that is
+    # holding stdout or stderr open.
+    t.join(10)
+    with Console() as console:
+        if t.isAlive():
+            console.addLine("[Zuul] standard output/error still open "
+                            "after child exited")
+        console.addLine("[Zuul] Task exit code: %s\n" % ret)
     return ret
 
 
@@ -63,7 +118,8 @@
     )
 
     p = module.params
-    ret = run(p['cwd'], p['command'], p['parameters'])
+    env = p['parameters'].copy()
+    ret = run(p['cwd'], p['command'], env)
     if ret == 0:
         module.exit_json(changed=True, rc=ret)
     else:
diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py
index 245e988..1cfd10d 100644
--- a/zuul/ansible/plugins/callback_plugins/timeout.py
+++ b/zuul/ansible/plugins/callback_plugins/timeout.py
@@ -46,12 +46,7 @@
             self._elapsed_time += task_time
         if self._play and result._host:
             manager = self._play.get_variable_manager()
-            facts = dict(elapsed_time=self._elapsed_time)
-
-            overall_timeout = manager.extra_vars.get('timeout')
-            if overall_timeout is not None:
-                timeout = int(overall_timeout) - int(self._elapsed_time)
-                facts['timeout'] = timeout
+            facts = dict(elapsed_time=int(self._elapsed_time))
 
             manager.set_nonpersistent_facts(result._host, facts)
         self._task_start_time = None
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 2902c50..5ffd431 100644
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -14,8 +14,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import six
 from six.moves import configparser as ConfigParser
-import cStringIO
 import extras
 import logging
 import logging.config
@@ -47,7 +47,7 @@
             yappi.start()
         else:
             yappi.stop()
-            yappi_out = cStringIO.StringIO()
+            yappi_out = six.BytesIO()
             yappi.get_func_stats().print_all(out=yappi_out)
             yappi.get_thread_stats().print_all(out=yappi_out)
             log.debug(yappi_out.getvalue())
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 59ac419..1ce2828 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -154,7 +154,7 @@
         running_items = client.get_running_jobs()
 
         if len(running_items) == 0:
-            print "No jobs currently running"
+            print("No jobs currently running")
             return True
 
         all_fields = self._show_running_jobs_columns()
@@ -181,7 +181,7 @@
                         v += all_fields[f]['append']
                     values.append(v)
                 table.add_row(values)
-        print table
+        print(table)
         return True
 
     def _epoch_to_relative_time(self, epoch):
diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py
index 86266b3..49643ae 100644
--- a/zuul/cmd/launcher.py
+++ b/zuul/cmd/launcher.py
@@ -24,12 +24,14 @@
 
 import logging
 import os
+import socket
 import sys
 import signal
 
 import zuul.cmd
+import zuul.launcher.ansiblelaunchserver
 
-# No zuul imports here because they pull in paramiko which must not be
+# No zuul imports that pull in paramiko here; it must not be
 # imported until after the daemonization.
 # https://github.com/paramiko/paramiko/issues/59
 # Similar situation with gear and statsd.
@@ -46,53 +48,64 @@
         parser.add_argument('--version', dest='version', action='version',
                             version=self._get_version(),
                             help='show zuul version')
+        parser.add_argument('--keep-jobdir', dest='keep_jobdir',
+                            action='store_true',
+                            help='keep local jobdirs after run completes')
+        parser.add_argument('command',
+                            choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
+                            nargs='?')
+
         self.args = parser.parse_args()
 
-    def reconfigure_handler(self, signum, frame):
-        signal.signal(signal.SIGHUP, signal.SIG_IGN)
-        self.log.debug("Reconfiguration triggered")
-        self.read_config()
-        self.setup_logging('launcher', 'log_config')
-        try:
-            self.launcher.reconfigure(self.config)
-        except Exception:
-            self.log.exception("Reconfiguration failed:")
-        signal.signal(signal.SIGHUP, self.reconfigure_handler)
+    def send_command(self, cmd):
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(path)
+        s.sendall('%s\n' % cmd)
 
-    def exit_handler(self, signum, frame):
-        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+    def exit_handler(self):
         self.launcher.stop()
         self.launcher.join()
 
-    def main(self):
+    def main(self, daemon=True):
         # See comment at top of file about zuul imports
-        import zuul.launcher.ansiblelaunchserver
 
         self.setup_logging('launcher', 'log_config')
 
         self.log = logging.getLogger("zuul.Launcher")
 
         LaunchServer = zuul.launcher.ansiblelaunchserver.LaunchServer
-        self.launcher = LaunchServer(self.config)
+        self.launcher = LaunchServer(self.config,
+                                     keep_jobdir=self.args.keep_jobdir)
         self.launcher.start()
 
-        signal.signal(signal.SIGHUP, self.reconfigure_handler)
-        signal.signal(signal.SIGUSR1, self.exit_handler)
         signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
-        while True:
-            try:
-                signal.pause()
-            except KeyboardInterrupt:
-                print "Ctrl + C: asking launcher to exit nicely...\n"
-                self.exit_handler(signal.SIGINT, None)
-                sys.exit(0)
+        if daemon:
+            self.launcher.join()
+        else:
+            while True:
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    print("Ctrl + C: asking launcher to exit nicely...\n")
+                    self.exit_handler()
+                    sys.exit(0)
 
 
 def main():
     server = Launcher()
     server.parse_arguments()
-
     server.read_config()
+
+    if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
+        server.send_command(server.args.command)
+        sys.exit(0)
+
     server.configure_connections()
 
     if server.config.has_option('launcher', 'pidfile'):
@@ -102,10 +115,10 @@
     pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
 
     if server.args.nodaemon:
-        server.main()
+        server.main(False)
     else:
         with daemon.DaemonContext(pidfile=pid):
-            server.main()
+            server.main(True)
 
 
 if __name__ == "__main__":
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
index df215fd..797a990 100644
--- a/zuul/cmd/merger.py
+++ b/zuul/cmd/merger.py
@@ -68,7 +68,7 @@
             try:
                 signal.pause()
             except KeyboardInterrupt:
-                print "Ctrl + C: asking merger to exit nicely...\n"
+                print("Ctrl + C: asking merger to exit nicely...\n")
                 self.exit_handler(signal.SIGINT, None)
 
 
@@ -89,9 +89,7 @@
         f.close()
         os.unlink(test_fn)
     except Exception:
-        print
-        print "Unable to write to state directory: %s" % state_dir
-        print
+        print("\nUnable to write to state directory: %s\n" % state_dir)
         raise
 
     if server.config.has_option('merger', 'pidfile'):
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index b1cd050..1fb4a32 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -86,7 +86,8 @@
         import zuul.trigger.gerrit
 
         logging.basicConfig(level=logging.DEBUG)
-        self.sched = zuul.scheduler.Scheduler(self.config)
+        self.sched = zuul.scheduler.Scheduler(self.config,
+                                              testonly=True)
         self.configure_connections()
         layout = self.sched.testConfig(self.config.get('zuul',
                                                        'layout_config'),
@@ -106,7 +107,7 @@
                 jobs.add(v)
         for job in sorted(layout.jobs):
             if job not in jobs:
-                print "Job %s not defined" % job
+                print("Job %s not defined" % job)
                 failure = True
         return failure
 
@@ -116,18 +117,18 @@
         if child_pid == 0:
             os.close(pipe_write)
             self.setup_logging('gearman_server', 'log_config')
-            import gear
+            import zuul.lib.gearserver
             statsd_host = os.environ.get('STATSD_HOST')
             statsd_port = int(os.environ.get('STATSD_PORT', 8125))
             if self.config.has_option('gearman_server', 'listen_address'):
                 host = self.config.get('gearman_server', 'listen_address')
             else:
                 host = None
-            gear.Server(4730,
-                        host=host,
-                        statsd_host=statsd_host,
-                        statsd_port=statsd_port,
-                        statsd_prefix='zuul.geard')
+            zuul.lib.gearserver.GearServer(4730,
+                                           host=host,
+                                           statsd_host=statsd_host,
+                                           statsd_port=statsd_port,
+                                           statsd_prefix='zuul.geard')
 
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
@@ -195,7 +196,7 @@
             try:
                 signal.pause()
             except KeyboardInterrupt:
-                print "Ctrl + C: asking scheduler to exit nicely...\n"
+                print("Ctrl + C: asking scheduler to exit nicely...\n")
                 self.exit_handler(signal.SIGINT, None)
 
 
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index a1854f4..62891cd 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -18,11 +18,11 @@
 import json
 import time
 from six.moves import queue as Queue
+from six.moves import urllib
 import paramiko
 import logging
 import pprint
 import voluptuous as v
-import urllib2
 
 from zuul.connection import BaseConnection
 from zuul.model import TriggerEvent
@@ -32,7 +32,7 @@
     """Move events from Gerrit to the scheduler."""
 
     log = logging.getLogger("zuul.GerritEventConnector")
-    delay = 5.0
+    delay = 10.0
 
     def __init__(self, connection):
         super(GerritEventConnector, self).__init__()
@@ -388,10 +388,10 @@
         url = "%s/p/%s/info/refs?service=git-upload-pack" % (
             self.baseurl, project)
         try:
-            data = urllib2.urlopen(url).read()
+            data = urllib.request.urlopen(url).read()
         except:
             self.log.error("Cannot get references from %s" % url)
-            raise  # keeps urllib2 error informations
+            raise  # keeps urllib error information
         ret = {}
         read_headers = False
         read_advertisement = False
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 844b390..95fc2fa 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -14,7 +14,6 @@
 
 import json
 import logging
-import multiprocessing
 import os
 import re
 import shutil
@@ -23,76 +22,217 @@
 import subprocess
 import tempfile
 import threading
+import time
 import traceback
+import Queue
 import uuid
 
 import gear
 import yaml
 import jenkins_jobs.builder
+import jenkins_jobs.formatter
 import zmq
 
 import zuul.ansible.library
 import zuul.ansible.plugins.callback_plugins
+from zuul.lib import commandsocket
+
+ANSIBLE_WATCHDOG_GRACE = 5 * 60
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
+ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
+
+
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful',
+            'verbose', 'unverbose']
+
+
+def boolify(x):
+    if isinstance(x, str):
+        return bool(int(x))
+    return bool(x)
+
+
+class LaunchGearWorker(gear.Worker):
+    def __init__(self, *args, **kw):
+        self.__launch_server = kw.pop('launch_server')
+        super(LaunchGearWorker, self).__init__(*args, **kw)
+
+    def handleNoop(self, packet):
+        workers = len(self.__launch_server.node_workers)
+        delay = (workers ** 2) / 1000.0
+        time.sleep(delay)
+        return super(LaunchGearWorker, self).handleNoop(packet)
+
+
+class NodeGearWorker(gear.Worker):
+    MASS_DO = 101
+
+    def sendMassDo(self, functions):
+        data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
+        self.broadcast_lock.acquire()
+        try:
+            p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
+            self.broadcast(p)
+        finally:
+            self.broadcast_lock.release()
+
+
+class Watchdog(object):
+    def __init__(self, timeout, function, args):
+        self.timeout = timeout
+        self.function = function
+        self.args = args
+        self.thread = threading.Thread(target=self._run)
+        self.thread.daemon = True
+
+    def _run(self):
+        while self._running and time.time() < self.end:
+            time.sleep(10)
+        if self._running:
+            self.function(*self.args)
+
+    def start(self):
+        self._running = True
+        self.end = time.time() + self.timeout
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
 
 
 class JobDir(object):
-    def __init__(self):
+    def __init__(self, keep=False):
+        self.keep = keep
         self.root = tempfile.mkdtemp()
-        self.git_root = os.path.join(self.root, 'git')
-        os.makedirs(self.git_root)
         self.ansible_root = os.path.join(self.root, 'ansible')
         os.makedirs(self.ansible_root)
-        self.plugins_root = os.path.join(self.ansible_root, 'plugins')
-        os.makedirs(self.plugins_root)
+        self.known_hosts = os.path.join(self.ansible_root, 'known_hosts')
         self.inventory = os.path.join(self.ansible_root, 'inventory')
         self.playbook = os.path.join(self.ansible_root, 'playbook')
         self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
         self.config = os.path.join(self.ansible_root, 'ansible.cfg')
         self.script_root = os.path.join(self.ansible_root, 'scripts')
+        self.ansible_log = os.path.join(self.ansible_root, 'ansible_log.txt')
         os.makedirs(self.script_root)
+        self.staging_root = os.path.join(self.root, 'staging')
+        os.makedirs(self.staging_root)
 
     def __enter__(self):
         return self
 
     def __exit__(self, etype, value, tb):
-        shutil.rmtree(self.root)
+        if not self.keep:
+            shutil.rmtree(self.root)
 
 
 class LaunchServer(object):
     log = logging.getLogger("zuul.LaunchServer")
-    section_re = re.compile('site "(.*?)"')
+    site_section_re = re.compile('site "(.*?)"')
+    node_section_re = re.compile('node "(.*?)"')
 
-    def __init__(self, config):
+    def __init__(self, config, keep_jobdir=False):
         self.config = config
+        self.options = dict(
+            verbose=False
+        )
+        self.keep_jobdir = keep_jobdir
         self.hostname = socket.gethostname()
+        self.registered_functions = set()
         self.node_workers = {}
-        self.mpmanager = multiprocessing.Manager()
-        self.jobs = self.mpmanager.dict()
-        self.builds = self.mpmanager.dict()
-        self.zmq_send_queue = multiprocessing.JoinableQueue()
-        self.termination_queue = multiprocessing.JoinableQueue()
+        self.jobs = {}
+        self.builds = {}
+        self.zmq_send_queue = Queue.Queue()
+        self.termination_queue = Queue.Queue()
         self.sites = {}
+        self.static_nodes = {}
+        self.command_map = dict(
+            reconfigure=self.reconfigure,
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            release=self.release,
+            graceful=self.graceful,
+            verbose=self.verboseOn,
+            unverbose=self.verboseOff,
+        )
+
+        if config.has_option('launcher', 'accept_nodes'):
+            self.accept_nodes = config.getboolean('launcher',
+                                                  'accept_nodes')
+        else:
+            self.accept_nodes = True
+        self.config_accept_nodes = self.accept_nodes
+
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
+        ansible_dir = os.path.join(state_dir, 'ansible')
+        plugins_dir = os.path.join(ansible_dir, 'plugins')
+        self.callback_dir = os.path.join(plugins_dir, 'callback_plugins')
+        if not os.path.exists(self.callback_dir):
+            os.makedirs(self.callback_dir)
+        self.library_dir = os.path.join(ansible_dir, 'library')
+        if not os.path.exists(self.library_dir):
+            os.makedirs(self.library_dir)
+
+        callback_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.plugins.callback_plugins.__file__))
+        for fn in os.listdir(callback_path):
+            shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
+
+        library_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.library.__file__))
+        for fn in os.listdir(library_path):
+            shutil.copy(os.path.join(library_path, fn), self.library_dir)
 
         for section in config.sections():
-            m = self.section_re.match(section)
+            m = self.site_section_re.match(section)
             if m:
                 sitename = m.group(1)
                 d = {}
                 d['host'] = config.get(section, 'host')
                 d['user'] = config.get(section, 'user')
-                d['pass'] = config.get(section, 'pass', '')
-                d['root'] = config.get(section, 'root', '/')
+                if config.has_option(section, 'pass'):
+                    d['pass'] = config.get(section, 'pass')
+                else:
+                    d['pass'] = ''
+                if config.has_option(section, 'root'):
+                    d['root'] = config.get(section, 'root')
+                else:
+                    d['root'] = '/'
                 self.sites[sitename] = d
+                continue
+            m = self.node_section_re.match(section)
+            if m:
+                nodename = m.group(1)
+                d = {}
+                d['name'] = nodename
+                d['host'] = config.get(section, 'host')
+                if config.has_option(section, 'description'):
+                    d['description'] = config.get(section, 'description')
+                else:
+                    d['description'] = ''
+                if config.has_option(section, 'labels'):
+                    d['labels'] = config.get(section, 'labels').split(',')
+                else:
+                    d['labels'] = []
+                self.static_nodes[nodename] = d
+                continue
 
     def start(self):
         self._gearman_running = True
         self._zmq_running = True
         self._reaper_running = True
+        self._command_running = True
 
         # Setup ZMQ
         self.zcontext = zmq.Context()
         self.zsocket = self.zcontext.socket(zmq.PUB)
-        self.zsocket.bind("tcp://*:8881")
+        self.zsocket.bind("tcp://*:8888")
 
         # Setup Gearman
         server = self.config.get('gearman', 'server')
@@ -100,13 +240,21 @@
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker('Zuul Launch Server')
+        self.worker = LaunchGearWorker('Zuul Launch Server',
+                                       launch_server=self)
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()
         self.log.debug("Registering")
         self.register()
 
+        # Start command socket
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand)
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
         # Load JJB config
         self.loadJobs()
 
@@ -128,6 +276,11 @@
         self.gearman_thread.daemon = True
         self.gearman_thread.start()
 
+        # Start static workers
+        for node in self.static_nodes.values():
+            self.log.debug("Creating static node with arguments: %s" % (node,))
+            self._launchWorker(node)
+
     def loadJobs(self):
         self.log.debug("Loading jobs")
         builder = JJB()
@@ -136,18 +289,28 @@
         builder.parser.expandYaml()
         unseen = set(self.jobs.keys())
         for job in builder.parser.jobs:
+            builder.expandMacros(job)
             self.jobs[job['name']] = job
             unseen.discard(job['name'])
         for name in unseen:
             del self.jobs[name]
 
     def register(self):
-        self.worker.registerFunction("node-assign:zuul")
-        self.worker.registerFunction("stop:%s" % self.hostname)
+        new_functions = set()
+        if self.accept_nodes:
+            new_functions.add("node_assign:zuul")
+        new_functions.add("stop:%s" % self.hostname)
+        new_functions.add("set_description:%s" % self.hostname)
+        new_functions.add("node_revoke:%s" % self.hostname)
 
-    def reconfigure(self, config):
+        for function in new_functions - self.registered_functions:
+            self.worker.registerFunction(function)
+        for function in self.registered_functions - new_functions:
+            self.worker.unRegisterFunction(function)
+        self.registered_functions = new_functions
+
+    def reconfigure(self):
         self.log.debug("Reconfiguring")
-        self.config = config
         self.loadJobs()
         for node in self.node_workers.values():
             try:
@@ -156,25 +319,103 @@
             except Exception:
                 self.log.exception("Exception sending reconfigure command "
                                    "to worker:")
+        self.log.debug("Reconfiguration complete")
+
+    def pause(self):
+        self.log.debug("Pausing")
+        self.accept_nodes = False
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='pause'))
+            except Exception:
+                self.log.exception("Exception sending pause command "
+                                   "to worker:")
+        self.log.debug("Paused")
+
+    def unpause(self):
+        self.log.debug("Unpausing")
+        self.accept_nodes = self.config_accept_nodes
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='unpause'))
+            except Exception:
+                self.log.exception("Exception sending unpause command "
+                                   "to worker:")
+        self.log.debug("Unpaused")
+
+    def release(self):
+        self.log.debug("Releasing idle nodes")
+        for node in self.node_workers.values():
+            if node.name in self.static_nodes:
+                continue
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='release'))
+            except Exception:
+                self.log.exception("Exception sending release command "
+                                   "to worker:")
+        self.log.debug("Finished releasing idle nodes")
+
+    def graceful(self):
+        # Note: this is run in the command processing thread; no more
+        # external commands will be processed after this.
+        self.log.debug("Gracefully stopping")
+        self.pause()
+        self.release()
+        self.log.debug("Waiting for all builds to finish")
+        while self.builds:
+            time.sleep(5)
+        self.log.debug("All builds are finished")
+        self.stop()
 
     def stop(self):
         self.log.debug("Stopping")
+        # First, stop accepting new jobs
         self._gearman_running = False
         self._reaper_running = False
         self.worker.shutdown()
+        # Then stop all of the workers
         for node in self.node_workers.values():
             try:
                 if node.isAlive():
                     node.stop()
             except Exception:
                 self.log.exception("Exception sending stop command to worker:")
+        # Stop ZMQ afterwards so that the send queue is flushed
         self._zmq_running = False
         self.zmq_send_queue.put(None)
         self.zmq_send_queue.join()
+        # Stop command processing
+        self._command_running = False
+        self.command_socket.stop()
+        # Join the gearman thread which was stopped earlier.
+        self.gearman_thread.join()
+        # The command thread is joined in the join() method of this
+        # class, which is called by the command shell.
         self.log.debug("Stopped")
 
+    def verboseOn(self):
+        self.log.debug("Enabling verbose mode")
+        self.options['verbose'] = True
+
+    def verboseOff(self):
+        self.log.debug("Disabling verbose mode")
+        self.options['verbose'] = False
+
     def join(self):
-        self.gearman_thread.join()
+        self.command_thread.join()
+
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get()
+                self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
 
     def runZMQ(self):
         while self._zmq_running or not self.zmq_send_queue.empty():
@@ -194,12 +435,19 @@
             try:
                 job = self.worker.getJob()
                 try:
-                    if job.name.startswith('node-assign:'):
-                        self.log.debug("Got node-assign job: %s" % job.unique)
+                    if job.name.startswith('node_assign:'):
+                        self.log.debug("Got node_assign job: %s" % job.unique)
                         self.assignNode(job)
                     elif job.name.startswith('stop:'):
                         self.log.debug("Got stop job: %s" % job.unique)
                         self.stopJob(job)
+                    elif job.name.startswith('set_description:'):
+                        self.log.debug("Got set_description job: %s" %
+                                       job.unique)
+                        job.sendWorkComplete()
+                    elif job.name.startswith('node_revoke:'):
+                        self.log.debug("Got node_revoke job: %s" % job.unique)
+                        self.revokeNode(job)
                     else:
                         self.log.error("Unable to handle job %s" % job.name)
                         job.sendWorkFail()
@@ -214,19 +462,44 @@
     def assignNode(self, job):
         args = json.loads(job.arguments)
         self.log.debug("Assigned node with arguments: %s" % (args,))
+        self._launchWorker(args)
+        data = dict(manager=self.hostname)
+        job.sendWorkData(json.dumps(data))
+        job.sendWorkComplete()
+
+    def _launchWorker(self, args):
         worker = NodeWorker(self.config, self.jobs, self.builds,
                             self.sites, args['name'], args['host'],
                             args['description'], args['labels'],
                             self.hostname, self.zmq_send_queue,
-                            self.termination_queue)
+                            self.termination_queue, self.keep_jobdir,
+                            self.callback_dir, self.library_dir,
+                            self.options)
         self.node_workers[worker.name] = worker
 
-        worker.process = multiprocessing.Process(target=worker.run)
-        worker.process.start()
+        worker.thread = threading.Thread(target=worker.run)
+        worker.thread.start()
 
-        data = dict(manager=self.hostname)
-        job.sendWorkData(json.dumps(data))
-        job.sendWorkComplete()
+    def revokeNode(self, job):
+        try:
+            args = json.loads(job.arguments)
+            self.log.debug("Revoke job with arguments: %s" % (args,))
+            name = args['name']
+            node = self.node_workers.get(name)
+            if not node:
+                self.log.debug("Unable to find worker %s" % (name,))
+                return
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='stop'))
+                else:
+                    self.log.debug("Node %s is not alive while revoking node" %
+                                   (node.name,))
+            except Exception:
+                self.log.exception("Exception sending stop command "
+                                   "to worker:")
+        finally:
+            job.sendWorkComplete()
 
     def stopJob(self, job):
         try:
@@ -261,6 +534,10 @@
                 self.log.debug("Got termination event %s" % (item,))
                 if item is None:
                     continue
+                worker = self.node_workers[item]
+                self.log.debug("Joining %s" % (item,))
+                worker.thread.join()
+                self.log.debug("Joined %s" % (item,))
                 del self.node_workers[item]
             except Exception:
                 self.log.exception("Exception while processing "
@@ -270,11 +547,11 @@
 
 
 class NodeWorker(object):
-    log = logging.getLogger("zuul.NodeWorker")
-
     def __init__(self, config, jobs, builds, sites, name, host,
                  description, labels, manager_name, zmq_send_queue,
-                 termination_queue):
+                 termination_queue, keep_jobdir, callback_dir,
+                 library_dir, options):
+        self.log = logging.getLogger("zuul.NodeWorker.%s" % (name,))
         self.log.debug("Creating node worker %s" % (name,))
         self.config = config
         self.jobs = jobs
@@ -286,17 +563,29 @@
         if not isinstance(labels, list):
             labels = [labels]
         self.labels = labels
-        self.process = None
+        self.thread = None
         self.registered_functions = set()
+        # If the unpaused Event is set, that means we should run jobs.
+        # If it is clear, then we are paused and should not run jobs.
+        self.unpaused = threading.Event()
+        self.unpaused.set()
         self._running = True
-        self.queue = multiprocessing.JoinableQueue()
+        self.queue = Queue.Queue()
         self.manager_name = manager_name
         self.zmq_send_queue = zmq_send_queue
         self.termination_queue = termination_queue
+        self.keep_jobdir = keep_jobdir
         self.running_job_lock = threading.Lock()
+        self.pending_registration = False
+        self.registration_lock = threading.Lock()
+        self._get_job_lock = threading.Lock()
+        self._got_job = False
         self._job_complete_event = threading.Event()
         self._running_job = False
+        self._aborted_job = False
         self._sent_complete_event = False
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
         self.workspace_root = config.get('launcher', 'workspace_root')
         if self.config.has_option('launcher', 'private_key_file'):
             self.private_key_file = config.get('launcher', 'private_key_file')
@@ -306,31 +595,36 @@
             self.username = config.get('launcher', 'username')
         else:
             self.username = 'zuul'
+        self.callback_dir = callback_dir
+        self.library_dir = library_dir
+        self.options = options
 
     def isAlive(self):
         # Meant to be called from the manager
-        if self.process and self.process.is_alive():
+        if self.thread and self.thread.is_alive():
             return True
         return False
 
     def run(self):
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
         self.log.debug("Node worker %s starting" % (self.name,))
         server = self.config.get('gearman', 'server')
         if self.config.has_option('gearman', 'port'):
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker(self.name)
+        self.worker = NodeGearWorker(self.name)
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()
+        self.log.debug("Registering")
         self.register()
 
         self.gearman_thread = threading.Thread(target=self.runGearman)
         self.gearman_thread.daemon = True
         self.gearman_thread.start()
 
+        self.log.debug("Started")
+
         while self._running or not self.queue.empty():
             try:
                 self._runQueue()
@@ -343,9 +637,31 @@
         # will be set by the queue thread.
         self.log.debug("Submitting stop request")
         self._running = False
+        self.unpaused.set()
         self.queue.put(dict(action='stop'))
         self.queue.join()
 
+    def pause(self):
+        self.unpaused.clear()
+        self.worker.stopWaitingForJobs()
+
+    def unpause(self):
+        self.unpaused.set()
+
+    def release(self):
+        # If this node is idle, stop it.
+        old_unpaused = self.unpaused.is_set()
+        if old_unpaused:
+            self.pause()
+        with self._get_job_lock:
+            if self._got_job:
+                self.log.debug("This worker is not idle")
+                if old_unpaused:
+                    self.unpause()
+                return
+        self.log.debug("Stopping due to release command")
+        self.queue.put(dict(action='stop'))
+
     def _runQueue(self):
         item = self.queue.get()
         try:
@@ -358,6 +674,15 @@
                 else:
                     self._job_complete_event.wait()
                 self.worker.shutdown()
+            if item['action'] == 'pause':
+                self.log.debug("Received pause request")
+                self.pause()
+            if item['action'] == 'unpause':
+                self.log.debug("Received unpause request")
+                self.unpause()
+            if item['action'] == 'release':
+                self.log.debug("Received release request")
+                self.release()
             elif item['action'] == 'reconfigure':
                 self.log.debug("Received reconfigure request")
                 self.register()
@@ -370,15 +695,23 @@
     def runGearman(self):
         while self._running:
             try:
-                self._runGearman()
+                self.unpaused.wait()
+                if self._running:
+                    self._runGearman()
             except Exception:
                 self.log.exception("Exception in gearman manager:")
+            with self._get_job_lock:
+                self._got_job = False
 
     def _runGearman(self):
-        try:
-            job = self.worker.getJob()
-        except gear.InterruptedError:
-            return
+        if self.pending_registration:
+            self.register()
+        with self._get_job_lock:
+            try:
+                job = self.worker.getJob()
+                self._got_job = True
+            except gear.InterruptedError:
+                return
         self.log.debug("Node worker %s got job %s" % (self.name, job.name))
         try:
             if job.name not in self.registered_functions:
@@ -406,24 +739,34 @@
         return ret
 
     def register(self):
-        if self._running_job:
+        if not self.registration_lock.acquire(False):
+            self.log.debug("Registration already in progress")
             return
-        new_functions = set()
-        for job in self.jobs.values():
-            new_functions |= self.generateFunctionNames(job)
-        for function in new_functions - self.registered_functions:
-            self.worker.registerFunction(function)
-        for function in self.registered_functions - new_functions:
-            self.worker.unRegisterFunction(function)
-        self.registered_functions = new_functions
+        try:
+            if self._running_job:
+                self.pending_registration = True
+                self.log.debug("Ignoring registration due to running job")
+                return
+            self.log.debug("Updating registration")
+            self.pending_registration = False
+            new_functions = set()
+            for job in self.jobs.values():
+                new_functions |= self.generateFunctionNames(job)
+            self.worker.sendMassDo(new_functions)
+            self.registered_functions = new_functions
+        finally:
+            self.registration_lock.release()
 
     def abortRunningJob(self):
+        self._aborted_job = True
+        return self.abortRunningProc(self.ansible_job_proc)
+
+    def abortRunningProc(self, proc):
         aborted = False
         self.log.debug("Abort: acquiring job lock")
         with self.running_job_lock:
             if self._running_job:
                 self.log.debug("Abort: a job is running")
-                proc = self.ansible_proc
                 if proc:
                     self.log.debug("Abort: sending kill signal to job "
                                    "process group")
@@ -445,15 +788,14 @@
 
         # Make sure we can parse what we need from the job first
         args = json.loads(job.arguments)
-        # This may be configurable later, or we may choose to honor
-        # OFFLINE_NODE_WHEN_COMPLETE
-        offline = True
+        offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
         job_name = job.name.split(':')[1]
 
         # Initialize the result so we have something regardless of
         # whether the job actually runs
         result = None
         self._sent_complete_event = False
+        self._aborted_job = False
 
         try:
             self.sendStartEvent(job_name, args)
@@ -466,11 +808,10 @@
             self.log.exception("Exception while launching job thread")
 
         self._running_job = False
-        if not result:
-            result = b''
 
         try:
-            job.sendWorkComplete(result)
+            data = json.dumps(dict(result=result))
+            job.sendWorkComplete(data)
         except Exception:
             self.log.exception("Exception while sending job completion packet")
 
@@ -521,7 +862,8 @@
                                'SUCCESS', {})
 
     def runJob(self, job, args):
-        self.ansible_proc = None
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
         result = None
         with self.running_job_lock:
             if not self._running:
@@ -531,7 +873,7 @@
 
         self.log.debug("Job %s: beginning" % (job.unique,))
         self.builds[job.unique] = self.name
-        with JobDir() as jobdir:
+        with JobDir(self.keep_jobdir) as jobdir:
             self.log.debug("Job %s: job root at %s" %
                            (job.unique, jobdir.root))
             timeout = self.prepareAnsibleFiles(jobdir, job, args)
@@ -545,13 +887,23 @@
             job.sendWorkStatus(0, 100)
 
             job_status = self.runAnsiblePlaybook(jobdir, timeout)
-            post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
-            if job_status and post_status:
-                status = 'SUCCESS'
-            else:
-                status = 'FAILURE'
+            if job_status is None:
+                # The result of the job is indeterminate.  Zuul will
+                # run it again.
+                return result
 
-            result = json.dumps(dict(result=status))
+            post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
+            if not post_status:
+                result = 'POST_FAILURE'
+            elif job_status:
+                result = 'SUCCESS'
+            else:
+                result = 'FAILURE'
+
+            if self._aborted_job:
+                # A Null result will cause zuul to relaunch the job if
+                # it needs to.
+                result = None
 
         return result
 
@@ -559,56 +911,152 @@
         return [('node', dict(
             ansible_host=self.host, ansible_user=self.username))]
 
-    def _makeSCPTask(self, publisher):
+    def _substituteVariables(self, text, variables):
+        def lookup(match):
+            return variables.get(match.group(1), '')
+        return re.sub('\$([A-Za-z0-9_]+)', lookup, text)
+
+    def _getRsyncOptions(self, source, parameters):
+        # Treat the publisher source as a filter; ant and rsync behave
+        # fairly close in this manner, except for leading directories.
+        source = self._substituteVariables(source, parameters)
+        # If the source starts with ** then we want to match any
+        # number of directories, so don't anchor the include filter.
+        # If it does not start with **, then the intent is likely to
+        # at least start by matching an immediate file or subdirectory
+        # (even if later we have a ** in the middle), so in this case,
+        # anchor it to the root of the transfer (the workspace).
+        if not source.startswith('**'):
+            source = os.path.join('/', source)
+        # These options mean: include the thing we want, include any
+        # directories (so that we continue to search for the thing we
+        # want no matter how deep it is), exclude anything that
+        # doesn't match the thing we want or is a directory, then get
+        # rid of empty directories left over at the end.
+        rsync_opts = ['--include="%s"' % source,
+                      '--include="*/"',
+                      '--exclude="*"',
+                      '--prune-empty-dirs']
+        return rsync_opts
+
+    def _makeSCPTask(self, jobdir, publisher, parameters):
         tasks = []
         for scpfile in publisher['scp']['files']:
+            scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+            os.chmod(scproot, 0o755)
+
             site = publisher['scp']['site']
-            if site not in self.sites:
-                raise Exception("Undefined SCP site: %s" % (site,))
-            site = self.sites[site]
             if scpfile.get('copy-console'):
-                src = '/tmp/console.log'
+                # Include the local ansible directory in the console
+                # upload.  This uploads the playbook and ansible logs.
+                copyargs = dict(src=jobdir.ansible_root + '/',
+                                dest=os.path.join(scproot, '_zuul_ansible'))
+                task = dict(copy=copyargs,
+                            delegate_to='127.0.0.1')
+                tasks.append(task)
+
+                # Fetch the console log from the remote host.
+                src = '/tmp/console.html'
+                rsync_opts = []
             else:
-                src = scpfile['source']
-            dest = os.path.join(site['root'], scpfile['target'])
-            dest = os.path.normpath(dest)
-            if not dest.startswith(site['root']):
-                raise Exception("Target path %s is not below site root" %
-                                (dest,))
+                src = parameters['WORKSPACE']
+                if not src.endswith('/'):
+                    src = src + '/'
+                rsync_opts = self._getRsyncOptions(scpfile['source'],
+                                                   parameters)
+
             syncargs = dict(src=src,
-                            dest=dest)
-            task = dict(synchronize=syncargs,
-                        delegate_to=site['host'])
+                            dest=scproot,
+                            copy_links='yes',
+                            mode='pull')
+            if rsync_opts:
+                syncargs['rsync_opts'] = rsync_opts
+            task = dict(synchronize=syncargs)
             if not scpfile.get('copy-after-failure'):
                 task['when'] = 'success'
             tasks.append(task)
+
+            task = self._makeSCPTaskLocalAction(
+                site, scpfile, scproot, parameters)
+            tasks.append(task)
         return tasks
 
-    def _makeFTPTask(self, jobdir, publisher):
+    def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
+        if site not in self.sites:
+            raise Exception("Undefined SCP site: %s" % (site,))
+        site = self.sites[site]
+        dest = scpfile['target'].lstrip('/')
+        dest = self._substituteVariables(dest, parameters)
+        dest = os.path.join(site['root'], dest)
+        dest = os.path.normpath(dest)
+        if not dest.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (dest,))
+
+        rsync_cmd = [
+            '/usr/bin/rsync', '--delay-updates', '-F',
+            '--compress', '-rt', '--safe-links',
+            '--rsync-path="mkdir -p {dest} && rsync"',
+            '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
+            '-o StrictHostKeyChecking=no -q"',
+            '--out-format="<<CHANGED>>%i %n%L"',
+            '{source}', '"{user}@{host}:{dest}"'
+        ]
+        if scpfile.get('keep-hierarchy'):
+            source = '"%s/"' % scproot
+        else:
+            source = '`/usr/bin/find "%s" -type f`' % scproot
+        shellargs = ' '.join(rsync_cmd).format(
+            source=source,
+            dest=dest,
+            private_key_file=self.private_key_file,
+            host=site['host'],
+            user=site['user'])
+        task = dict(shell=shellargs,
+                    delegate_to='127.0.0.1')
+        if not scpfile.get('copy-after-failure'):
+            task['when'] = 'success'
+
+        return task
+
+    def _makeFTPTask(self, jobdir, publisher, parameters):
         tasks = []
         ftp = publisher['ftp']
         site = ftp['site']
         if site not in self.sites:
             raise Exception("Undefined FTP site: %s" % site)
         site = self.sites[site]
-        ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
+
+        ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
         ftpcontent = os.path.join(ftproot, 'content')
         os.makedirs(ftpcontent)
         ftpscript = os.path.join(ftproot, 'script')
-        syncargs = dict(src=ftp['source'],
-                        dest=ftpcontent)
+
+        src = parameters['WORKSPACE']
+        if not src.endswith('/'):
+            src = src + '/'
+        rsync_opts = self._getRsyncOptions(ftp['source'],
+                                           parameters)
+        syncargs = dict(src=src,
+                        dest=ftpcontent,
+                        copy_links='yes',
+                        mode='pull')
+        if rsync_opts:
+            syncargs['rsync_opts'] = rsync_opts
         task = dict(synchronize=syncargs,
                     when='success')
         tasks.append(task)
         task = dict(shell='lftp -f %s' % ftpscript,
-                    when='success')
+                    when='success',
+                    delegate_to='127.0.0.1')
         ftpsource = ftpcontent
         if ftp.get('remove-prefix'):
             ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
         while ftpsource[-1] == '/':
             ftpsource = ftpsource[:-1]
-        ftptarget = ftp['target']
-        ftptarget = os.path.join(site['root'], ftp['target'])
+        ftptarget = ftp['target'].lstrip('/')
+        ftptarget = self._substituteVariables(ftptarget, parameters)
+        ftptarget = os.path.join(site['root'], ftptarget)
         ftptarget = os.path.normpath(ftptarget)
         if not ftptarget.startswith(site['root']):
             raise Exception("Target path %s is not below site root" %
@@ -622,17 +1070,20 @@
         tasks.append(task)
         return tasks
 
-    def _makeBuilderTask(self, jobdir, builder, parameters, timeout):
+    def _makeBuilderTask(self, jobdir, builder, parameters):
         tasks = []
         script_fn = '%s.sh' % str(uuid.uuid4().hex)
         script_path = os.path.join(jobdir.script_root, script_fn)
         with open(script_path, 'w') as script:
-            script.write(builder['shell'])
+            data = builder['shell']
+            if not data.startswith('#!'):
+                data = '#!/bin/bash -x\n %s' % (data,)
+            script.write(data)
 
         remote_path = os.path.join('/tmp', script_fn)
         copy = dict(src=script_path,
                     dest=remote_path,
-                    mode=0555)
+                    mode=0o555)
         task = dict(copy=copy)
         tasks.append(task)
 
@@ -640,11 +1091,10 @@
                       cwd=parameters['WORKSPACE'],
                       parameters=parameters)
         task = dict(zuul_runner=runner)
-        if timeout:
-            task['when'] = '{{ timeout | int > 0 }}'
-            task['async'] = '{{ timeout }}'
-        else:
-            task['async'] = 2 * 60 * 60  # 2 hour default timeout
+        task['name'] = ('zuul_runner with {{ timeout | int - elapsed_time }} '
+                        'second timeout')
+        task['when'] = '{{ elapsed_time < timeout | int }}'
+        task['async'] = '{{ timeout | int - elapsed_time }}'
         task['poll'] = 5
         tasks.append(task)
 
@@ -655,6 +1105,34 @@
 
         return tasks
 
+    def _transformPublishers(self, jjb_job):
+        early_publishers = []
+        late_publishers = []
+        old_publishers = jjb_job.get('publishers', [])
+        for publisher in old_publishers:
+            early_scpfiles = []
+            late_scpfiles = []
+            if 'scp' not in publisher:
+                early_publishers.append(publisher)
+                continue
+            copy_console = False
+            for scpfile in publisher['scp']['files']:
+                if scpfile.get('copy-console'):
+                    scpfile['keep-hierarchy'] = True
+                    late_scpfiles.append(scpfile)
+                    copy_console = True
+                else:
+                    early_scpfiles.append(scpfile)
+            publisher['scp']['files'] = early_scpfiles + late_scpfiles
+            if copy_console:
+                late_publishers.append(publisher)
+            else:
+                early_publishers.append(publisher)
+        publishers = early_publishers + late_publishers
+        if old_publishers != publishers:
+            self.log.debug("Transformed job publishers")
+        return early_publishers, late_publishers
+
     def prepareAnsibleFiles(self, jobdir, gearman_job, args):
         job_name = gearman_job.name.split(':')[1]
         jjb_job = self.jobs[job_name]
@@ -670,94 +1148,237 @@
                 inventory.write('\n')
 
         timeout = None
+        timeout_var = None
         for wrapper in jjb_job.get('wrappers', []):
             if isinstance(wrapper, dict):
-                timeout = wrapper.get('build-timeout', {})
-                if isinstance(timeout, dict):
-                    timeout = timeout.get('timeout')
-                    if timeout:
-                        timeout = timeout * 60
+                build_timeout = wrapper.get('timeout')
+                if isinstance(build_timeout, dict):
+                    timeout_var = build_timeout.get('timeout-var')
+                    timeout = build_timeout.get('timeout')
+                    if timeout is not None:
+                        timeout = int(timeout) * 60
+        if not timeout:
+            timeout = ANSIBLE_DEFAULT_TIMEOUT
+        if timeout_var:
+            parameters[timeout_var] = str(timeout * 1000)
 
         with open(jobdir.playbook, 'w') as playbook:
+            pre_tasks = []
             tasks = []
+            main_block = []
+            error_block = []
+            variables = []
 
-            task = dict(file=dict(path='/tmp/console.log', state='absent'))
-            tasks.append(task)
+            shellargs = "ssh-keyscan %s > %s" % (
+                self.host, jobdir.known_hosts)
+            pre_tasks.append(dict(shell=shellargs,
+                             delegate_to='127.0.0.1'))
 
-            task = dict(zuul_console=dict(path='/tmp/console.log', port=8088))
-            tasks.append(task)
+            tasks.append(dict(block=main_block,
+                              rescue=error_block))
+
+            task = dict(file=dict(path='/tmp/console.html', state='absent'))
+            main_block.append(task)
+
+            task = dict(zuul_console=dict(path='/tmp/console.html', port=8088))
+            main_block.append(task)
 
             task = dict(file=dict(path=parameters['WORKSPACE'],
                                   state='directory'))
-            tasks.append(task)
+            main_block.append(task)
+
+            msg = [
+                "Launched by %s" % self.manager_name,
+                "Building remotely on %s in workspace %s" % (
+                    self.name, parameters['WORKSPACE'])]
+            task = dict(zuul_log=dict(msg=msg))
+            main_block.append(task)
 
             for builder in jjb_job.get('builders', []):
                 if 'shell' in builder:
-                    tasks.extend(self._makeBuilderTask(jobdir, builder,
-                                                       parameters, timeout))
-            play = dict(hosts='node', name='Job body',
-                        tasks=tasks)
-            playbook.write(yaml.dump([play]))
+                    main_block.extend(
+                        self._makeBuilderTask(jobdir, builder, parameters))
+            task = dict(zuul_log=dict(msg="Job complete, result: SUCCESS"))
+            main_block.append(task)
+
+            task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
+            error_block.append(task)
+            error_block.append(dict(fail=dict(msg='FAILURE')))
+
+            variables.append(dict(timeout=timeout))
+            play = dict(hosts='node', name='Job body', vars=variables,
+                        pre_tasks=pre_tasks, tasks=tasks)
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+
+        early_publishers, late_publishers = self._transformPublishers(jjb_job)
 
         with open(jobdir.post_playbook, 'w') as playbook:
+            blocks = []
+            for publishers in [early_publishers, late_publishers]:
+                block = []
+                for publisher in publishers:
+                    if 'scp' in publisher:
+                        block.extend(self._makeSCPTask(jobdir, publisher,
+                                                       parameters))
+                    if 'ftp' in publisher:
+                        block.extend(self._makeFTPTask(jobdir, publisher,
+                                                       parameters))
+                blocks.append(block)
+
+            # The 'always' section contains the log publishing tasks,
+            # the 'block' contains all the other publishers.  This way
+            # we run the log publisher regardless of whether the rest
+            # of the publishers succeed.
             tasks = []
-            for publisher in jjb_job.get('publishers', []):
-                if 'scp' in publisher:
-                    tasks.extend(self._makeSCPTask(publisher))
-                if 'ftp' in publisher:
-                    tasks.extend(self._makeFTPTask(jobdir, publisher))
+            tasks.append(dict(block=blocks[0],
+                              always=blocks[1]))
+
             play = dict(hosts='node', name='Publishers',
                         tasks=tasks)
-            playbook.write(yaml.dump([play]))
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
 
         with open(jobdir.config, 'w') as config:
             config.write('[defaults]\n')
             config.write('hostfile = %s\n' % jobdir.inventory)
-            config.write('host_key_checking = False\n')
+            config.write('keep_remote_files = True\n')
+            config.write('local_tmp = %s/.ansible/tmp\n' % jobdir.root)
             config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
+            config.write('log_path = %s\n' % jobdir.ansible_log)
+            config.write('gathering = explicit\n')
+            config.write('callback_plugins = %s\n' % self.callback_dir)
+            config.write('library = %s\n' % self.library_dir)
 
-            callback_path = zuul.ansible.plugins.callback_plugins.__file__
-            callback_path = os.path.abspath(callback_path)
-            callback_path = os.path.dirname(callback_path)
-            config.write('callback_plugins = %s\n' % callback_path)
-
-            library_path = zuul.ansible.library.__file__
-            library_path = os.path.abspath(library_path)
-            library_path = os.path.dirname(library_path)
-            config.write('library = %s\n' % library_path)
+            config.write('[ssh_connection]\n')
+            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
+                "-o UserKnownHostsFile=%s" % jobdir.known_hosts
+            config.write('ssh_args = %s\n' % ssh_args)
 
         return timeout
 
+    def _ansibleTimeout(self, proc, msg):
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+
     def runAnsiblePlaybook(self, jobdir, timeout):
-        self.ansible_proc = subprocess.Popen(
-            ['ansible-playbook', jobdir.playbook,
-             '-e', 'timeout=%s' % timeout, '-v'],
+        # Set LOGNAME env variable so Ansible log_path log reports
+        # the correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+
+        if self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+
+        cmd = ['ansible-playbook', jobdir.playbook, verbose]
+        self.log.debug("Ansible command: %s" % (cmd,))
+
+        self.ansible_job_proc = subprocess.Popen(
+            cmd,
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
+            env=env_copy,
         )
-        (out, err) = self.ansible_proc.communicate()
-        self.log.debug("Ansible stdout:\n%s" % out)
-        self.log.debug("Ansible stderr:\n%s" % err)
-        ret = self.ansible_proc.wait()
-        self.ansible_proc = None
+        ret = None
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (self.ansible_job_proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_job_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = self.ansible_job_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
+        self.ansible_job_proc = None
+        if ret == 3:
+            # AnsibleHostUnreachable: We had a network issue connecting to
+            # our zuul-worker.
+            return None
+        elif ret == -9:
+            # Received abort request.
+            return None
         return ret == 0
 
     def runAnsiblePostPlaybook(self, jobdir, success):
-        proc = subprocess.Popen(
-            ['ansible-playbook', jobdir.post_playbook,
-             '-e', 'success=%s' % success],
+        # Set LOGNAME env variable so Ansible log_path log reports
+        # the correct user.
+        env_copy = os.environ.copy()
+        env_copy['LOGNAME'] = 'zuul'
+
+        if self.options['verbose']:
+            verbose = '-vvv'
+        else:
+            verbose = '-v'
+
+        cmd = ['ansible-playbook', jobdir.post_playbook,
+               '-e', 'success=%s' % success, verbose]
+        self.log.debug("Ansible post command: %s" % (cmd,))
+
+        self.ansible_post_proc = subprocess.Popen(
+            cmd,
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
+            env=env_copy,
         )
-        (out, err) = proc.communicate()
-        return proc.wait() == 0
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_post_proc,
+                             "Ansible post timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_post_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible post output: %s" % (line,))
+            ret = self.ansible_post_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible post exit code: %s" % (ret,))
+        self.ansible_post_proc = None
+        return ret == 0
 
 
 class JJB(jenkins_jobs.builder.Builder):
     def __init__(self):
         self.global_config = None
         self._plugins_list = []
+
+    def expandComponent(self, component_type, component, template_data):
+        component_list_type = component_type + 's'
+        new_components = []
+        if isinstance(component, dict):
+            name, component_data = next(iter(component.items()))
+            if template_data:
+                component_data = jenkins_jobs.formatter.deep_format(
+                    component_data, template_data, True)
+        else:
+            name = component
+            component_data = {}
+
+        new_component = self.parser.data.get(component_type, {}).get(name)
+        if new_component:
+            for new_sub_component in new_component[component_list_type]:
+                new_components.extend(
+                    self.expandComponent(component_type,
+                                         new_sub_component, component_data))
+        else:
+            new_components.append({name: component_data})
+        return new_components
+
+    def expandMacros(self, job):
+        for component_type in ['builder', 'publisher', 'wrapper']:
+            component_list_type = component_type + 's'
+            new_components = []
+            for new_component in job.get(component_list_type, []):
+                new_components.extend(self.expandComponent(component_type,
+                                                           new_component, {}))
+            job[component_list_type] = new_components
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 69fb71b..98307ee 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
 import json
 import logging
 import os
+import six
 import time
 import threading
 from uuid import uuid4
@@ -164,6 +165,11 @@
             port = config.get('gearman', 'port')
         else:
             port = 4730
+        if config.has_option('gearman', 'check_job_registration'):
+            self.job_registration = config.getboolean(
+                'gearman', 'check_job_registration')
+        else:
+            self.job_registration = True
 
         self.gearman = ZuulGearmanClient(self)
         self.gearman.addServer(server, port)
@@ -231,7 +237,7 @@
                 s_config = {}
                 s_config.update((k, v.format(item=item, job=job,
                                              change=item.change))
-                                if isinstance(v, basestring)
+                                if isinstance(v, six.string_types)
                                 else (k, v)
                                 for k, v in s.items())
 
@@ -351,7 +357,8 @@
         build.__gearman_job = gearman_job
         self.builds[uuid] = build
 
-        if not self.isJobRegistered(gearman_job.name):
+        if self.job_registration and not self.isJobRegistered(
+                gearman_job.name):
             self.log.error("Job %s is not registered with Gearman" %
                            gearman_job)
             self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
@@ -456,9 +463,6 @@
                 build.number = data.get('number')
                 build.__gearman_manager = data.get('manager')
                 self.sched.onBuildStarted(build)
-
-            if job.denominator:
-                build.estimated_time = float(job.denominator) / 1000
         else:
             self.log.error("Unable to find build %s" % job.unique)
 
@@ -505,7 +509,7 @@
             # us where the job is running.
             return False
 
-        if not self.isJobRegistered(name):
+        if self.job_registration and not self.isJobRegistered(name):
             return False
 
         desc_uuid = str(uuid4().hex)
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
 import os
 import re
 
+import six
+
+
 OrderedDict = extras.try_imports(['collections.OrderedDict',
                                   'ordereddict.OrderedDict'])
 
@@ -59,17 +62,17 @@
             raise Exception("Expansion error. Check error messages above")
 
         self.log.info("Mapping projects to workspace...")
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             dest = os.path.normpath(os.path.join(workspace, dest[0]))
             ret[project] = dest
             self.log.info("  %s -> %s", project, dest)
 
         self.log.debug("Checking overlap in destination directories...")
         check = defaultdict(list)
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             check[dest].append(project)
 
-        dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+        dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
         if dupes:
             raise Exception("Some projects share the same destination: %s",
                             dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index 62ab938..3c7f04d 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,6 +19,8 @@
 import re
 import yaml
 
+import six
+
 from git import GitCommandError
 from zuul import exceptions
 from zuul.lib.clonemapper import CloneMapper
@@ -68,7 +70,7 @@
         dests = mapper.expand(workspace=self.workspace)
 
         self.log.info("Preparing %s repositories", len(dests))
-        for project, dest in dests.iteritems():
+        for project, dest in six.iteritems(dests):
             self.prepareRepo(project, dest)
         self.log.info("Prepared all repositories")
 
diff --git a/zuul/lib/commandsocket.py b/zuul/lib/commandsocket.py
new file mode 100644
index 0000000..1b7fed9
--- /dev/null
+++ b/zuul/lib/commandsocket.py
@@ -0,0 +1,121 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2016 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import socket
+import threading
+import Queue
+
+
+class CommandSocket(object):
+    """Queue commands received over a UNIX stream socket.
+
+    A listener thread accepts connections one at a time, reads a
+    single newline-terminated command from each and puts it on an
+    internal queue that consumers drain with get().
+    """
+
+    log = logging.getLogger("zuul.CommandSocket")
+
+    def __init__(self, path):
+        self.running = False
+        self.path = path
+        self.queue = Queue.Queue()
+
+    def start(self):
+        """Bind the socket at self.path and start the listener thread."""
+        self.running = True
+        # Remove a leftover socket file so that bind() can succeed.
+        if os.path.exists(self.path):
+            os.unlink(self.path)
+        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.socket.bind(self.path)
+        self.socket.listen(1)
+        self.socket_thread = threading.Thread(target=self._socketListener)
+        self.socket_thread.daemon = True
+        self.socket_thread.start()
+
+    def stop(self):
+        """Stop the listener thread and wake consumers blocked in get()."""
+        # First, wake up our listener thread with a connection and
+        # tell it to stop running.
+        self.running = False
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            s.connect(self.path)
+            s.sendall('_stop\n')
+        finally:
+            s.close()
+        # The command '_stop' will be ignored by our listener, so
+        # directly inject it into the queue so that consumers of this
+        # class which are waiting in .get() are awakened.  They can
+        # either handle '_stop' or just ignore the unknown command and
+        # then check to see if they should continue to run before
+        # re-entering their loop.
+        self.queue.put('_stop')
+        self.socket_thread.join()
+
+    def _readCommand(self, conn):
+        """Read one command from conn.
+
+        Returns the stripped text received up to the first newline, or
+        up to end of stream if the peer closes the connection first.
+        Returns None if the peer closed without sending anything.
+        """
+        buf = ''
+        while not buf.endswith('\n'):
+            chunk = conn.recv(1)
+            if not chunk:
+                # recv() keeps returning an empty string once the peer
+                # has closed; stop instead of looping forever.
+                break
+            buf += chunk
+        if not buf:
+            return None
+        return buf.strip()
+
+    def _socketListener(self):
+        """Accept connections and queue their commands while running."""
+        while self.running:
+            try:
+                s, addr = self.socket.accept()
+                try:
+                    self.log.debug("Accepted socket connection %s" % (s,))
+                    buf = self._readCommand(s)
+                finally:
+                    # Close the connection even if reading it failed.
+                    s.close()
+                if buf is None:
+                    self.log.debug("Socket closed without a command")
+                    continue
+                self.log.debug("Received %s from socket" % (buf,))
+                # Because we use '_stop' internally to wake up a
+                # waiting thread, don't allow it to actually be
+                # injected externally.
+                if buf != '_stop':
+                    self.queue.put(buf)
+            except Exception:
+                self.log.exception("Exception in socket handler")
+
+    def get(self):
+        """Return the next queued command, blocking until one arrives.
+
+        Raises Exception if called while the socket is not running.
+        """
+        if not self.running:
+            raise Exception("CommandSocket.get called while stopped")
+        return self.queue.get()
diff --git a/zuul/lib/gearserver.py b/zuul/lib/gearserver.py
new file mode 100644
index 0000000..9cddca3
--- /dev/null
+++ b/zuul/lib/gearserver.py
@@ -0,0 +1,44 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+
+# Type number of the MASS_DO packet handled by GearServer.
+MASS_DO = 101
+
+
+class GearServer(gear.Server):
+    """gear.Server that additionally handles MASS_DO packets."""
+
+    def handlePacket(self, packet):
+        """Handle MASS_DO here; defer every other packet to gear.Server."""
+        if packet.ptype != MASS_DO:
+            return super(GearServer, self).handlePacket(packet)
+        self.log.info("Received packet from %s: %s" % (packet.connection,
+                                                       packet))
+        self.handleMassDo(packet)
+
+    def handleMassDo(self, packet):
+        """Replace the connection's functions with those in the packet.
+
+        packet.data is split on NUL bytes; each piece is added to the
+        connection's (reset) function set and to self.functions.
+        """
+        connection = packet.connection
+        connection.functions = set()
+        for name in packet.data.split(b'\x00'):
+            self.log.debug("Adding function %s to %s" % (
+                name, connection))
+            connection.functions.add(name)
+            self.functions.add(name)
diff --git a/zuul/lib/swift.py b/zuul/lib/swift.py
index 3c411d3..b5d3bc7 100644
--- a/zuul/lib/swift.py
+++ b/zuul/lib/swift.py
@@ -19,8 +19,8 @@
 import os
 import random
 import six
+from six.moves import urllib
 import string
-import urlparse
 
 
 class Swift(object):
@@ -156,7 +156,7 @@
         url = os.path.join(self.storage_url, settings['container'],
                            settings['file_path_prefix'],
                            destination_prefix)
-        u = urlparse.urlparse(url)
+        u = urllib.parse.urlparse(url)
 
         hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect,
                                             settings['max_file_size'],
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c6ae35d..3bc29e6 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -210,7 +210,7 @@
         fd.write('#!/bin/bash\n')
         fd.write('ssh -i %s $@\n' % key)
         fd.close()
-        os.chmod(name, 0755)
+        os.chmod(name, 0o755)
 
     def addProject(self, project, url):
         repo = None
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 30cd732..d56993c 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,7 +19,7 @@
 
 import gear
 
-import merger
+from zuul.merger import merger
 
 
 class MergeServer(object):
diff --git a/zuul/model.py b/zuul/model.py
index 5bea5d0..46b0b98 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -13,7 +13,9 @@
 # under the License.
 
 import copy
+import os
 import re
+import struct
 import time
 from uuid import uuid4
 import extras
@@ -108,7 +110,11 @@
         return job_tree
 
     def getProjects(self):
-        return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+        # cmp is not in python3, applied idiom from
+        # http://python-future.org/compatible_idioms.html#cmp
+        return sorted(
+            self.job_trees.keys(),
+            key=lambda p: p.name)
 
     def addQueue(self, queue):
         self.queues.append(queue)
@@ -415,7 +421,7 @@
             elif self.window_decrease_type == 'exponential':
                 self.window = max(
                     self.window_floor,
-                    self.window / self.window_decrease_factor)
+                    int(self.window / self.window_decrease_factor))
 
 
 class Project(object):
@@ -1073,7 +1079,7 @@
         for a in approvals:
             for k, v in a.items():
                 if k == 'username':
-                    pass
+                    a['username'] = re.compile(v)
                 elif k in ['email', 'email-filter']:
                     a['email'] = re.compile(v)
                 elif k == 'newer-than':
@@ -1092,7 +1098,7 @@
         by = approval.get('by', {})
         for k, v in rapproval.items():
             if k == 'username':
-                if (by.get('username', '') != v):
+                if (not v.search(by.get('username', ''))):
                         return False
             elif k == 'email':
                 if (not v.search(by.get('email', ''))):
@@ -1380,3 +1386,103 @@
                     job.copy(metajob)
             self.jobs[name] = job
         return job
+
+
+class JobTimeData(object):
+    """Recent run times and results of one job, persisted to a file.
+
+    The ten most recent success durations, failure durations and
+    results (0 for SUCCESS, 1 for anything else) are kept.  On disk
+    they form one struct record that starts with a version byte.
+    """
+
+    format = 'B10H10H10B'
+    version = 0
+
+    def __init__(self, path):
+        self.path = path
+        self.success_times = [0 for x in range(10)]
+        self.failure_times = [0 for x in range(10)]
+        self.results = [0 for x in range(10)]
+
+    def load(self):
+        """Replace the in-memory data with the contents of self.path.
+
+        A missing file is not an error; the current values are kept.
+        Raises Exception if the stored version is not self.version.
+        """
+        if not os.path.exists(self.path):
+            return
+        # The file holds packed binary data: read it as bytes.
+        with open(self.path, 'rb') as f:
+            data = struct.unpack(self.format, f.read())
+        version = data[0]
+        if version != self.version:
+            raise Exception("Unknown data version")
+        self.success_times = list(data[1:11])
+        self.failure_times = list(data[11:21])
+        self.results = list(data[21:31])
+
+    def save(self):
+        """Write the data to a temporary file and rename it to self.path."""
+        tmpfile = self.path + '.tmp'
+        data = [self.version]
+        data.extend(self.success_times)
+        data.extend(self.failure_times)
+        data.extend(self.results)
+        data = struct.pack(self.format, *data)
+        with open(tmpfile, 'wb') as f:
+            f.write(data)
+        os.rename(tmpfile, self.path)
+
+    def add(self, elapsed, result):
+        """Record one run, discarding the oldest entry of each list used.
+
+        elapsed is truncated to an integer and clamped to 0..65535, the
+        range of the 'H' fields in format, so that save() can pack it.
+        """
+        elapsed = max(0, min(int(elapsed), 0xFFFF))
+        if result == 'SUCCESS':
+            self.success_times.append(elapsed)
+            self.success_times.pop(0)
+            result = 0
+        else:
+            self.failure_times.append(elapsed)
+            self.failure_times.pop(0)
+            result = 1
+        self.results.append(result)
+        self.results.pop(0)
+
+    def getEstimatedTime(self):
+        """Return the mean of the non-zero success times, or 0.0 if none."""
+        times = [x for x in self.success_times if x]
+        if times:
+            return float(sum(times)) / len(times)
+        return 0.0
+
+
+class TimeDataBase(object):
+    """Cache of JobTimeData objects whose files live under one directory."""
+
+    def __init__(self, root):
+        self.root = root
+        self.jobs = {}
+
+    def _getTD(self, name):
+        """Return the JobTimeData for job name, loading it on first use."""
+        td = self.jobs.get(name)
+        if not td:
+            td = JobTimeData(os.path.join(self.root, name))
+            self.jobs[name] = td
+            td.load()
+        return td
+
+    def getEstimatedTime(self, name):
+        """Return the estimated run time recorded for job name."""
+        return self._getTD(name).getEstimatedTime()
+
+    def update(self, name, elapsed, result):
+        """Add a run of job name and save its data immediately."""
+        td = self._getTD(name)
+        td.add(elapsed, result)
+        td.save()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d54da9f..716dcfb 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -21,7 +21,7 @@
 import gear
 import six
 
-import model
+from zuul import model
 
 
 class RPCListener(object):
@@ -40,11 +40,11 @@
             port = 4730
         self.worker = gear.Worker('Zuul RPC Listener')
         self.worker.addServer(server, port)
+        self.worker.waitForServer()
+        self.register()
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
         self.thread.start()
-        self.worker.waitForServer()
-        self.register()
 
     def register(self):
         self.worker.registerFunction("zuul:enqueue")
@@ -66,8 +66,8 @@
         while self._running:
             try:
                 job = self.worker.getJob()
-                z, jobname = job.name.split(':')
                 self.log.debug("Received job %s" % job.name)
+                z, jobname = job.name.split(':')
                 attrname = 'handle_' + jobname
                 if hasattr(self, attrname):
                     f = getattr(self, attrname)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index aea9a67..a30b735 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -20,6 +20,7 @@
 import logging
 import os
 import pickle
+import six
 from six.moves import queue as Queue
 import re
 import sys
@@ -27,10 +28,10 @@
 import time
 import yaml
 
-import layoutvalidator
-import model
-from model import Pipeline, Project, ChangeQueue
-from model import ChangeishFilter, NullChange
+from zuul import layoutvalidator
+from zuul import model
+from zuul.model import Pipeline, Project, ChangeQueue
+from zuul.model import ChangeishFilter, NullChange
 from zuul import change_matcher, exceptions
 from zuul import version as zuul_version
 
@@ -125,12 +126,10 @@
     """An event that should be processed within the main queue run loop"""
     def __init__(self):
         self._wait_event = threading.Event()
-        self._exception = None
-        self._traceback = None
+        self._exc_info = None
 
-    def exception(self, e, tb):
-        self._exception = e
-        self._traceback = tb
+    def exception(self, exc_info):
+        self._exc_info = exc_info
         self._wait_event.set()
 
     def done(self):
@@ -138,8 +137,8 @@
 
     def wait(self, timeout=None):
         self._wait_event.wait(timeout)
-        if self._exception:
-            raise self._exception, None, self._traceback
+        if self._exc_info:
+            six.reraise(*self._exc_info)
         return self._wait_event.is_set()
 
 
@@ -236,7 +235,7 @@
 class Scheduler(threading.Thread):
     log = logging.getLogger("zuul.Scheduler")
 
-    def __init__(self, config):
+    def __init__(self, config, testonly=False):
         threading.Thread.__init__(self)
         self.daemon = True
         self.wake_event = threading.Event()
@@ -262,6 +261,10 @@
         self.management_event_queue = Queue.Queue()
         self.layout = model.Layout()
 
+        if not testonly:
+            time_dir = self._get_time_database_dir()
+            self.time_database = model.TimeDataBase(time_dir)
+
         self.zuul_version = zuul_version.version_info.release_string()
         self.last_reconfigured = None
 
@@ -408,7 +411,9 @@
                     base = os.path.dirname(os.path.realpath(config_path))
                     fn = os.path.join(base, fn)
                 fn = os.path.expanduser(fn)
-                execfile(fn, config_env)
+                with open(fn) as _f:
+                    code = compile(_f.read(), fn, 'exec')
+                    six.exec_(code, config_env)
 
         for conf_pipeline in data.get('pipelines', []):
             pipeline = Pipeline(conf_pipeline['name'])
@@ -740,6 +745,17 @@
             state_dir = '/var/lib/zuul'
         return os.path.join(state_dir, 'queue.pickle')
 
+    def _get_time_database_dir(self):
+        """Return the 'times' directory under state_dir, creating it."""
+        state_dir = '/var/lib/zuul'
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        times_dir = os.path.join(state_dir, 'times')
+        if not os.path.exists(times_dir):
+            os.mkdir(times_dir)
+        return times_dir
+
     def _save_queue(self):
         pickle_file = self._get_queue_pickle_file()
         events = []
@@ -1038,8 +1054,8 @@
             else:
                 self.log.error("Unable to handle event %s" % event)
             event.done()
-        except Exception as e:
-            event.exception(e, sys.exc_info()[2])
+        except Exception:
+            event.exception(sys.exc_info())
         self.management_event_queue.task_done()
 
     def process_result_queue(self):
@@ -1069,6 +1085,11 @@
             self.log.warning("Build %s is not associated with a pipeline" %
                              (build,))
             return
+        try:
+            build.estimated_time = float(self.time_database.getEstimatedTime(
+                build.job.name))
+        except Exception:
+            self.log.exception("Exception estimating build time:")
         pipeline.manager.onBuildStarted(event.build)
 
     def _doBuildCompletedEvent(self, event):
@@ -1082,6 +1103,13 @@
             self.log.warning("Build %s is not associated with a pipeline" %
                              (build,))
             return
+        if build.end_time and build.start_time and build.result:
+            duration = build.end_time - build.start_time
+            try:
+                self.time_database.update(
+                    build.job.name, duration, build.result)
+            except Exception:
+                self.log.exception("Exception recording build time:")
         pipeline.manager.onBuildCompleted(event.build)
 
     def _doMergeCompletedEvent(self, event):