Merge "Move a debug logging line by one"
diff --git a/.testr.conf b/.testr.conf
index 5433c07..222ce97 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
 [DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
 test_id_option=--load-list $IDFILE
 test_list_option=--list
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c61cea8..f368cb9 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -6,7 +6,7 @@
    https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
 
 .. _`Turbo-Hipster`:
-   http://git.openstack.org/cgit/stackforge/turbo-hipster/
+   https://git.openstack.org/cgit/openstack/turbo-hipster/
 
 .. _`Turbo-Hipster Documentation`:
    http://turbo-hipster.rtfd.org/
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 98e4bb8..07b777a 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -49,6 +49,11 @@
   Port on which the Gearman server is listening.
   ``port=4730``
 
+**check_job_registration**
+  Check to see if job is registered with Gearman or not. When True
+  a build result of NOT_REGISTERED will be returned if job is not found.
+  ``check_job_registration=True``
+
 gearman_server
 """"""""""""""
 
diff --git a/tests/base.py b/tests/base.py
index 585f2d2..5b31eea 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -22,22 +22,22 @@
 import os
 import pprint
 from six.moves import queue as Queue
+from six.moves import urllib
 import random
 import re
 import select
 import shutil
+from six.moves import reload_module
 import socket
 import string
 import subprocess
 import swiftclient
 import threading
 import time
-import urllib2
 
 import git
 import gear
 import fixtures
-import six.moves.urllib.parse as urlparse
 import statsd
 import testtools
 from git import GitCommandError
@@ -479,7 +479,7 @@
         self.url = url
 
     def read(self):
-        res = urlparse.urlparse(self.url)
+        res = urllib.parse.urlparse(self.url)
         path = res.path
         project = '/'.join(path.split('/')[2:-2])
         ret = '001e# service=git-upload-pack\n'
@@ -862,6 +862,28 @@
                 format='%(asctime)s %(name)-32s '
                 '%(levelname)-8s %(message)s'))
 
+            # NOTE(notmorgan): Extract logging overrides for specific libraries
+            # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+            # each. This is used to limit the output during test runs from
+            # libraries that zuul depends on such as gear.
+            log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+            if log_defaults_from_env:
+                for default in log_defaults_from_env.split(','):
+                    try:
+                        name, level_str = default.split('=', 1)
+                        level = getattr(logging, level_str, logging.DEBUG)
+                        self.useFixture(fixtures.FakeLogger(
+                            name=name,
+                            level=level,
+                            format='%(asctime)s %(name)-32s '
+                                   '%(levelname)-8s %(message)s'))
+                    except ValueError:
+                        # NOTE(notmorgan): Invalid format of the log default,
+                        # skip and don't try and apply a logger for the
+                        # specified module
+                        pass
+
 
 class ZuulTestCase(BaseTestCase):
 
@@ -917,8 +939,8 @@
         os.environ['STATSD_PORT'] = str(self.statsd.port)
         self.statsd.start()
         # the statsd client object is configured in the statsd module import
-        reload(statsd)
-        reload(zuul.scheduler)
+        reload_module(statsd)
+        reload_module(zuul.scheduler)
 
         self.gearman_server = FakeGearmanServer()
 
@@ -947,12 +969,12 @@
         self.sched.registerConnections(self.connections)
 
         def URLOpenerFactory(*args, **kw):
-            if isinstance(args[0], urllib2.Request):
+            if isinstance(args[0], urllib.request.Request):
                 return old_urlopen(*args, **kw)
             return FakeURLOpener(self.upstream_root, *args, **kw)
 
-        old_urlopen = urllib2.urlopen
-        urllib2.urlopen = URLOpenerFactory
+        old_urlopen = urllib.request.urlopen
+        urllib.request.urlopen = URLOpenerFactory
 
         self.merge_server = zuul.merger.server.MergeServer(self.config,
                                                            self.connections)
@@ -1296,9 +1318,8 @@
         start = time.time()
         while True:
             if time.time() - start > 10:
-                print 'queue status:',
-                print ' '.join(self.eventQueuesEmpty())
-                print self.areAllBuildsWaiting()
+                print('queue status:', ''.join(self.eventQueuesEmpty()))
+                print(self.areAllBuildsWaiting())
                 raise Exception("Timeout waiting for Zuul to settle")
             # Make sure no new events show up while we're checking
             self.worker.lock.acquire()
@@ -1336,8 +1357,8 @@
         for pipeline in self.sched.layout.pipelines.values():
             for queue in pipeline.queues:
                 if len(queue.queue) != 0:
-                    print 'pipeline %s queue %s contents %s' % (
-                        pipeline.name, queue.name, queue.queue)
+                    print('pipeline %s queue %s contents %s' % (
+                        pipeline.name, queue.name, queue.queue))
                 self.assertEqual(len(queue.queue), 0,
                                  "Pipelines queues should be empty")
 
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3dc3234..46a8c7c 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -14,7 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import ConfigParser
+from six.moves import configparser as ConfigParser
 import os
 import re
 
@@ -33,13 +33,13 @@
 class TestLayoutValidator(testtools.TestCase):
     def test_layouts(self):
         """Test layout file validation"""
-        print
+        print()
         errors = []
         for fn in os.listdir(os.path.join(FIXTURE_DIR, 'layouts')):
             m = LAYOUT_RE.match(fn)
             if not m:
                 continue
-            print fn
+            print(fn)
 
             # Load any .conf file by the same name but .conf extension.
             config_file = ("%s.conf" %
@@ -69,7 +69,7 @@
                                     fn)
                 except voluptuous.Invalid as e:
                     error = str(e)
-                    print '  ', error
+                    print('  ', error)
                     if error in errors:
                         raise Exception("Error has already been tested: %s" %
                                         error)
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 15d33c8..ea512a2 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -20,11 +20,10 @@
 import re
 import shutil
 import time
-import urllib
-import urllib2
 import yaml
 
 import git
+from six.moves import urllib
 import testtools
 
 import zuul.change_matcher
@@ -1484,7 +1483,7 @@
         self.worker.build_history = []
 
         path = os.path.join(self.git_root, "org/project")
-        print repack_repo(path)
+        print(repack_repo(path))
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
@@ -1509,9 +1508,9 @@
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         A.addPatchset(large=True)
         path = os.path.join(self.upstream_root, "org/project1")
-        print repack_repo(path)
+        print(repack_repo(path))
         path = os.path.join(self.git_root, "org/project1")
-        print repack_repo(path)
+        print(repack_repo(path))
 
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
@@ -2275,8 +2274,8 @@
 
         port = self.webapp.server.socket.getsockname()[1]
 
-        req = urllib2.Request("http://localhost:%s/status.json" % port)
-        f = urllib2.urlopen(req)
+        req = urllib.request.Request("http://localhost:%s/status.json" % port)
+        f = urllib.request.urlopen(req)
         headers = f.info()
         self.assertIn('Content-Length', headers)
         self.assertIn('Content-Type', headers)
@@ -2881,7 +2880,8 @@
 
         port = self.webapp.server.socket.getsockname()[1]
 
