Merge "Fix referenced before assignment for BuildCompletedEvent"
diff --git a/.testr.conf b/.testr.conf
index 5433c07..222ce97 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
 [DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
 test_id_option=--load-list $IDFILE
 test_list_option=--list
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 98e4bb8..be9570c 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -49,6 +49,11 @@
   Port on which the Gearman server is listening.
   ``port=4730``
 
+**check_job_registration**
+  Check whether the job is registered with Gearman. When True, a build
+  result of NOT_REGISTERED will be returned if the job is not found.
+  ``check_job_registration=True``
+
 gearman_server
 """"""""""""""
 
@@ -394,11 +399,12 @@
   approval matching all specified requirements.
 
     *username*
-    If present, an approval from this username is required.
+    If present, an approval from this username is required.  It is
+    treated as a regular expression.
 
     *email*
     If present, an approval with this email address is required.  It
-    is treated as a regular expression as above.
+    is treated as a regular expression.
 
     *email-filter* (deprecated)
     A deprecated alternate spelling of *email*.  Only one of *email* or
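
For reference, the new ``check_job_registration`` option documented above lives in
the ``[gearman]`` section of zuul.conf, next to ``server`` and ``port``; a minimal
sketch with illustrative values:

    [gearman]
    server=gearman.example.org
    port=4730
    check_job_registration=True
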
diff --git a/tests/base.py b/tests/base.py
index 3c28a72..01097c1 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -27,6 +27,7 @@
 import re
 import select
 import shutil
+from six.moves import reload_module
 import socket
 import string
 import subprocess
@@ -861,6 +862,28 @@
                 format='%(asctime)s %(name)-32s '
                 '%(levelname)-8s %(message)s'))
 
+            # NOTE(notmorgan): Extract logging overrides for specific
+            # libraries from the OS_LOG_DEFAULTS environment variable and
+            # create a FakeLogger fixture for each. This limits the output
+            # during test runs from libraries that zuul depends on, such
+            # as gear.
+            log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+            if log_defaults_from_env:
+                for default in log_defaults_from_env.split(','):
+                    try:
+                        name, level_str = default.split('=', 1)
+                        level = getattr(logging, level_str, logging.DEBUG)
+                        self.useFixture(fixtures.FakeLogger(
+                            name=name,
+                            level=level,
+                            format='%(asctime)s %(name)-32s '
+                                   '%(levelname)-8s %(message)s'))
+                    except ValueError:
+                        # NOTE(notmorgan): The log default is malformed;
+                        # skip it rather than trying to apply a logger
+                        # for the specified module.
+                        pass
+
 
 class ZuulTestCase(BaseTestCase):
 
@@ -916,8 +939,8 @@
         os.environ['STATSD_PORT'] = str(self.statsd.port)
         self.statsd.start()
         # the statsd client object is configured in the statsd module import
-        reload(statsd)
-        reload(zuul.scheduler)
+        reload_module(statsd)
+        reload_module(zuul.scheduler)
 
         self.gearman_server = FakeGearmanServer()
 
@@ -1295,8 +1318,11 @@
         start = time.time()
         while True:
             if time.time() - start > 10:
-                print('queue status:', ''.join(self.eventQueuesEmpty()))
-                print(self.areAllBuildsWaiting())
+                self.log.debug("Queue status:")
+                for queue in self.event_queues:
+                    self.log.debug("  %s: %s" % (queue, queue.empty()))
+                self.log.debug("All builds waiting: %s" %
+                               (self.areAllBuildsWaiting(),))
                 raise Exception("Timeout waiting for Zuul to settle")
             # Make sure no new events show up while we're checking
             self.worker.lock.acquire()
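
The OS_LOG_DEFAULTS handling added above takes a comma-separated list of
``module=LEVEL`` pairs, falls back to DEBUG for unknown level names, and silently
skips malformed entries. A standalone sketch of the same parsing, outside the
fixture machinery:

    import logging

    def parse_log_defaults(value):
        # 'gear.Server=INFO,gear.Client=INFO' ->
        # [('gear.Server', 20), ('gear.Client', 20)]
        overrides = []
        for default in value.split(','):
            try:
                name, level_str = default.split('=', 1)
            except ValueError:
                continue  # malformed entry: skipped, as in the fixture loop
            overrides.append((name, getattr(logging, level_str, logging.DEBUG)))
        return overrides
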
diff --git a/tests/fixtures/layout-requirement-username.yaml b/tests/fixtures/layout-requirement-username.yaml
index 7a549f0..f9e6477 100644
--- a/tests/fixtures/layout-requirement-username.yaml
+++ b/tests/fixtures/layout-requirement-username.yaml
@@ -3,7 +3,7 @@
     manager: IndependentPipelineManager
     require:
       approval:
-        - username: jenkins
+        - username: ^(jenkins|zuul)$
     trigger:
       gerrit:
         - event: comment-added
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3de4a94..46a8c7c 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -14,7 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import ConfigParser
+from six.moves import configparser as ConfigParser
 import os
 import re
 
diff --git a/tox.ini b/tox.ini
index 79ea939..a8767c2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,7 @@
          STATSD_PORT=8125
          VIRTUAL_ENV={envdir}
          OS_TEST_TIMEOUT=30
+         OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
 passenv = ZUUL_TEST_ROOT
 usedevelop = True
 install_command = pip install {opts} {packages}
@@ -18,6 +19,8 @@
   python setup.py testr --slowest --testr-args='{posargs}'
 
 [testenv:pep8]
+# The streamer is Python 3 only, so we need to run flake8 under Python 3
+basepython = python3
 commands = flake8 {posargs}
 
 [testenv:cover]
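
Because the setenv entry above uses tox's ``{env:OS_LOG_DEFAULTS:...}`` fallback
syntax, the gear defaults can be overridden from the calling shell, e.g.
``OS_LOG_DEFAULTS="gear.Server=WARNING,gear.Client=WARNING" tox``.
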
diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py
index 8978275..2072bc9 100644
--- a/zuul/ansible/library/zuul_log.py
+++ b/zuul/ansible/library/zuul_log.py
@@ -34,14 +34,17 @@
 
 
 def log(msg):
+    if not isinstance(msg, list):
+        msg = [msg]
     with Console() as console:
-        console.addLine("[Zuul] %s\n" % msg)
+        for line in msg:
+            console.addLine("[Zuul] %s\n" % line)
 
 
 def main():
     module = AnsibleModule(
         argument_spec=dict(
-            msg=dict(required=True),
+            msg=dict(required=True, type='raw'),
         )
     )
 
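With ``msg`` now declared ``type='raw'``, a playbook task may pass either a single
string or a list; each list entry becomes its own ``[Zuul]``-prefixed console line.
This is the shape of the task the launcher generates later in this change (values
illustrative):

    # One console line per list element.
    task = dict(zuul_log=dict(msg=[
        'Launched by zuul.example.org',
        'Building remotely on node-1 in workspace /home/jenkins/workspace/job',
    ]))
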
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index f490162..5a38807 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -46,7 +46,16 @@
         if os.path.exists(fn):
             with open(fn) as f:
                 for line in f:
+                    if not line:
+                        continue
+                    if line[0] == '#':
+                        continue
+                    if '=' not in line:
+                        continue
-                    k, v = line.strip().split('=')
+                    k, v = line.strip().split('=', 1)
+                    for q in ["'", '"']:
+                        if v and v[0] == q:
+                            v = v.strip(q)
                     env[k] = v
     return env
 
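With these guards, ``get_env()`` tolerates blank lines, comments, lines without an
``=``, values that themselves contain ``=`` (split once, from the left), and a
single layer of quoting. For instance, an environment file like this (illustrative):

    # example environment defaults file
    FOO="bar"

    OPTS=-Dkey=value

parses to ``env == {'FOO': 'bar', 'OPTS': '-Dkey=value'}``.
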
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 2902c50..5ffd431 100644
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -14,8 +14,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import six
 from six.moves import configparser as ConfigParser
-import cStringIO
 import extras
 import logging
 import logging.config
@@ -47,7 +47,7 @@
             yappi.start()
         else:
             yappi.stop()
-            yappi_out = cStringIO.StringIO()
+            yappi_out = six.BytesIO()
             yappi.get_func_stats().print_all(out=yappi_out)
             yappi.get_thread_stats().print_all(out=yappi_out)
             log.debug(yappi_out.getvalue())
diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py
index f400436..1fb4a32 100755
--- a/zuul/cmd/server.py
+++ b/zuul/cmd/server.py
@@ -117,18 +117,18 @@
         if child_pid == 0:
             os.close(pipe_write)
             self.setup_logging('gearman_server', 'log_config')
-            import gear
+            import zuul.lib.gearserver
             statsd_host = os.environ.get('STATSD_HOST')
             statsd_port = int(os.environ.get('STATSD_PORT', 8125))
             if self.config.has_option('gearman_server', 'listen_address'):
                 host = self.config.get('gearman_server', 'listen_address')
             else:
                 host = None
-            gear.Server(4730,
-                        host=host,
-                        statsd_host=statsd_host,
-                        statsd_port=statsd_port,
-                        statsd_prefix='zuul.geard')
+            zuul.lib.gearserver.GearServer(4730,
+                                           host=host,
+                                           statsd_host=statsd_host,
+                                           statsd_port=statsd_port,
+                                           statsd_prefix='zuul.geard')
 
             # Keep running until the parent dies:
             pipe_read = os.fdopen(pipe_read)
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index ae1e319..62891cd 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -32,7 +32,7 @@
     """Move events from Gerrit to the scheduler."""
 
     log = logging.getLogger("zuul.GerritEventConnector")
-    delay = 5.0
+    delay = 10.0
 
     def __init__(self, connection):
         super(GerritEventConnector, self).__init__()
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 0f827c4..693d5a4 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -42,7 +42,7 @@
 ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
 
 
-COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release']
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release', 'graceful']
 
 
 def boolify(x):
@@ -51,6 +51,19 @@
     return bool(x)
 
 
+class GearWorker(gear.Worker):
+    MASS_DO = 101
+
+    def sendMassDo(self, functions):
+        data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
+        self.broadcast_lock.acquire()
+        try:
+            p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
+            self.broadcast(p)
+        finally:
+            self.broadcast_lock.release()
+
+
 class Watchdog(object):
     def __init__(self, timeout, function, args):
         self.timeout = timeout
@@ -80,14 +93,16 @@
         self.root = tempfile.mkdtemp()
         self.ansible_root = os.path.join(self.root, 'ansible')
         os.makedirs(self.ansible_root)
-        self.plugins_root = os.path.join(self.ansible_root, 'plugins')
-        os.makedirs(self.plugins_root)
         self.inventory = os.path.join(self.ansible_root, 'inventory')
         self.playbook = os.path.join(self.ansible_root, 'playbook')
         self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
         self.config = os.path.join(self.ansible_root, 'ansible.cfg')
         self.script_root = os.path.join(self.ansible_root, 'scripts')
         os.makedirs(self.script_root)
+        self.logs = os.path.join(self.ansible_root, 'logs')
+        os.makedirs(self.logs)
+        self.staging_root = os.path.join(self.root, 'staging')
+        os.makedirs(self.staging_root)
 
     def __enter__(self):
         return self
@@ -114,8 +129,18 @@
         self.termination_queue = Queue.Queue()
         self.sites = {}
         self.static_nodes = {}
-        if config.has_option('launcher', 'accept-nodes'):
-            self.accept_nodes = config.get('launcher', 'accept-nodes')
+        self.command_map = dict(
+            reconfigure=self.reconfigure,
+            stop=self.stop,
+            pause=self.pause,
+            unpause=self.unpause,
+            release=self.release,
+            graceful=self.graceful,
+        )
+
+        if config.has_option('launcher', 'accept_nodes'):
+            self.accept_nodes = config.getboolean('launcher',
+                                                  'accept_nodes')
         else:
             self.accept_nodes = True
 
@@ -235,8 +260,9 @@
     def register(self):
         new_functions = set()
         if self.accept_nodes:
-            new_functions.add("node-assign:zuul")
+            new_functions.add("node_assign:zuul")
         new_functions.add("stop:%s" % self.hostname)
+        new_functions.add("set_description:%s" % self.hostname)
 
         for function in new_functions - self.registered_functions:
             self.worker.registerFunction(function)
@@ -295,6 +321,18 @@
                                    "to worker:")
         self.log.debug("Finished releasing idle nodes")
 
+    def graceful(self):
+        # Note: this is run in the command processing thread; no more
+        # external commands will be processed after this.
+        self.log.debug("Gracefully stopping")
+        self.pause()
+        self.release()
+        self.log.debug("Waiting for all builds to finish")
+        while self.builds:
+            time.sleep(5)
+        self.log.debug("All builds are finished")
+        self.stop()
+
     def stop(self):
         self.log.debug("Stopping")
         # First, stop accepting new jobs
@@ -328,16 +366,7 @@
         while self._command_running:
             try:
                 command = self.command_socket.get()
-                if command == 'reconfigure':
-                    self.reconfigure()
-                elif command == 'stop':
-                    self.stop()
-                elif command == 'pause':
-                    self.pause()
-                elif command == 'unpause':
-                    self.unpause()
-                elif command == 'release':
-                    self.release()
+                self.command_map[command]()
             except Exception:
                 self.log.exception("Exception while processing command")
 
@@ -359,12 +388,16 @@
             try:
                 job = self.worker.getJob()
                 try:
-                    if job.name.startswith('node-assign:'):
-                        self.log.debug("Got node-assign job: %s" % job.unique)
+                    if job.name.startswith('node_assign:'):
+                        self.log.debug("Got node_assign job: %s" % job.unique)
                         self.assignNode(job)
                     elif job.name.startswith('stop:'):
                         self.log.debug("Got stop job: %s" % job.unique)
                         self.stopJob(job)
+                    elif job.name.startswith('set_description:'):
+                        self.log.debug("Got set_description job: %s" %
+                                       job.unique)
+                        job.sendWorkComplete()
                     else:
                         self.log.error("Unable to handle job %s" % job.name)
                         job.sendWorkFail()
@@ -469,11 +502,16 @@
         self.termination_queue = termination_queue
         self.keep_jobdir = keep_jobdir
         self.running_job_lock = threading.Lock()
+        self.pending_registration = False
+        self.registration_lock = threading.Lock()
         self._get_job_lock = threading.Lock()
         self._got_job = False
         self._job_complete_event = threading.Event()
         self._running_job = False
+        self._aborted_job = False
         self._sent_complete_event = False
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
         self.workspace_root = config.get('launcher', 'workspace_root')
         if self.config.has_option('launcher', 'private_key_file'):
             self.private_key_file = config.get('launcher', 'private_key_file')
@@ -497,7 +535,7 @@
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker(self.name)
+        self.worker = GearWorker(self.name)
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()
@@ -589,6 +627,8 @@
                 self._got_job = False
 
     def _runGearman(self):
+        if self.pending_registration:
+            self.register()
         with self._get_job_lock:
             try:
                 job = self.worker.getJob()
@@ -622,18 +662,26 @@
         return ret
 
     def register(self):
-        if self._running_job:
+        if not self.registration_lock.acquire(False):
+            self.log.debug("Registration already in progress")
             return
-        new_functions = set()
-        for job in self.jobs.values():
-            new_functions |= self.generateFunctionNames(job)
-        for function in new_functions - self.registered_functions:
-            self.worker.registerFunction(function)
-        for function in self.registered_functions - new_functions:
-            self.worker.unRegisterFunction(function)
-        self.registered_functions = new_functions
+        try:
+            if self._running_job:
+                self.pending_registration = True
+                self.log.debug("Ignoring registration due to running job")
+                return
+            self.log.debug("Updating registration")
+            self.pending_registration = False
+            new_functions = set()
+            for job in self.jobs.values():
+                new_functions |= self.generateFunctionNames(job)
+            self.worker.sendMassDo(new_functions)
+            self.registered_functions = new_functions
+        finally:
+            self.registration_lock.release()
 
     def abortRunningJob(self):
+        self._aborted_job = True
         return self.abortRunningProc(self.ansible_job_proc)
 
     def abortRunningProc(self, proc):
@@ -670,6 +718,7 @@
         # whether the job actually runs
         result = None
         self._sent_complete_event = False
+        self._aborted_job = False
 
         try:
             self.sendStartEvent(job_name, args)
@@ -762,13 +811,23 @@
             job.sendWorkStatus(0, 100)
 
             job_status = self.runAnsiblePlaybook(jobdir, timeout)
+            if job_status is None:
+                # The result of the job is indeterminate.  Zuul will
+                # run it again.
+                return result
+
             post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
-            if job_status and post_status:
+            if not post_status:
+                status = 'POST_FAILURE'
+            elif job_status:
                 status = 'SUCCESS'
             else:
                 status = 'FAILURE'
 
-            result = json.dumps(dict(result=status))
+            if not self._aborted_job:
+                # A None result will cause Zuul to relaunch the job if
+                # it needs to.
+                result = json.dumps(dict(result=status))
 
         return result
 
@@ -807,11 +866,20 @@
     def _makeSCPTask(self, jobdir, publisher, parameters):
         tasks = []
         for scpfile in publisher['scp']['files']:
+            scproot = tempfile.mkdtemp(dir=jobdir.staging_root)
+            os.chmod(scproot, 0o755)
+
             site = publisher['scp']['site']
-            if site not in self.sites:
-                raise Exception("Undefined SCP site: %s" % (site,))
-            site = self.sites[site]
             if scpfile.get('copy-console'):
+                # Include the local ansible directory in the console
+                # upload.  This uploads the playbook and ansible logs.
+                copyargs = dict(src=jobdir.ansible_root + '/',
+                                dest=os.path.join(scproot, '_zuul_ansible'))
+                task = dict(copy=copyargs,
+                            delegate_to='127.0.0.1')
+                tasks.append(task)
+
+                # Fetch the console log from the remote host.
                 src = '/tmp/console.html'
                 rsync_opts = []
             else:
@@ -821,8 +889,6 @@
                 rsync_opts = self._getRsyncOptions(scpfile['source'],
                                                    parameters)
 
-            scproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
-            os.chmod(scproot, 0o755)
             syncargs = dict(src=src,
                             dest=scproot,
                             copy_links='yes',
@@ -834,39 +900,49 @@
                 task['when'] = 'success'
             tasks.append(task)
 
-            dest = scpfile['target']
-            dest = self._substituteVariables(dest, parameters)
-            dest = os.path.join(site['root'], dest)
-            dest = os.path.normpath(dest)
-            if not dest.startswith(site['root']):
-                raise Exception("Target path %s is not below site root" %
-                                (dest,))
-
-            local_args = [
-                'shell', '/usr/bin/rsync', '--delay-updates', '-F',
-                '--compress', '-rt', '--safe-links',
-                '--rsync-path="mkdir -p {dest} && rsync"',
-                '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
-                '-o StrictHostKeyChecking=no -q"',
-                '--out-format="<<CHANGED>>%i %n%L"',
-                '{source}', '"{user}@{host}:{dest}"'
-            ]
-            if scpfile.get('keep-hierarchy'):
-                source = '"%s/"' % scproot
-            else:
-                source = '`/usr/bin/find "%s" -type f`' % scproot
-            local_action = ' '.join(local_args).format(
-                source=source,
-                dest=dest,
-                private_key_file=self.private_key_file,
-                host=site['host'],
-                user=site['user'])
-            task = dict(local_action=local_action)
-            if not scpfile.get('copy-after-failure'):
-                task['when'] = 'success'
+            task = self._makeSCPTaskLocalAction(
+                site, scpfile, scproot, parameters)
             tasks.append(task)
         return tasks
 
+    def _makeSCPTaskLocalAction(self, site, scpfile, scproot, parameters):
+        if site not in self.sites:
+            raise Exception("Undefined SCP site: %s" % (site,))
+        site = self.sites[site]
+        dest = scpfile['target'].lstrip('/')
+        dest = self._substituteVariables(dest, parameters)
+        dest = os.path.join(site['root'], dest)
+        dest = os.path.normpath(dest)
+        if not dest.startswith(site['root']):
+            raise Exception("Target path %s is not below site root" %
+                            (dest,))
+
+        rsync_cmd = [
+            '/usr/bin/rsync', '--delay-updates', '-F',
+            '--compress', '-rt', '--safe-links',
+            '--rsync-path="mkdir -p {dest} && rsync"',
+            '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
+            '-o StrictHostKeyChecking=no -q"',
+            '--out-format="<<CHANGED>>%i %n%L"',
+            '{source}', '"{user}@{host}:{dest}"'
+        ]
+        if scpfile.get('keep-hierarchy'):
+            source = '"%s/"' % scproot
+        else:
+            source = '`/usr/bin/find "%s" -type f`' % scproot
+        shellargs = ' '.join(rsync_cmd).format(
+            source=source,
+            dest=dest,
+            private_key_file=self.private_key_file,
+            host=site['host'],
+            user=site['user'])
+        task = dict(shell=shellargs,
+                    delegate_to='127.0.0.1')
+        if not scpfile.get('copy-after-failure'):
+            task['when'] = 'success'
+
+        return task
+
     def _makeFTPTask(self, jobdir, publisher, parameters):
         tasks = []
         ftp = publisher['ftp']
@@ -875,7 +951,7 @@
             raise Exception("Undefined FTP site: %s" % site)
         site = self.sites[site]
 
-        ftproot = tempfile.mkdtemp(dir=jobdir.ansible_root)
+        ftproot = tempfile.mkdtemp(dir=jobdir.staging_root)
         ftpcontent = os.path.join(ftproot, 'content')
         os.makedirs(ftpcontent)
         ftpscript = os.path.join(ftproot, 'script')
@@ -887,20 +963,22 @@
                                            parameters)
         syncargs = dict(src=src,
                         dest=ftpcontent,
-                        copy_links='yes')
+                        copy_links='yes',
+                        mode='pull')
         if rsync_opts:
             syncargs['rsync_opts'] = rsync_opts
         task = dict(synchronize=syncargs,
                     when='success')
         tasks.append(task)
         task = dict(shell='lftp -f %s' % ftpscript,
-                    when='success')
+                    when='success',
+                    delegate_to='127.0.0.1')
         ftpsource = ftpcontent
         if ftp.get('remove-prefix'):
             ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
         while ftpsource[-1] == '/':
             ftpsource = ftpsource[:-1]
-        ftptarget = ftp['target']
+        ftptarget = ftp['target'].lstrip('/')
         ftptarget = self._substituteVariables(ftptarget, parameters)
-        ftptarget = os.path.join(site['root'], ftp['target'])
+        ftptarget = os.path.join(site['root'], ftptarget)
         ftptarget = os.path.normpath(ftptarget)
@@ -929,7 +1007,7 @@
         remote_path = os.path.join('/tmp', script_fn)
         copy = dict(src=script_path,
                     dest=remote_path,
-                    mode=0555)
+                    mode=0o555)
         task = dict(copy=copy)
         tasks.append(task)
 
@@ -949,6 +1027,34 @@
 
         return tasks
 
+    def _transformPublishers(self, jjb_job):
+        early_publishers = []
+        late_publishers = []
+        old_publishers = jjb_job.get('publishers', [])
+        for publisher in old_publishers:
+            early_scpfiles = []
+            late_scpfiles = []
+            if 'scp' not in publisher:
+                early_publishers.append(publisher)
+                continue
+            copy_console = False
+            for scpfile in publisher['scp']['files']:
+                if scpfile.get('copy-console'):
+                    scpfile['keep-hierarchy'] = True
+                    late_scpfiles.append(scpfile)
+                    copy_console = True
+                else:
+                    early_scpfiles.append(scpfile)
+            publisher['scp']['files'] = early_scpfiles + late_scpfiles
+            if copy_console:
+                late_publishers.append(publisher)
+            else:
+                early_publishers.append(publisher)
+        publishers = early_publishers + late_publishers
+        if old_publishers != publishers:
+            self.log.debug("Transformed job publishers")
+        return early_publishers, late_publishers
+
     def prepareAnsibleFiles(self, jobdir, gearman_job, args):
         job_name = gearman_job.name.split(':')[1]
         jjb_job = self.jobs[job_name]
@@ -964,15 +1070,19 @@
                 inventory.write('\n')
 
         timeout = None
+        timeout_var = None
         for wrapper in jjb_job.get('wrappers', []):
             if isinstance(wrapper, dict):
-                timeout = wrapper.get('build-timeout', {})
-                if isinstance(timeout, dict):
-                    timeout = timeout.get('timeout')
+                build_timeout = wrapper.get('build-timeout', {})
+                if isinstance(build_timeout, dict):
+                    timeout_var = build_timeout.get('timeout-var', None)
+                    timeout = build_timeout.get('timeout')
                     if timeout:
                         timeout = timeout * 60
         if not timeout:
             timeout = ANSIBLE_DEFAULT_TIMEOUT
+        if timeout_var:
+            parameters[timeout_var] = timeout
 
         with open(jobdir.playbook, 'w') as playbook:
             tasks = []
@@ -991,6 +1101,13 @@
                                   state='directory'))
             main_block.append(task)
 
+            msg = [
+                "Launched by %s" % self.manager_name,
+                "Building remotely on %s in workspace %s" % (
+                    self.name, parameters['WORKSPACE'])]
+            task = dict(zuul_log=dict(msg=msg))
+            main_block.append(task)
+
             for builder in jjb_job.get('builders', []):
                 if 'shell' in builder:
                     main_block.extend(
@@ -1005,26 +1122,44 @@
 
             play = dict(hosts='node', name='Job body',
                         tasks=tasks)
-            playbook.write(yaml.dump([play], default_flow_style=False))
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
+
+        early_publishers, late_publishers = self._transformPublishers(jjb_job)
 
         with open(jobdir.post_playbook, 'w') as playbook:
+            blocks = []
+            for publishers in [early_publishers, late_publishers]:
+                block = []
+                for publisher in publishers:
+                    if 'scp' in publisher:
+                        block.extend(self._makeSCPTask(jobdir, publisher,
+                                                       parameters))
+                    if 'ftp' in publisher:
+                        block.extend(self._makeFTPTask(jobdir, publisher,
+                                                       parameters))
+                blocks.append(block)
+
+            # The 'always' section contains the log publishing tasks,
+            # the 'block' contains all the other publishers.  This way
+            # we run the log publisher regardless of whether the rest
+            # of the publishers succeed.
             tasks = []
-            for publisher in jjb_job.get('publishers', []):
-                if 'scp' in publisher:
-                    tasks.extend(self._makeSCPTask(jobdir, publisher,
-                                                   parameters))
-                if 'ftp' in publisher:
-                    tasks.extend(self._makeFTPTask(jobdir, publisher,
-                                                   parameters))
+            tasks.append(dict(block=blocks[0],
+                              always=blocks[1]))
+
             play = dict(hosts='node', name='Publishers',
                         tasks=tasks)
-            playbook.write(yaml.dump([play], default_flow_style=False))
+            playbook.write(yaml.safe_dump([play], default_flow_style=False))
 
         with open(jobdir.config, 'w') as config:
             config.write('[defaults]\n')
             config.write('hostfile = %s\n' % jobdir.inventory)
             config.write('host_key_checking = False\n')
             config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
+            config.write('log_path = %s\n' % os.path.join(
+                jobdir.logs, 'ansible.log'))
+            config.write('gathering = explicit\n')
 
             callback_path = zuul.ansible.plugins.callback_plugins.__file__
             callback_path = os.path.abspath(callback_path)
@@ -1065,7 +1200,11 @@
         finally:
             watchdog.stop()
         self.log.debug("Ansible exit code: %s" % (ret,))
-        self.ansible_proc = None
+        self.ansible_job_proc = None
+        if ret == 3:
+            # AnsibleHostUnreachable: We had a network issue connecting to
+            # our zuul-worker.
+            return None
         return ret == 0
 
     def runAnsiblePostPlaybook(self, jobdir, success):
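
The post playbook is now a single block/always pair: the ``block`` holds the early
publishers and the ``always`` section holds the late (copy-console) publishers, so
the console and ansible-log upload runs even if an earlier publisher fails.
Schematically, the YAML emitted by ``yaml.safe_dump`` looks like this (task bodies
abbreviated):

    - hosts: node
      name: Publishers
      tasks:
      - block:    # early publishers: content scp/ftp uploads
          - synchronize: ...
            when: success
        always:   # late publishers: console/ansible-log upload
          - shell: /usr/bin/rsync ...
            delegate_to: 127.0.0.1
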
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index f3b867c..98307ee 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
 import json
 import logging
 import os
+import six
 import time
 import threading
 from uuid import uuid4
@@ -164,6 +165,11 @@
             port = config.get('gearman', 'port')
         else:
             port = 4730
+        if config.has_option('gearman', 'check_job_registration'):
+            self.job_registration = config.getboolean(
+                'gearman', 'check_job_registration')
+        else:
+            self.job_registration = True
 
         self.gearman = ZuulGearmanClient(self)
         self.gearman.addServer(server, port)
@@ -231,7 +237,7 @@
                 s_config = {}
                 s_config.update((k, v.format(item=item, job=job,
                                              change=item.change))
-                                if isinstance(v, basestring)
+                                if isinstance(v, six.string_types)
                                 else (k, v)
                                 for k, v in s.items())
 
@@ -351,7 +357,8 @@
         build.__gearman_job = gearman_job
         self.builds[uuid] = build
 
-        if not self.isJobRegistered(gearman_job.name):
+        if self.job_registration and not self.isJobRegistered(
+                gearman_job.name):
             self.log.error("Job %s is not registered with Gearman" %
                            gearman_job)
             self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
@@ -502,7 +509,7 @@
             # us where the job is running.
             return False
 
-        if not self.isJobRegistered(name):
+        if self.job_registration and not self.isJobRegistered(name):
             return False
 
         desc_uuid = str(uuid4().hex)
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
 import os
 import re
 
+import six
+
+
 OrderedDict = extras.try_imports(['collections.OrderedDict',
                                   'ordereddict.OrderedDict'])
 
@@ -59,17 +62,17 @@
             raise Exception("Expansion error. Check error messages above")
 
         self.log.info("Mapping projects to workspace...")
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             dest = os.path.normpath(os.path.join(workspace, dest[0]))
             ret[project] = dest
             self.log.info("  %s -> %s", project, dest)
 
         self.log.debug("Checking overlap in destination directories...")
         check = defaultdict(list)
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             check[dest].append(project)
 
-        dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+        dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
         if dupes:
             raise Exception("Some projects share the same destination: %s",
                             dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index f0235a6..3155df6 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,6 +19,8 @@
 import re
 import yaml
 
+import six
+
 from git import GitCommandError
 from zuul.lib.clonemapper import CloneMapper
 from zuul.merger.merger import Repo
@@ -62,7 +64,7 @@
         dests = mapper.expand(workspace=self.workspace)
 
         self.log.info("Preparing %s repositories", len(dests))
-        for project, dest in dests.iteritems():
+        for project, dest in six.iteritems(dests):
             self.prepareRepo(project, dest)
         self.log.info("Prepared all repositories")
 
diff --git a/zuul/lib/gearserver.py b/zuul/lib/gearserver.py
new file mode 100644
index 0000000..9cddca3
--- /dev/null
+++ b/zuul/lib/gearserver.py
@@ -0,0 +1,35 @@
+# Copyright 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+
+MASS_DO = 101
+
+
+class GearServer(gear.Server):
+    def handlePacket(self, packet):
+        if packet.ptype == MASS_DO:
+            self.log.info("Received packet from %s: %s" % (packet.connection,
+                                                           packet))
+            self.handleMassDo(packet)
+        else:
+            return super(GearServer, self).handlePacket(packet)
+
+    def handleMassDo(self, packet):
+        packet.connection.functions = set()
+        for name in packet.data.split(b'\x00'):
+            self.log.debug("Adding function %s to %s" % (
+                name, packet.connection))
+            packet.connection.functions.add(name)
+            self.functions.add(name)
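
On the wire, MASS_DO collapses what would otherwise be one registration round trip
per function into a single packet whose payload is the NUL-joined function list;
note that handleMassDo() replaces, rather than extends, the connection's function
set. A minimal sketch of the payload encoding, packet framing elided:

    functions = {b'build:foo', b'build:bar', b'stop:worker01'}

    # Worker side (sendMassDo): NUL-join the function names.
    data = b'\x00'.join(sorted(functions))

    # Server side (handleMassDo): split the payload back apart.
    assert set(data.split(b'\x00')) == functions
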
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c6ae35d..3bc29e6 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -210,7 +210,7 @@
         fd.write('#!/bin/bash\n')
         fd.write('ssh -i %s $@\n' % key)
         fd.close()
-        os.chmod(name, 0755)
+        os.chmod(name, 0o755)
 
     def addProject(self, project, url):
         repo = None
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 30cd732..d56993c 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,7 +19,7 @@
 
 import gear
 
-import merger
+from zuul.merger import merger
 
 
 class MergeServer(object):
diff --git a/zuul/model.py b/zuul/model.py
index 3fb0577..46b0b98 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -110,7 +110,11 @@
         return job_tree
 
     def getProjects(self):
-        return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+        # cmp() does not exist in python3; use the key idiom from
+        # http://python-future.org/compatible_idioms.html#cmp
+        return sorted(
+            self.job_trees.keys(),
+            key=lambda p: p.name)
 
     def addQueue(self, queue):
         self.queues.append(queue)
@@ -417,7 +421,7 @@
             elif self.window_decrease_type == 'exponential':
                 self.window = max(
                     self.window_floor,
-                    self.window / self.window_decrease_factor)
+                    int(self.window / self.window_decrease_factor))
 
 
 class Project(object):
@@ -1075,7 +1079,7 @@
         for a in approvals:
             for k, v in a.items():
                 if k == 'username':
-                    pass
+                    a['username'] = re.compile(v)
                 elif k in ['email', 'email-filter']:
                     a['email'] = re.compile(v)
                 elif k == 'newer-than':
@@ -1094,7 +1098,7 @@
         by = approval.get('by', {})
         for k, v in rapproval.items():
             if k == 'username':
-                if (by.get('username', '') != v):
+                if (not v.search(by.get('username', ''))):
-                        return False
+                    return False
             elif k == 'email':
                 if (not v.search(by.get('email', ''))):
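
Since *username* is now compiled with re.compile() and matched with search(),
mirroring the existing *email* handling, the fixture above anchors its pattern; an
unanchored name would also match superstrings. A quick illustration:

    import re

    username_req = re.compile(r'^(jenkins|zuul)$')   # pattern from the fixture
    assert username_req.search('zuul')
    assert username_req.search('jenkins')
    assert not username_req.search('jenkins2')       # anchored: no partial match

    assert re.compile('jenkins').search('jenkins2')  # unanchored would match
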
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d54da9f..716dcfb 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -21,7 +21,7 @@
 import gear
 import six
 
-import model
+from zuul import model
 
 
 class RPCListener(object):
@@ -40,11 +40,11 @@
             port = 4730
         self.worker = gear.Worker('Zuul RPC Listener')
         self.worker.addServer(server, port)
+        self.worker.waitForServer()
+        self.register()
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
         self.thread.start()
-        self.worker.waitForServer()
-        self.register()
 
     def register(self):
         self.worker.registerFunction("zuul:enqueue")
@@ -66,8 +66,8 @@
         while self._running:
             try:
                 job = self.worker.getJob()
-                z, jobname = job.name.split(':')
                 self.log.debug("Received job %s" % job.name)
+                z, jobname = job.name.split(':')
                 attrname = 'handle_' + jobname
                 if hasattr(self, attrname):
                     f = getattr(self, attrname)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index c3fd3c9..a30b735 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -28,10 +28,10 @@
 import time
 import yaml
 
-import layoutvalidator
-import model
-from model import Pipeline, Project, ChangeQueue
-from model import ChangeishFilter, NullChange
+from zuul import layoutvalidator
+from zuul import model
+from zuul.model import Pipeline, Project, ChangeQueue
+from zuul.model import ChangeishFilter, NullChange
 from zuul import change_matcher, exceptions
 from zuul import version as zuul_version
 
@@ -411,7 +411,9 @@
                     base = os.path.dirname(os.path.realpath(config_path))
                     fn = os.path.join(base, fn)
                 fn = os.path.expanduser(fn)
-                execfile(fn, config_env)
+                with open(fn) as _f:
+                    code = compile(_f.read(), fn, 'exec')
+                    six.exec_(code, config_env)
 
         for conf_pipeline in data.get('pipelines', []):
             pipeline = Pipeline(conf_pipeline['name'])
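
The execfile() replacement above follows the standard Python 3 porting idiom: read
the source, compile() it with the file name so tracebacks stay useful, then
six.exec_() it into the target namespace. In isolation (path illustrative):

    import six

    config_env = {}
    fn = '/etc/zuul/custom_functions.py'  # illustrative path
    with open(fn) as _f:
        code = compile(_f.read(), fn, 'exec')
    six.exec_(code, config_env)  # py2: equivalent to execfile(fn, config_env)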