-        f = urllib.urlopen("http://localhost:%s/status.json" % port)
+        req = urllib.request.Request("http://localhost:%s/status.json" % port)
+        f = urllib.request.urlopen(req)
         data = f.read()
 
         self.worker.hold_jobs_in_build = False
diff --git a/tests/test_webapp.py b/tests/test_webapp.py
index b127c51..94f097a 100644
--- a/tests/test_webapp.py
+++ b/tests/test_webapp.py
@@ -16,7 +16,8 @@
 # under the License.
 
 import json
-import urllib2
+
+from six.moves import urllib
 
 from tests.base import ZuulTestCase
 
@@ -44,41 +45,41 @@
     def test_webapp_status(self):
         "Test that we can filter to only certain changes in the webapp."
 
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertIn('pipelines', data)
 
     def test_webapp_status_compat(self):
         # testing compat with status.json
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status.json" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertIn('pipelines', data)
 
     def test_webapp_bad_url(self):
         # do we 404 correctly
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/foo" % self.port)
-        self.assertRaises(urllib2.HTTPError, urllib2.urlopen, req)
+        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
 
     def test_webapp_find_change(self):
         # can we filter by change id
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/change/1,1" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertEqual(1, len(data), data)
         self.assertEqual("org/project", data[0]['project'])
 
-        req = urllib2.Request(
+        req = urllib.request.Request(
             "http://localhost:%s/status/change/2,1" % self.port)
-        f = urllib2.urlopen(req)
+        f = urllib.request.urlopen(req)
         data = json.loads(f.read())
 
         self.assertEqual(1, len(data), data)
diff --git a/tools/zuul-changes.py b/tools/zuul-changes.py
index 9dbf504..8b854c7 100755
--- a/tools/zuul-changes.py
+++ b/tools/zuul-changes.py
@@ -35,7 +35,7 @@
                 if not change['live']:
                     continue
                 cid, cps = change['id'].split(',')
-                print (
+                print(
                     "zuul enqueue --trigger gerrit --pipeline %s "
                     "--project %s --change %s,%s" % (
                         options.pipeline_name,
diff --git a/tox.ini b/tox.ini
index 79ea939..a8767c2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,7 @@
          STATSD_PORT=8125
          VIRTUAL_ENV={envdir}
          OS_TEST_TIMEOUT=30
+         OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
 passenv = ZUUL_TEST_ROOT
 usedevelop = True
 install_command = pip install {opts} {packages}
@@ -18,6 +19,8 @@
   python setup.py testr --slowest --testr-args='{posargs}'
 
 [testenv:pep8]
+# streamer is python3 only, so we need to run flake8 in python3
+basepython = python3
 commands = flake8 {posargs}
 
 [testenv:cover]
diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py
index 6209473..2072bc9 100644
--- a/zuul/ansible/library/zuul_log.py
+++ b/zuul/ansible/library/zuul_log.py
@@ -34,20 +34,23 @@
 
 
 def log(msg):
+    if not isinstance(msg, list):
+        msg = [msg]
     with Console() as console:
-        console.addLine("[Zuul] %s\n" % msg)
+        for line in msg:
+            console.addLine("[Zuul] %s\n" % line)
 
 
 def main():
     module = AnsibleModule(
         argument_spec=dict(
-            msg=dict(required=True, default=''),
+            msg=dict(required=True, type='raw'),
         )
     )
 
     p = module.params
-    ret = log(p['msg'])
-    module.exit_json(changed=True, rc=ret)
+    log(p['msg'])
+    module.exit_json(changed=True)
 
 from ansible.module_utils.basic import *  # noqa
 
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index bc38376..5a38807 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -19,6 +19,7 @@
 import getpass
 import os
 import subprocess
+import threading
 
 
 class Console(object):
@@ -45,11 +46,35 @@
         if os.path.exists(fn):
             with open(fn) as f:
                 for line in f:
+                    if not line:
+                        continue
+                    if line[0] == '#':
+                        continue
+                    if '=' not in line:
+                        continue
                     k, v = line.strip().split('=')
+                    for q in ["'", '"']:
+                        if v[0] == q:
+                            v = v.strip(q)
                     env[k] = v
     return env
 
 
+def follow(fd):
+    newline_warning = False
+    with Console() as console:
+        while True:
+            line = fd.readline()
+            if not line:
+                break
+            if not line.endswith('\n'):
+                line += '\n'
+                newline_warning = True
+            console.addLine(line)
+        if newline_warning:
+            console.addLine('[Zuul] No trailing newline\n')
+
+
 def run(cwd, cmd, args):
     env = get_env()
     env.update(args)
@@ -61,14 +86,20 @@
         env=env,
     )
 
-    with Console() as console:
-        while True:
-            line = proc.stdout.readline()
-            if not line:
-                break
-            console.addLine(line)
+    t = threading.Thread(target=follow, args=(proc.stdout,))
+    t.daemon = True
+    t.start()
 
-        ret = proc.wait()
+    ret = proc.wait()
+    # Give the thread that is writing the console log up to 10 seconds
+    # to catch up and exit.  If it hasn't done so by then, it is very
+    # likely stuck in readline() because it spawned a child that is
+    # holding stdout or stderr open.
+    t.join(10)
+    with Console() as console:
+        if t.isAlive():
+            console.addLine("[Zuul] standard output/error still open "
+                            "after child exited")
         console.addLine("[Zuul] Task exit code: %s\n" % ret)
     return ret
 
diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py
index 245e988..030ecc8 100644
--- a/zuul/ansible/plugins/callback_plugins/timeout.py
+++ b/zuul/ansible/plugins/callback_plugins/timeout.py
@@ -49,7 +49,7 @@
             facts = dict(elapsed_time=self._elapsed_time)
 
             overall_timeout = manager.extra_vars.get('timeout')
-            if overall_timeout is not None:
+            if str(overall_timeout) != 'None':
                 timeout = int(overall_timeout) - int(self._elapsed_time)
                 facts['timeout'] = timeout
 
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 2902c50..5ffd431 100644
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -14,8 +14,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import six
 from six.moves import configparser as ConfigParser
-import cStringIO
 import extras
 import logging
 import logging.config
@@ -47,7 +47,7 @@
             yappi.start()
         else:
             yappi.stop()
-            yappi_out = cStringIO.StringIO()
+            yappi_out = six.BytesIO()
             yappi.get_func_stats().print_all(out=yappi_out)
             yappi.get_thread_stats().print_all(out=yappi_out)
             log.debug(yappi_out.getvalue())
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 59ac419..1ce2828 100644
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -154,7 +154,7 @@
         running_items = client.get_running_jobs()
 
         if len(running_items) == 0:
-            print "No jobs currently running"
+            print("No jobs currently running")
             return True
 
         all_fields = self._show_running_jobs_columns()
@@ -181,7 +181,7 @@
                         v += all_fields[f]['append']
                     values.append(v)
                 table.add_row(values)
-        print table
+        print(table)
         return True
 
     def _epoch_to_relative_time(self, epoch):
diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py
index c9516f8..49643ae 100644
--- a/zuul/cmd/launcher.py
+++ b/zuul/cmd/launcher.py
@@ -24,12 +24,14 @@
 
 import logging
 import os
+import socket
 import sys
 import signal
 
 import zuul.cmd
+import zuul.launcher.ansiblelaunchserver
 
-# No zuul imports here because they pull in paramiko which must not be
+# No zuul imports that pull in paramiko here; it must not be
 # imported until after the daemonization.
 # https://github.com/paramiko/paramiko/issues/59
 # Similar situation with gear and statsd.
@@ -49,27 +51,29 @@
         parser.add_argument('--keep-jobdir', dest='keep_jobdir',
                             action='store_true',
                             help='keep local jobdirs after run completes')
+        parser.add_argument('command',
+                            choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
+                            nargs='?')
+
         self.args = parser.parse_args()
 
-    def reconfigure_handler(self, signum, frame):
-        signal.signal(signal.SIGHUP, signal.SIG_IGN)
-        self.log.debug("Reconfiguration triggered")
-        self.read_config()
-        self.setup_logging('launcher', 'log_config')
-        try:
-            self.launcher.reconfigure(self.config)
-        except Exception:
-            self.log.exception("Reconfiguration failed:")
-        signal.signal(signal.SIGHUP, self.reconfigure_handler)
+    def send_command(self, cmd):
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(path)
+        s.sendall('%s\n' % cmd)
 
-    def exit_handler(self, signum, frame):
-        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+    def exit_handler(self):
         self.launcher.stop()
         self.launcher.join()
 
-    def main(self):
+    def main(self, daemon=True):
         # See comment at top of file about zuul imports
-        import zuul.launcher.ansiblelaunchserver
 
         self.setup_logging('launcher', 'log_config')
 
@@ -80,23 +84,28 @@
                                      keep_jobdir=self.args.keep_jobdir)
         self.launcher.start()
 
-        signal.signal(signal.SIGHUP, self.reconfigure_handler)
-        signal.signal(signal.SIGUSR1, self.exit_handler)
         signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
-        while True:
-            try:
-                signal.pause()
-            except KeyboardInterrupt:
-                print "Ctrl + C: asking launcher to exit nicely...\n"
-                self.exit_handler(signal.SIGINT, None)
-                sys.exit(0)
+        if daemon:
+            self.launcher.join()
+        else:
+            while True:
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    print("Ctrl + C: asking launcher to exit nicely...\n")
+                    self.exit_handler()
+                    sys.exit(0)
 
 
 def main():
     server = Launcher()
     server.parse_arguments()
-
     server.read_config()
+
+    if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
+        server.send_command(server.args.command)
+        sys.exit(0)
+
     server.configure_connections()
 
     if server.config.has_option('launcher', 'pidfile'):
@@ -106,10 +115,10 @@
     pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
 
     if server.args.nodaemon:
-        server.main()
+        server.main(False)
     else:
         with daemon.DaemonContext(pidfile=pid):
-            server.main()
+            server.main(True)
 
 
 if __name__ == "__main__":
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
index df215fd..797a990 100644
--- a/zuul/cmd/merger.py
+++ b/zuul/cmd/merger.py
@@ -68,7 +68,7 @@
             try:
                 signal.pause()
             except KeyboardInterrupt:
-                print "Ctrl + C: asking merger to exit nicely...\n"
+                print("Ctrl + C: asking merger to exit nicely...\n")
                 self.exit_handler(signal.SIGINT, None)
 
 
@@ -89,9 +89,7 @@
         f.close()
         os.unlink(test_fn)
     except Exception:
-        print
-        print "Unable to write to state directory: %s" % state_dir
-        print
+        print("\nUnable to write to state directory: %s\n" % state_dir)
         raise
 
     if server.config.has_option('merger', 'pidfile'):
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index 6db15a2..1fb4a32 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -107,7 +107,7 @@
                 jobs.add(v)
         for job in sorted(layout.jobs):
             if job not in jobs:
-                print "Job %s not defined" % job
+                print("Job %s not defined" % job)
                 failure = True
         return failure
 
@@ -117,18 +117,18 @@
         if child_pid == 0:
             os.close(pipe_write)
             self.setup_logging('gearman_server', 'log_config')
-            import gear
+            import zuul.lib.gearserver
             statsd_host = os.environ.get('STATSD_HOST')
             statsd_port = int(os.environ.get('STATSD_PORT', 8125))
             if self.config.has_option('gearman_server', 'listen_address'):
                 host = self.config.get('gearman_server', 'listen_address')
             else:
                 host = None
-            gear.Server(4730,
-                        host=host,
-                        statsd_host=statsd_host,
-                        statsd_port=statsd_port,
-                        statsd_prefix='zuul.geard')
+            zuul.lib.gearserver.GearServer(4730,
+                                           host=host,
+                                           statsd_host=statsd_host,
+                                           statsd_port=statsd_port,
+                                           statsd_prefix='zuul.geard')
 
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
@@ -196,7 +196,7 @@
             try:
                 signal.pause()
             except KeyboardInterrupt:
-                print "Ctrl + C: asking scheduler to exit nicely...\n"
+                print("Ctrl + C: asking scheduler to exit nicely...\n")
                 self.exit_handler(signal.SIGINT, None)
 
 
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index a1854f4..62891cd 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -18,11 +18,11 @@
 import json
 import time
 from six.moves import queue as Queue
+from six.moves import urllib
 import paramiko
 import logging
 import pprint
 import voluptuous as v
-import urllib2
 
 from zuul.connection import BaseConnection
 from zuul.model import TriggerEvent
@@ -32,7 +32,7 @@
     """Move events from Gerrit to the scheduler."""
 
     log = logging.getLogger("zuul.GerritEventConnector")
-    delay = 5.0
+    delay = 10.0
 
     def __init__(self, connection):
         super(GerritEventConnector, self).__init__()
@@ -388,10 +388,10 @@
         url = "%s/p/%s/info/refs?service=git-upload-pack" % (
             self.baseurl, project)
         try:
-            data = urllib2.urlopen(url).read()
+            data = urllib.request.urlopen(url).read()
         except:
             self.log.error("Cannot get references from %s" % url)
-            raise  # keeps urllib2 error informations
+            raise  # keeps urllib error information
         ret = {}
         read_headers = False
         read_advertisement = False
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 5adde3f..aef54d3 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -14,7 +14,6 @@
 
 import json
 import logging
-import multiprocessing
 import os
 import re
 import shutil
@@ -23,7 +22,9 @@
 import subprocess
 import tempfile
 import threading
+import time
 import traceback
+import Queue
 import uuid
 
 import gear
@@ -34,14 +35,62 @@
 
 import zuul.ansible.library
 import zuul.ansible.plugins.callback_plugins
+from zuul.lib import commandsocket
+
+ANSIBLE_WATCHDOG_GRACE = 5 * 60
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
+ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
+
+
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful']
+
+
+def boolify(x):
+    if isinstance(x, str):
+        return bool(int(x))
+    return bool(x)
+
+
+class GearWorker(gear.Worker):
+    MASS_DO = 101
+
+    def sendMassDo(self, functions):
+        data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
+        self.broadcast_lock.acquire()
+        try:
+            p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
+            self.broadcast(p)
+        finally:
+            self.broadcast_lock.release()
+
+
+class Watchdog(object):
+    def __init__(self, timeout, function, args):
+        self.timeout = timeout
+        self.function = function
+        self.args = args
+        self.thread = threading.Thread(target=self._run)
+        self.thread.daemon = True
+
+    def _run(self):
+        while self._running and time.time() < self.end:
+            time.sleep(10)
+        if self._running:
+            self.function(*self.args)
+
+    def start(self):
+        self._running = True
+        self.end = time.time() + self.timeout
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
 
 
 class JobDir(object):
     def __init__(self, keep=False):
         self.keep = keep
         self.root = tempfile.mkdtemp()
-        self.git_root = os.path.join(self.root, 'git')
-        os.makedirs(self.git_root)
         self.ansible_root = os.path.join(self.root, 'ansible')
         os.makedirs(self.ansible_root)
         self.plugins_root = os.path.join(self.ansible_root, 'plugins')
@@ -63,40 +112,88 @@
 
 class LaunchServer(object):
     log = logging.getLogger("zuul.LaunchServer")
-    section_re = re.compile('site "(.*?)"')
+    site_section_re = re.compile('site "(.*?)"')
+    node_section_re = re.compile('node "(.*?)"')
 
     def __init__(self, config, keep_jobdir=False):
         self.config = config
         self.keep_jobdir = keep_jobdir
         self.hostname = socket.gethostname()
+        self.registered_functions = set()
         self.node_workers = {}
-        self.mpmanager = multiprocessing.Manager()
-        self.jobs = self.mpmanager.dict()
-        self.builds = self.mpmanager.dict()
-        self.zmq_send_queue = multiprocessing.JoinableQueue()
-        self.termination_queue = multiprocessing.JoinableQueue()
+        self.jobs = {}
+        self.builds = {}
+        self.zmq_send_queue = Queue.Queue()
+        self.termination_queue = Queue.Queue()
         self.sites = {}
+        self.static_nodes = {}
+        self.command_map = dict(
+            reconfigure=self.reconfigure,
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            release=self.release,
+            graceful=self.graceful,
+        )
+
+        if config.has_option('launcher', 'accept_nodes'):
+            self.accept_nodes = config.getboolean('launcher',
+                                                  'accept_nodes')
+        else:
+            self.accept_nodes = True
+
+        if self.config.has_option('zuul', 'state_dir'):
+            state_dir = os.path.expanduser(
+                self.config.get('zuul', 'state_dir'))
+        else:
+            state_dir = '/var/lib/zuul'
+        path = os.path.join(state_dir, 'launcher.socket')
+        self.command_socket = commandsocket.CommandSocket(path)
 
         for section in config.sections():
-            m = self.section_re.match(section)
+            m = self.site_section_re.match(section)
             if m:
                 sitename = m.group(1)
                 d = {}
                 d['host'] = config.get(section, 'host')
                 d['user'] = config.get(section, 'user')
-                d['pass'] = config.get(section, 'pass', '')
-                d['root'] = config.get(section, 'root', '/')
+                if config.has_option(section, 'pass'):
+                    d['pass'] = config.get(section, 'pass')
+                else:
+                    d['pass'] = ''
+                if config.has_option(section, 'root'):
+                    d['root'] = config.get(section, 'root')
+                else:
+                    d['root'] = '/'
                 self.sites[sitename] = d
+                continue
+            m = self.node_section_re.match(section)
+            if m:
+                nodename = m.group(1)
+                d = {}
+                d['name'] = nodename
+                d['host'] = config.get(section, 'host')
+                if config.has_option(section, 'description'):
+                    d['description'] = config.get(section, 'description')
+                else:
+                    d['description'] = ''
+                if config.has_option(section, 'labels'):
+                    d['labels'] = config.get(section, 'labels').split(',')
+                else:
+                    d['labels'] = []
+                self.static_nodes[nodename] = d
+                continue
 
     def start(self):
         self._gearman_running = True
         self._zmq_running = True
         self._reaper_running = True
+        self._command_running = True
 
         # Setup ZMQ
         self.zcontext = zmq.Context()
         self.zsocket = self.zcontext.socket(zmq.PUB)
-        self.zsocket.bind("tcp://*:8881")
+        self.zsocket.bind("tcp://*:8888")
 
         # Setup Gearman
         server = self.config.get('gearman', 'server')
@@ -111,6 +208,13 @@
         self.log.debug("Registering")
         self.register()
 
+        # Start command socket
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand)
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
         # Load JJB config
         self.loadJobs()
 
@@ -132,6 +236,11 @@
         self.gearman_thread.daemon = True
         self.gearman_thread.start()
 
+        # Start static workers
+        for node in self.static_nodes.values():
+            self.log.debug("Creating static node with arguments: %s" % (node,))
+            self._launchWorker(node)
+
     def loadJobs(self):
         self.log.debug("Loading jobs")
         builder = JJB()
@@ -147,12 +256,20 @@
             del self.jobs[name]
 
     def register(self):
-        self.worker.registerFunction("node-assign:zuul")
-        self.worker.registerFunction("stop:%s" % self.hostname)
+        new_functions = set()
+        if self.accept_nodes:
+            new_functions.add("node_assign:zuul")
+        new_functions.add("stop:%s" % self.hostname)
+        new_functions.add("set_description:%s" % self.hostname)
 
-    def reconfigure(self, config):
+        for function in new_functions - self.registered_functions:
+            self.worker.registerFunction(function)
+        for function in self.registered_functions - new_functions:
+            self.worker.unRegisterFunction(function)
+        self.registered_functions = new_functions
+
+    def reconfigure(self):
         self.log.debug("Reconfiguring")
-        self.config = config
         self.loadJobs()
         for node in self.node_workers.values():
             try:
@@ -163,24 +280,93 @@
                                    "to worker:")
         self.log.debug("Reconfiguration complete")
 
+    def pause(self):
+        self.log.debug("Pausing")
+        self.accept_nodes = False
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='pause'))
+            except Exception:
+                self.log.exception("Exception sending pause command "
+                                   "to worker:")
+        self.log.debug("Paused")
+
+    def unpause(self):
+        self.log.debug("Unpausing")
+        self.accept_nodes = True
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='unpause'))
+            except Exception:
+                self.log.exception("Exception sending unpause command "
+                                   "to worker:")
+        self.log.debug("Unpaused")
+
+    def release(self):
+        self.log.debug("Releasing idle nodes")
+        for node in self.node_workers.values():
+            if node.name in self.static_nodes:
+                continue
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='release'))
+            except Exception:
+                self.log.exception("Exception sending release command "
+                                   "to worker:")
+        self.log.debug("Finished releasing idle nodes")
+
+    def graceful(self):
+        # Note: this is run in the command processing thread; no more
+        # external commands will be processed after this.
+        self.log.debug("Gracefully stopping")
+        self.pause()
+        self.release()
+        self.log.debug("Waiting for all builds to finish")
+        while self.builds:
+            time.sleep(5)
+        self.log.debug("All builds are finished")
+        self.stop()
+
     def stop(self):
         self.log.debug("Stopping")
+        # First, stop accepting new jobs
         self._gearman_running = False
         self._reaper_running = False
         self.worker.shutdown()
+        # Then stop all of the workers
         for node in self.node_workers.values():
             try:
                 if node.isAlive():
                     node.stop()
             except Exception:
                 self.log.exception("Exception sending stop command to worker:")
+        # Stop ZMQ afterwards so that the send queue is flushed
         self._zmq_running = False
         self.zmq_send_queue.put(None)
         self.zmq_send_queue.join()
+        # Stop command processing
+        self._command_running = False
+        self.command_socket.stop()
+        # Join the gearman thread which was stopped earlier.
+        self.gearman_thread.join()
+        # The command thread is joined in the join() method of this
+        # class, which is called by the command shell.
         self.log.debug("Stopped")
 
     def join(self):
-        self.gearman_thread.join()
+        self.command_thread.join()
+
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get()
+                self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
 
     def runZMQ(self):
         while self._zmq_running or not self.zmq_send_queue.empty():
@@ -200,12 +386,16 @@
             try:
                 job = self.worker.getJob()
                 try:
-                    if job.name.startswith('node-assign:'):
-                        self.log.debug("Got node-assign job: %s" % job.unique)
+                    if job.name.startswith('node_assign:'):
+                        self.log.debug("Got node_assign job: %s" % job.unique)
                         self.assignNode(job)
                     elif job.name.startswith('stop:'):
                         self.log.debug("Got stop job: %s" % job.unique)
                         self.stopJob(job)
+                    elif job.name.startswith('set_description:'):
+                        self.log.debug("Got set_description job: %s" %
+                                       job.unique)
+                        job.sendWorkComplete()
                     else:
                         self.log.error("Unable to handle job %s" % job.name)
                         job.sendWorkFail()
@@ -220,6 +410,12 @@
     def assignNode(self, job):
         args = json.loads(job.arguments)
         self.log.debug("Assigned node with arguments: %s" % (args,))
+        self._launchWorker(args)
+        data = dict(manager=self.hostname)
+        job.sendWorkData(json.dumps(data))
+        job.sendWorkComplete()
+
+    def _launchWorker(self, args):
         worker = NodeWorker(self.config, self.jobs, self.builds,
                             self.sites, args['name'], args['host'],
                             args['description'], args['labels'],
@@ -227,12 +423,8 @@
                             self.termination_queue, self.keep_jobdir)
         self.node_workers[worker.name] = worker
 
-        worker.process = multiprocessing.Process(target=worker.run)
-        worker.process.start()
-
-        data = dict(manager=self.hostname)
-        job.sendWorkData(json.dumps(data))
-        job.sendWorkComplete()
+        worker.thread = threading.Thread(target=worker.run)
+        worker.thread.start()
 
     def stopJob(self, job):
         try:
@@ -269,7 +461,7 @@
                     continue
                 worker = self.node_workers[item]
                 self.log.debug("Joining %s" % (item,))
-                worker.process.join()
+                worker.thread.join()
                 self.log.debug("Joined %s" % (item,))
                 del self.node_workers[item]
             except Exception:
@@ -295,18 +487,27 @@
         if not isinstance(labels, list):
             labels = [labels]
         self.labels = labels
-        self.process = None
+        self.thread = None
         self.registered_functions = set()
+        # If the unpaused Event is set, that means we should run jobs.
+        # If it is clear, then we are paused and should not run jobs.
+        self.unpaused = threading.Event()
+        self.unpaused.set()
         self._running = True
-        self.queue = multiprocessing.JoinableQueue()
+        self.queue = Queue.Queue()
         self.manager_name = manager_name
         self.zmq_send_queue = zmq_send_queue
         self.termination_queue = termination_queue
         self.keep_jobdir = keep_jobdir
         self.running_job_lock = threading.Lock()
+        self._get_job_lock = threading.Lock()
+        self._got_job = False
         self._job_complete_event = threading.Event()
         self._running_job = False
+        self._aborted_job = False
         self._sent_complete_event = False
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
         self.workspace_root = config.get('launcher', 'workspace_root')
         if self.config.has_option('launcher', 'private_key_file'):
             self.private_key_file = config.get('launcher', 'private_key_file')
@@ -319,19 +520,18 @@
 
     def isAlive(self):
         # Meant to be called from the manager
-        if self.process and self.process.is_alive():
+        if self.thread and self.thread.is_alive():
             return True
         return False
 
     def run(self):
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
         self.log.debug("Node worker %s starting" % (self.name,))
         server = self.config.get('gearman', 'server')
         if self.config.has_option('gearman', 'port'):
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker(self.name)
+        self.worker = GearWorker(self.name)
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()
@@ -356,9 +556,31 @@
         # will be set by the queue thread.
         self.log.debug("Submitting stop request")
         self._running = False
+        self.unpaused.set()
         self.queue.put(dict(action='stop'))
         self.queue.join()
 
+    def pause(self):
+        self.unpaused.clear()
+        self.worker.stopWaitingForJobs()
+
+    def unpause(self):
+        self.unpaused.set()
+
+    def release(self):
+        # If this node is idle, stop it.
+        old_unpaused = self.unpaused.is_set()
+        if old_unpaused:
+            self.pause()
+        with self._get_job_lock:
+            if self._got_job:
+                self.log.debug("This worker is not idle")
+                if old_unpaused:
+                    self.unpause()
+                return
+        self.log.debug("Stopping due to release command")
+        self.queue.put(dict(action='stop'))
+
     def _runQueue(self):
         item = self.queue.get()
         try:
@@ -371,6 +593,15 @@
                 else:
                     self._job_complete_event.wait()
                 self.worker.shutdown()
+            if item['action'] == 'pause':
+                self.log.debug("Received pause request")
+                self.pause()
+            if item['action'] == 'unpause':
+                self.log.debug("Received unpause request")
+                self.unpause()
+            if item['action'] == 'release':
+                self.log.debug("Received release request")
+                self.release()
             elif item['action'] == 'reconfigure':
                 self.log.debug("Received reconfigure request")
                 self.register()
@@ -383,15 +614,21 @@
     def runGearman(self):
         while self._running:
             try:
-                self._runGearman()
+                self.unpaused.wait()
+                if self._running:
+                    self._runGearman()
             except Exception:
                 self.log.exception("Exception in gearman manager:")
+            with self._get_job_lock:
+                self._got_job = False
 
     def _runGearman(self):
-        try:
-            job = self.worker.getJob()
-        except gear.InterruptedError:
-            return
+        with self._get_job_lock:
+            try:
+                job = self.worker.getJob()
+                self._got_job = True
+            except gear.InterruptedError:
+                return
         self.log.debug("Node worker %s got job %s" % (self.name, job.name))
         try:
             if job.name not in self.registered_functions:
@@ -424,19 +661,19 @@
         new_functions = set()
         for job in self.jobs.values():
             new_functions |= self.generateFunctionNames(job)
-        for function in new_functions - self.registered_functions:
-            self.worker.registerFunction(function)
-        for function in self.registered_functions - new_functions:
-            self.worker.unRegisterFunction(function)
+        self.worker.sendMassDo(new_functions)
         self.registered_functions = new_functions
 
     def abortRunningJob(self):
+        self._aborted_job = True
+        return self.abortRunningProc(self.ansible_job_proc)
+
+    def abortRunningProc(self, proc):
         aborted = False
         self.log.debug("Abort: acquiring job lock")
         with self.running_job_lock:
             if self._running_job:
                 self.log.debug("Abort: a job is running")
-                proc = self.ansible_proc
                 if proc:
                     self.log.debug("Abort: sending kill signal to job "
                                    "process group")
@@ -458,15 +695,14 @@
 
         # Make sure we can parse what we need from the job first
         args = json.loads(job.arguments)
-        # This may be configurable later, or we may choose to honor
-        # OFFLINE_NODE_WHEN_COMPLETE
-        offline = True
+        offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
         job_name = job.name.split(':')[1]
 
         # Initialize the result so we have something regardless of
         # whether the job actually runs
         result = None
         self._sent_complete_event = False
+        self._aborted_job = False
 
         try:
             self.sendStartEvent(job_name, args)
@@ -534,7 +770,8 @@
                                'SUCCESS', {})
 
     def runJob(self, job, args):
-        self.ansible_proc = None
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
         result = None
         with self.running_job_lock:
             if not self._running:
@@ -558,13 +795,24 @@
             job.sendWorkStatus(0, 100)
 
             job_status = self.runAnsiblePlaybook(jobdir, timeout)
+            if job_status == 3:
+                # AnsibleHostUnreachable: We had a network issue connecting to
+                # our zuul-worker. Rather than continue, have zuul requeue the
+                # job.
+                return result
+
             post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
-            if job_status and post_status:
+            if not post_status:
+                status = 'POST_FAILURE'
+            elif job_status:
                 status = 'SUCCESS'
             else:
                 status = 'FAILURE'
 
-            result = json.dumps(dict(result=status))
+            if not self._aborted_job:
+                # A Null result will cause zuul to relaunch the job if
+                # it needs to.
+                result = json.dumps(dict(result=status))
 
         return result
 
@@ -638,22 +886,27 @@
                 raise Exception("Target path %s is not below site root" %
                                 (dest,))
 
-            local_args = [
-                'command', '/usr/bin/rsync', '--delay-updates', '-F',
+            rsync_cmd = [
+                '/usr/bin/rsync', '--delay-updates', '-F',
                 '--compress', '-rt', '--safe-links',
                 '--rsync-path="mkdir -p {dest} && rsync"',
                 '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
                 '-o StrictHostKeyChecking=no -q"',
                 '--out-format="<<CHANGED>>%i %n%L"',
-                '"{source}/"', '"{user}@{host}:{dest}"'
+                '{source}', '"{user}@{host}:{dest}"'
             ]
-            local_action = ' '.join(local_args).format(
-                source=scproot,
+            if scpfile.get('keep-hierarchy'):
+                source = '"%s/"' % scproot
+            else:
+                source = '`/usr/bin/find "%s" -type f`' % scproot
+            shellargs = ' '.join(rsync_cmd).format(
+                source=source,
                 dest=dest,
                 private_key_file=self.private_key_file,
                 host=site['host'],
                 user=site['user'])
-            task = dict(local_action=local_action)
+            task = dict(shell=shellargs,
+                        delegate_to='127.0.0.1')
             if not scpfile.get('copy-after-failure'):
                 task['when'] = 'success'
             tasks.append(task)
@@ -679,14 +932,16 @@
                                            parameters)
         syncargs = dict(src=src,
                         dest=ftpcontent,
-                        copy_links='yes')
+                        copy_links='yes',
+                        mode='pull')
         if rsync_opts:
             syncargs['rsync_opts'] = rsync_opts
         task = dict(synchronize=syncargs,
                     when='success')
         tasks.append(task)
         task = dict(shell='lftp -f %s' % ftpscript,
-                    when='success')
+                    when='success',
+                    delegate_to='127.0.0.1')
         ftpsource = ftpcontent
         if ftp.get('remove-prefix'):
             ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
@@ -721,7 +976,7 @@
         remote_path = os.path.join('/tmp', script_fn)
         copy = dict(src=script_path,
                     dest=remote_path,
-                    mode=0555)
+                    mode=0o555)
         task = dict(copy=copy)
         tasks.append(task)
 
@@ -729,11 +984,8 @@
                       cwd=parameters['WORKSPACE'],
                       parameters=parameters)
         task = dict(zuul_runner=runner)
-        if timeout:
-            task['when'] = '{{ timeout | int > 0 }}'
-            task['async'] = '{{ timeout }}'
-        else:
-            task['async'] = 2 * 60 * 60  # 2 hour default timeout
+        task['when'] = '{{ timeout | int > 0 }}'
+        task['async'] = '{{ timeout }}'
         task['poll'] = 5
         tasks.append(task)
 
@@ -759,13 +1011,19 @@
                 inventory.write('\n')
 
         timeout = None
+        timeout_var = None
         for wrapper in jjb_job.get('wrappers', []):
             if isinstance(wrapper, dict):
-                timeout = wrapper.get('build-timeout', {})
-                if isinstance(timeout, dict):
-                    timeout = timeout.get('timeout')
+                build_timeout = wrapper.get('build-timeout', {})
+                if isinstance(build_timeout, dict):
+                    timeout_var = build_timeout.get('timeout-var', None)
+                    timeout = build_timeout.get('timeout')
                     if timeout:
                         timeout = timeout * 60
+        if not timeout:
+            timeout = ANSIBLE_DEFAULT_TIMEOUT
+        if timeout_var:
+            parameters[timeout_var] = timeout
 
         with open(jobdir.playbook, 'w') as playbook:
             tasks = []
@@ -784,11 +1042,12 @@
                                   state='directory'))
             main_block.append(task)
 
-            # TODO: remove once zuul-worker DIB element has landed
-            main_block.append(dict(shell="[ -f /usr/bin/yum ] && "
-                                   "sudo /usr/bin/yum install "
-                                   "libselinux-python || "
-                                   "/bin/true"))
+            msg = [
+                "Launched by %s" % self.manager_name,
+                "Building remotely on %s in workspace %s" % (
+                    self.name, parameters['WORKSPACE'])]
+            task = dict(zuul_log=dict(msg=msg))
+            main_block.append(task)
 
             for builder in jjb_job.get('builders', []):
                 if 'shell' in builder:
@@ -800,6 +1059,7 @@
 
             task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
             error_block.append(task)
+            error_block.append(dict(fail=dict(msg='FAILURE')))
 
             play = dict(hosts='node', name='Job body',
                         tasks=tasks)
@@ -823,6 +1083,7 @@
             config.write('hostfile = %s\n' % jobdir.inventory)
             config.write('host_key_checking = False\n')
             config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
 
             callback_path = zuul.ansible.plugins.callback_plugins.__file__
             callback_path = os.path.abspath(callback_path)
@@ -836,39 +1097,61 @@
 
         return timeout
 
+    def _ansibleTimeout(self, proc, msg):
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+
     def runAnsiblePlaybook(self, jobdir, timeout):
-        self.ansible_proc = subprocess.Popen(
+        self.ansible_job_proc = subprocess.Popen(
             ['ansible-playbook', jobdir.playbook,
              '-e', 'timeout=%s' % timeout, '-v'],
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
         )
-        (out, err) = self.ansible_proc.communicate()
-        for line in out.split('\n'):
-            self.log.debug("Ansible stdout:\n%s" % line)
-        for line in err.split('\n'):
-            self.log.debug("Ansible stderr:\n%s" % line)
-        ret = self.ansible_proc.wait()
-        self.ansible_proc = None
+        ret = None
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (self.ansible_job_proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_job_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = self.ansible_job_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
+        self.ansible_job_proc = None
         return ret == 0
 
     def runAnsiblePostPlaybook(self, jobdir, success):
-        proc = subprocess.Popen(
+        self.ansible_post_proc = subprocess.Popen(
             ['ansible-playbook', jobdir.post_playbook,
              '-e', 'success=%s' % success, '-v'],
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
         )
-        (out, err) = proc.communicate()
-        for line in out.split('\n'):
-            self.log.debug("Ansible post stdout:\n%s" % line)
-        for line in err.split('\n'):
-            self.log.debug("Ansible post stderr:\n%s" % line)
-        return proc.wait() == 0
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_post_proc,
+                             "Ansible post timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_post_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible post output: %s" % (line,))
+            ret = self.ansible_post_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible post exit code: %s" % (ret,))
+        self.ansible_post_proc = None
+        return ret == 0
 
 
 class JJB(jenkins_jobs.builder.Builder):
@@ -888,7 +1171,7 @@
             name = component
             component_data = {}
 
-        new_component = self.parser.data[component_type].get(name)
+        new_component = self.parser.data.get(component_type, {}).get(name)
         if new_component:
             for new_sub_component in new_component[component_list_type]:
                 new_components.extend(
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index f3b867c..98307ee 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
 import json
 import logging
 import os
+import six
 import time
 import threading
 from uuid import uuid4
@@ -164,6 +165,11 @@
             port = config.get('gearman', 'port')
         else:
             port = 4730
+        if config.has_option('gearman', 'check_job_registration'):
+            self.job_registration = config.getboolean(
+                'gearman', 'check_job_registration')
+        else:
+            self.job_registration = True
 
         self.gearman = ZuulGearmanClient(self)
         self.gearman.addServer(server, port)
@@ -231,7 +237,7 @@
                 s_config = {}
                 s_config.update((k, v.format(item=item, job=job,
                                              change=item.change))
-                                if isinstance(v, basestring)
+                                if isinstance(v, six.string_types)
                                 else (k, v)
                                 for k, v in s.items())
 
@@ -351,7 +357,8 @@
         build.__gearman_job = gearman_job
         self.builds[uuid] = build
 
-        if not self.isJobRegistered(gearman_job.name):
+        if self.job_registration and not self.isJobRegistered(
+                gearman_job.name):
             self.log.error("Job %s is not registered with Gearman" %
                            gearman_job)
             self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
@@ -502,7 +509,7 @@
             # us where the job is running.
             return False
 
-        if not self.isJobRegistered(name):
+        if self.job_registration and not self.isJobRegistered(name):
             return False
 
         desc_uuid = str(uuid4().hex)
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
 import os
 import re
 
+import six
+
+
 OrderedDict = extras.try_imports(['collections.OrderedDict',
                                   'ordereddict.OrderedDict'])
 
@@ -59,17 +62,17 @@
             raise Exception("Expansion error. Check error messages above")
 
         self.log.info("Mapping projects to workspace...")
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             dest = os.path.normpath(os.path.join(workspace, dest[0]))
             ret[project] = dest
             self.log.info("  %s -> %s", project, dest)
 
         self.log.debug("Checking overlap in destination directories...")
         check = defaultdict(list)
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             check[dest].append(project)
 
-        dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+        dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
         if dupes:
             raise Exception("Some projects share the same destination: %s",
                             dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index f0235a6..3155df6 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,6 +19,8 @@
 import re
 import yaml
 
+import six
+
 from git import GitCommandError
 from zuul.lib.clonemapper import CloneMapper
 from zuul.merger.merger import Repo
@@ -62,7 +64,7 @@
         dests = mapper.expand(workspace=self.workspace)
 
         self.log.info("Preparing %s repositories", len(dests))
-        for project, dest in dests.iteritems():
+        for project, dest in six.iteritems(dests):
             self.prepareRepo(project, dest)
         self.log.info("Prepared all repositories")
 
diff --git a/zuul/lib/commandsocket.py b/zuul/lib/commandsocket.py
new file mode 100644
index 0000000..1b7fed9
--- /dev/null
+++ b/zuul/lib/commandsocket.py
@@ -0,0 +1,121 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2016 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import socket
+import threading
+import Queue
+
+
+class CommandSocket(object):
+    """Accept newline-terminated commands on a UNIX domain socket.
+
+    A listener thread accepts connections one at a time, reads a single
+    command from each and puts it on a queue that consumers drain with
+    get().
+    """
+
+    log = logging.getLogger("zuul.CommandSocket")
+
+    def __init__(self, path):
+        """Store the socket path; nothing is opened until start()."""
+        self.running = False
+        self.path = path
+        self.queue = Queue.Queue()
+
+    def start(self):
+        """Bind the socket, replacing any existing file, and listen."""
+        self.running = True
+        if os.path.exists(self.path):
+            os.unlink(self.path)
+        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.socket.bind(self.path)
+        self.socket.listen(1)
+        self.socket_thread = threading.Thread(target=self._socketListener)
+        self.socket_thread.daemon = True
+        self.socket_thread.start()
+
+    def stop(self):
+        """Stop the listener thread and wake consumers blocked in get()."""
+        # First, wake up our listener thread with a connection and
+        # tell it to stop running.
+        self.running = False
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            s.connect(self.path)
+            s.sendall('_stop\n')
+        finally:
+            # Always release the wake-up connection.
+            s.close()
+        # The command '_stop' will be ignored by our listener, so
+        # directly inject it into the queue so that consumers of this
+        # class which are waiting in .get() are awakened.  They can
+        # either handle '_stop' or just ignore the unknown command and
+        # then check to see if they should continue to run before
+        # re-entering their loop.
+        self.queue.put('_stop')
+        self.socket_thread.join()
+        # The listener thread has exited; release the listening socket.
+        self.socket.close()
+
+    def _readCommand(self, conn):
+        """Read one command from conn, stopping at a newline or EOF.
+
+        recv() returns an empty string once the peer has closed the
+        connection.  Without that check, a client which disconnects
+        before sending a newline would make the read loop spin forever
+        (or raise IndexError if it sent nothing at all).
+        """
+        buf = ''
+        while True:
+            data = conn.recv(1)
+            if not data:
+                break
+            buf += data
+            if data == '\n':
+                break
+        return buf.strip()
+
+    def _socketListener(self):
+        """Thread target: accept connections and queue their commands."""
+        while self.running:
+            try:
+                s, addr = self.socket.accept()
+                try:
+                    self.log.debug("Accepted socket connection %s" % (s,))
+                    buf = self._readCommand(s)
+                    self.log.debug("Received %s from socket" % (buf,))
+                finally:
+                    # Close the connection even if reading failed.
+                    s.close()
+                # Because we use '_stop' internally to wake up a
+                # waiting thread, don't allow it to actually be
+                # injected externally.  Empty input (a bare newline
+                # or an immediate disconnect) is not a command either.
+                if buf and buf != '_stop':
+                    self.queue.put(buf)
+            except Exception:
+                self.log.exception("Exception in socket handler")
+
+    def get(self):
+        """Block until a command is available and return it.
+
+        Raises Exception if called while the socket is not running.
+        """
+        if not self.running:
+            raise Exception("CommandSocket.get called while stopped")
+        return self.queue.get()
diff --git a/zuul/lib/gearserver.py b/zuul/lib/gearserver.py
new file mode 100644
index 0000000..9cddca3
--- /dev/null
+++ b/zuul/lib/gearserver.py
@@ -0,0 +1,47 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+
+# Packet type of the MASS_DO request handled by GearServer.
+MASS_DO = 101
+
+
+class GearServer(gear.Server):
+    """gear.Server that also understands MASS_DO packets."""
+
+    def handlePacket(self, packet):
+        """Handle MASS_DO here; defer every other packet type to gear."""
+        if packet.ptype == MASS_DO:
+            self.log.info("Received packet from %s: %s" % (packet.connection,
+                                                           packet))
+            self.handleMassDo(packet)
+        else:
+            return super(GearServer, self).handlePacket(packet)
+
+    def handleMassDo(self, packet):
+        """Replace the connection's function set with the packet's names.
+
+        The payload is split on NUL bytes into function names.  Empty
+        names are skipped: splitting an empty payload yields a single
+        empty string, which must not be registered as a function.
+        """
+        packet.connection.functions = set()
+        for name in packet.data.split(b'\x00'):
+            if not name:
+                continue
+            self.log.debug("Adding function %s to %s" % (
+                name, packet.connection))
+            packet.connection.functions.add(name)
+            self.functions.add(name)
diff --git a/zuul/lib/swift.py b/zuul/lib/swift.py
index 3c411d3..b5d3bc7 100644
--- a/zuul/lib/swift.py
+++ b/zuul/lib/swift.py
@@ -19,8 +19,8 @@
 import os
 import random
 import six
+from six.moves import urllib
 import string
-import urlparse
 
 
 class Swift(object):
@@ -156,7 +156,7 @@
         url = os.path.join(self.storage_url, settings['container'],
                            settings['file_path_prefix'],
                            destination_prefix)
-        u = urlparse.urlparse(url)
+        u = urllib.parse.urlparse(url)
 
         hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect,
                                             settings['max_file_size'],
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c6ae35d..3bc29e6 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -210,7 +210,7 @@
         fd.write('#!/bin/bash\n')
         fd.write('ssh -i %s $@\n' % key)
         fd.close()
-        os.chmod(name, 0755)
+        os.chmod(name, 0o755)
 
     def addProject(self, project, url):
         repo = None
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 30cd732..d56993c 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,7 +19,7 @@
 
 import gear
 
-import merger
+from zuul.merger import merger
 
 
 class MergeServer(object):
diff --git a/zuul/model.py b/zuul/model.py
index 3fb0577..542d0b6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -110,7 +110,11 @@
         return job_tree
 
     def getProjects(self):
-        return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+        # Python 3 dropped cmp() and the comparison-function argument of
+        # sorted(); order by a key function instead, as described at
+        # http://python-future.org/compatible_idioms.html#cmp
+        projects = self.job_trees.keys()
+        return sorted(projects, key=lambda project: project.name)
 
     def addQueue(self, queue):
         self.queues.append(queue)
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 5329c41..716dcfb 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -21,7 +21,7 @@
 import gear
 import six
 
-import model
+from zuul import model
 
 
 class RPCListener(object):
@@ -40,11 +40,11 @@
             port = 4730
         self.worker = gear.Worker('Zuul RPC Listener')
         self.worker.addServer(server, port)
+        self.worker.waitForServer()
+        self.register()
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
         self.thread.start()
-        self.worker.waitForServer()
-        self.register()
 
     def register(self):
         self.worker.registerFunction("zuul:enqueue")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 30a6c81..f08612d 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -20,6 +20,7 @@
 import logging
 import os
 import pickle
+import six
 from six.moves import queue as Queue
 import re
 import sys
@@ -27,10 +28,10 @@
 import time
 import yaml
 
-import layoutvalidator
-import model
-from model import Pipeline, Project, ChangeQueue
-from model import ChangeishFilter, NullChange
+from zuul import layoutvalidator
+from zuul import model
+from zuul.model import Pipeline, Project, ChangeQueue
+from zuul.model import ChangeishFilter, NullChange
 from zuul import change_matcher, exceptions
 from zuul import version as zuul_version
 
@@ -125,12 +126,10 @@
     """An event that should be processed within the main queue run loop"""
     def __init__(self):
         self._wait_event = threading.Event()
-        self._exception = None
-        self._traceback = None
+        self._exc_info = None
 
-    def exception(self, e, tb):
-        self._exception = e
-        self._traceback = tb
+    def exception(self, exc_info):
+        self._exc_info = exc_info
         self._wait_event.set()
 
     def done(self):
@@ -138,8 +137,8 @@
 
     def wait(self, timeout=None):
         self._wait_event.wait(timeout)
-        if self._exception:
-            raise self._exception, None, self._traceback
+        if self._exc_info:
+            six.reraise(*self._exc_info)
         return self._wait_event.is_set()
 
 
@@ -412,7 +411,9 @@
                     base = os.path.dirname(os.path.realpath(config_path))
                     fn = os.path.join(base, fn)
                 fn = os.path.expanduser(fn)
-                execfile(fn, config_env)
+                with open(fn) as _f:
+                    code = compile(_f.read(), fn, 'exec')
+                    six.exec_(code, config_env)
 
         for conf_pipeline in data.get('pipelines', []):
             pipeline = Pipeline(conf_pipeline['name'])
@@ -1053,8 +1054,8 @@
             else:
                 self.log.error("Unable to handle event %s" % event)
             event.done()
-        except Exception as e:
-            event.exception(e, sys.exc_info()[2])
+        except Exception:
+            event.exception(sys.exc_info())
         self.management_event_queue.task_done()
 
     def process_result_queue(self):