Merge "Ansible launcher: add private gearman function"
diff --git a/.testr.conf b/.testr.conf
index 5433c07..222ce97 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -1,4 +1,4 @@
 [DEFAULT]
-test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
+test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
 test_id_option=--load-list $IDFILE
 test_list_option=--list
diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index 98e4bb8..07b777a 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -49,6 +49,11 @@
   Port on which the Gearman server is listening.
   ``port=4730``
 
+**check_job_registration**
+  Check whether the job is registered with Gearman. When True, a build
+  result of NOT_REGISTERED will be returned if the job is not found.
+  ``check_job_registration=True``
+
 gearman_server
 """"""""""""""
 
diff --git a/tests/base.py b/tests/base.py
index 3c28a72..5b31eea 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -27,6 +27,7 @@
 import re
 import select
 import shutil
+from six.moves import reload_module
 import socket
 import string
 import subprocess
@@ -861,6 +862,28 @@
                 format='%(asctime)s %(name)-32s '
                 '%(levelname)-8s %(message)s'))
 
+            # NOTE(notmorgan): Extract logging overrides for specific libraries
+            # from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
+            # each. This is used to limit the output during test runs from
+            # libraries that zuul depends on such as gear.
+            log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
+
+            if log_defaults_from_env:
+                for default in log_defaults_from_env.split(','):
+                    try:
+                        name, level_str = default.split('=', 1)
+                        level = getattr(logging, level_str, logging.DEBUG)
+                        self.useFixture(fixtures.FakeLogger(
+                            name=name,
+                            level=level,
+                            format='%(asctime)s %(name)-32s '
+                                   '%(levelname)-8s %(message)s'))
+                    except ValueError:
+                        # NOTE(notmorgan): Invalid format of the log default,
+                        # skip and don't try to apply a logger for the
+                        # specified module
+                        pass
+
 
 class ZuulTestCase(BaseTestCase):
 
@@ -916,8 +939,8 @@
         os.environ['STATSD_PORT'] = str(self.statsd.port)
         self.statsd.start()
         # the statsd client object is configured in the statsd module import
-        reload(statsd)
-        reload(zuul.scheduler)
+        reload_module(statsd)
+        reload_module(zuul.scheduler)
 
         self.gearman_server = FakeGearmanServer()
 
diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py
index 3de4a94..46a8c7c 100644
--- a/tests/test_layoutvalidator.py
+++ b/tests/test_layoutvalidator.py
@@ -14,7 +14,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import ConfigParser
+from six.moves import configparser as ConfigParser
 import os
 import re
 
diff --git a/tox.ini b/tox.ini
index 79ea939..a8767c2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,6 +9,7 @@
          STATSD_PORT=8125
          VIRTUAL_ENV={envdir}
          OS_TEST_TIMEOUT=30
+         OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
 passenv = ZUUL_TEST_ROOT
 usedevelop = True
 install_command = pip install {opts} {packages}
@@ -18,6 +19,8 @@
   python setup.py testr --slowest --testr-args='{posargs}'
 
 [testenv:pep8]
+# streamer is python3 only, so we need to run flake8 in python3
+basepython = python3
 commands = flake8 {posargs}
 
 [testenv:cover]
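
The OS_LOG_DEFAULTS value added to tox.ini above is consumed by the new fixture
code in tests/base.py earlier in this diff. A minimal, standalone sketch of that
parsing (the default value is taken from tox.ini; everything else is illustrative):

    import logging
    import os

    # Entries take the form "logger.name=LEVEL", separated by commas; malformed
    # entries are skipped and unknown level names fall back to DEBUG.
    raw = os.environ.get('OS_LOG_DEFAULTS', 'gear.Server=INFO,gear.Client=INFO')
    levels = {}
    for default in raw.split(','):
        try:
            name, level_str = default.split('=', 1)
        except ValueError:
            continue
        levels[name] = getattr(logging, level_str, logging.DEBUG)
    print(levels)  # e.g. {'gear.Server': 20, 'gear.Client': 20}
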
diff --git a/zuul/ansible/library/zuul_log.py b/zuul/ansible/library/zuul_log.py
index 8978275..2072bc9 100644
--- a/zuul/ansible/library/zuul_log.py
+++ b/zuul/ansible/library/zuul_log.py
@@ -34,14 +34,17 @@
 
 
 def log(msg):
+    if not isinstance(msg, list):
+        msg = [msg]
     with Console() as console:
-        console.addLine("[Zuul] %s\n" % msg)
+        for line in msg:
+            console.addLine("[Zuul] %s\n" % line)
 
 
 def main():
     module = AnsibleModule(
         argument_spec=dict(
-            msg=dict(required=True),
+            msg=dict(required=True, type='raw'),
         )
     )
 
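
With msg declared as type='raw', the module accepts either a single string or a
list of strings. A small sketch of the normalization performed by log() above
(the helper name and sample messages here are illustrative):

    # Normalize a string-or-list msg argument and prefix each line, as log()
    # does above before writing to the console stream.
    def format_lines(msg):
        if not isinstance(msg, list):
            msg = [msg]
        return ["[Zuul] %s\n" % line for line in msg]

    print(format_lines("single line"))
    print(format_lines(["Launched by launcher01", "Building remotely on node"]))
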
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index 20b5600..5a38807 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -18,7 +18,6 @@
 import datetime
 import getpass
 import os
-import re
 import subprocess
 import threading
 
@@ -47,12 +46,16 @@
         if os.path.exists(fn):
             with open(fn) as f:
                 for line in f:
-                    line = re.sub('#.*', '', line).strip()
                     if not line:
                         continue
+                    if line[0] == '#':
+                        continue
                     if '=' not in line:
                         continue
-                    k, v = line.split('=')
+                    k, v = line.strip().split('=')
+                    for q in ["'", '"']:
+                        if v[0] == q:
+                            v = v.strip(q)
                     env[k] = v
     return env
 
diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py
index ae1e319..62891cd 100644
--- a/zuul/connection/gerrit.py
+++ b/zuul/connection/gerrit.py
@@ -32,7 +32,7 @@
     """Move events from Gerrit to the scheduler."""
 
     log = logging.getLogger("zuul.GerritEventConnector")
-    delay = 5.0
+    delay = 10.0
 
     def __init__(self, connection):
         super(GerritEventConnector, self).__init__()
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index f002fed..aef54d3 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -795,8 +795,16 @@
             job.sendWorkStatus(0, 100)
 
             job_status = self.runAnsiblePlaybook(jobdir, timeout)
+            if job_status == 3:
+                # AnsibleHostUnreachable: We had a network issue connecting to
+                # our zuul-worker. Rather than continue, have zuul requeue the
+                # job.
+                return result
+
             post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
-            if job_status and post_status:
+            if not post_status:
+                status = 'POST_FAILURE'
+            elif job_status:
                 status = 'SUCCESS'
             else:
                 status = 'FAILURE'
@@ -878,8 +886,8 @@
                 raise Exception("Target path %s is not below site root" %
                                 (dest,))
 
-            local_args = [
-                'shell', '/usr/bin/rsync', '--delay-updates', '-F',
+            rsync_cmd = [
+                '/usr/bin/rsync', '--delay-updates', '-F',
                 '--compress', '-rt', '--safe-links',
                 '--rsync-path="mkdir -p {dest} && rsync"',
                 '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
@@ -891,13 +899,14 @@
                 source = '"%s/"' % scproot
             else:
                 source = '`/usr/bin/find "%s" -type f`' % scproot
-            local_action = ' '.join(local_args).format(
+            shellargs = ' '.join(rsync_cmd).format(
                 source=source,
                 dest=dest,
                 private_key_file=self.private_key_file,
                 host=site['host'],
                 user=site['user'])
-            task = dict(local_action=local_action)
+            task = dict(shell=shellargs,
+                        delegate_to='127.0.0.1')
             if not scpfile.get('copy-after-failure'):
                 task['when'] = 'success'
             tasks.append(task)
@@ -923,14 +932,16 @@
                                            parameters)
         syncargs = dict(src=src,
                         dest=ftpcontent,
-                        copy_links='yes')
+                        copy_links='yes',
+                        mode='pull')
         if rsync_opts:
             syncargs['rsync_opts'] = rsync_opts
         task = dict(synchronize=syncargs,
                     when='success')
         tasks.append(task)
         task = dict(shell='lftp -f %s' % ftpscript,
-                    when='success')
+                    when='success',
+                    delegate_to='127.0.0.1')
         ftpsource = ftpcontent
         if ftp.get('remove-prefix'):
             ftpsource = os.path.join(ftpcontent, ftp['remove-prefix'])
@@ -1000,15 +1011,19 @@
                 inventory.write('\n')
 
         timeout = None
+        timeout_var = None
         for wrapper in jjb_job.get('wrappers', []):
             if isinstance(wrapper, dict):
-                timeout = wrapper.get('build-timeout', {})
-                if isinstance(timeout, dict):
-                    timeout = timeout.get('timeout')
+                build_timeout = wrapper.get('build-timeout', {})
+                if isinstance(build_timeout, dict):
+                    timeout_var = build_timeout.get('timeout-var', None)
+                    timeout = build_timeout.get('timeout')
                     if timeout:
                         timeout = timeout * 60
         if not timeout:
             timeout = ANSIBLE_DEFAULT_TIMEOUT
+        if timeout_var:
+            parameters[timeout_var] = timeout
 
         with open(jobdir.playbook, 'w') as playbook:
             tasks = []
@@ -1027,6 +1042,13 @@
                                   state='directory'))
             main_block.append(task)
 
+            msg = [
+                "Launched by %s" % self.manager_name,
+                "Building remotely on %s in workspace %s" % (
+                    self.name, parameters['WORKSPACE'])]
+            task = dict(zuul_log=dict(msg=msg))
+            main_block.append(task)
+
             for builder in jjb_job.get('builders', []):
                 if 'shell' in builder:
                     main_block.extend(
@@ -1061,6 +1083,7 @@
             config.write('hostfile = %s\n' % jobdir.inventory)
             config.write('host_key_checking = False\n')
             config.write('private_key_file = %s\n' % self.private_key_file)
+            config.write('retry_files_enabled = False\n')
 
             callback_path = zuul.ansible.plugins.callback_plugins.__file__
             callback_path = os.path.abspath(callback_path)
@@ -1101,7 +1124,7 @@
         finally:
             watchdog.stop()
         self.log.debug("Ansible exit code: %s" % (ret,))
-        self.ansible_proc = None
+        self.ansible_job_proc = None
         return ret == 0
 
     def runAnsiblePostPlaybook(self, jobdir, success):
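
The build-timeout handling above now also exports the computed timeout under an
optional timeout-var name. A minimal sketch of that logic with an illustrative
wrapper definition (the default constant and values are assumptions):

    # Wrapper data mimicking a JJB-style job definition; values are illustrative.
    ANSIBLE_DEFAULT_TIMEOUT = 500  # assumed default, in seconds
    wrappers = [{'build-timeout': {'timeout': 30, 'timeout-var': 'BUILD_TIMEOUT'}}]
    parameters = {}

    timeout = None
    timeout_var = None
    for wrapper in wrappers:
        if isinstance(wrapper, dict):
            build_timeout = wrapper.get('build-timeout', {})
            if isinstance(build_timeout, dict):
                timeout_var = build_timeout.get('timeout-var', None)
                timeout = build_timeout.get('timeout')
                if timeout:
                    timeout = timeout * 60  # minutes to seconds
    if not timeout:
        timeout = ANSIBLE_DEFAULT_TIMEOUT
    if timeout_var:
        parameters[timeout_var] = timeout
    print(parameters)  # {'BUILD_TIMEOUT': 1800}
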
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index f3b867c..98307ee 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -17,6 +17,7 @@
 import json
 import logging
 import os
+import six
 import time
 import threading
 from uuid import uuid4
@@ -164,6 +165,11 @@
             port = config.get('gearman', 'port')
         else:
             port = 4730
+        if config.has_option('gearman', 'check_job_registration'):
+            self.job_registration = config.getboolean(
+                'gearman', 'check_job_registration')
+        else:
+            self.job_registration = True
 
         self.gearman = ZuulGearmanClient(self)
         self.gearman.addServer(server, port)
@@ -231,7 +237,7 @@
                 s_config = {}
                 s_config.update((k, v.format(item=item, job=job,
                                              change=item.change))
-                                if isinstance(v, basestring)
+                                if isinstance(v, six.string_types)
                                 else (k, v)
                                 for k, v in s.items())
 
@@ -351,7 +357,8 @@
         build.__gearman_job = gearman_job
         self.builds[uuid] = build
 
-        if not self.isJobRegistered(gearman_job.name):
+        if self.job_registration and not self.isJobRegistered(
+                gearman_job.name):
             self.log.error("Job %s is not registered with Gearman" %
                            gearman_job)
             self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
@@ -502,7 +509,7 @@
             # us where the job is running.
             return False
 
-        if not self.isJobRegistered(name):
+        if self.job_registration and not self.isJobRegistered(name):
             return False
 
         desc_uuid = str(uuid4().hex)
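
A minimal sketch of how the launcher reads the check_job_registration option
documented in doc/source/zuul.rst above (the configuration is built in memory
here for illustration; a real deployment reads zuul.conf):

    from six.moves import configparser

    # Illustrative in-memory configuration standing in for zuul.conf.
    config = configparser.ConfigParser()
    config.add_section('gearman')
    config.set('gearman', 'check_job_registration', 'false')

    if config.has_option('gearman', 'check_job_registration'):
        job_registration = config.getboolean('gearman', 'check_job_registration')
    else:
        job_registration = True  # default: keep checking job registration

    print(job_registration)  # False
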
diff --git a/zuul/lib/clonemapper.py b/zuul/lib/clonemapper.py
index ae558cd..57ac177 100644
--- a/zuul/lib/clonemapper.py
+++ b/zuul/lib/clonemapper.py
@@ -19,6 +19,9 @@
 import os
 import re
 
+import six
+
+
 OrderedDict = extras.try_imports(['collections.OrderedDict',
                                   'ordereddict.OrderedDict'])
 
@@ -59,17 +62,17 @@
             raise Exception("Expansion error. Check error messages above")
 
         self.log.info("Mapping projects to workspace...")
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             dest = os.path.normpath(os.path.join(workspace, dest[0]))
             ret[project] = dest
             self.log.info("  %s -> %s", project, dest)
 
         self.log.debug("Checking overlap in destination directories...")
         check = defaultdict(list)
-        for project, dest in ret.iteritems():
+        for project, dest in six.iteritems(ret):
             check[dest].append(project)
 
-        dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
+        dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
         if dupes:
             raise Exception("Some projects share the same destination: %s",
                             dupes)
diff --git a/zuul/lib/cloner.py b/zuul/lib/cloner.py
index f0235a6..3155df6 100644
--- a/zuul/lib/cloner.py
+++ b/zuul/lib/cloner.py
@@ -19,6 +19,8 @@
 import re
 import yaml
 
+import six
+
 from git import GitCommandError
 from zuul.lib.clonemapper import CloneMapper
 from zuul.merger.merger import Repo
@@ -62,7 +64,7 @@
         dests = mapper.expand(workspace=self.workspace)
 
         self.log.info("Preparing %s repositories", len(dests))
-        for project, dest in dests.iteritems():
+        for project, dest in six.iteritems(dests):
             self.prepareRepo(project, dest)
         self.log.info("Prepared all repositories")
 
diff --git a/zuul/model.py b/zuul/model.py
index 3fb0577..542d0b6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -110,7 +110,11 @@
         return job_tree
 
     def getProjects(self):
-        return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
+        # cmp is not available in python3; apply the idiom from
+        # http://python-future.org/compatible_idioms.html#cmp
+        return sorted(
+            self.job_trees.keys(),
+            key=lambda p: p.name)
 
     def addQueue(self, queue):
         self.queues.append(queue)
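
The getProjects() change above swaps the removed Python 2 cmp comparator for a
key function; a tiny illustration with stand-in project objects:

    # Stand-in objects; only the .name attribute matters for the sort.
    class Project(object):
        def __init__(self, name):
            self.name = name

    projects = [Project('zuul'), Project('gear'), Project('nodepool')]

    # Python 2 only:  sorted(projects, lambda a, b: cmp(a.name, b.name))
    # Works on both 2 and 3, as used in getProjects() above:
    ordered = sorted(projects, key=lambda p: p.name)
    print([p.name for p in ordered])  # ['gear', 'nodepool', 'zuul']
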
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 83d119f..551dd03 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -40,11 +40,11 @@
             port = 4730
         self.worker = gear.Worker('Zuul RPC Listener')
         self.worker.addServer(server, port)
+        self.worker.waitForServer()
+        self.register()
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
         self.thread.start()
-        self.worker.waitForServer()
-        self.register()
 
     def register(self):
         self.worker.registerFunction("zuul:enqueue")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index dcc5f88..f08612d 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -411,7 +411,9 @@
                     base = os.path.dirname(os.path.realpath(config_path))
                     fn = os.path.join(base, fn)
                 fn = os.path.expanduser(fn)
-                execfile(fn, config_env)
+                with open(fn) as _f:
+                    code = compile(_f.read(), fn, 'exec')
+                    six.exec_(code, config_env)
 
         for conf_pipeline in data.get('pipelines', []):
             pipeline = Pipeline(conf_pipeline['name'])
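
execfile() no longer exists in Python 3, hence the compile/exec pattern above.
It can be exercised on its own like this (the source is inline for illustration;
the scheduler reads it from the configured file instead):

    import six

    # Portable equivalent of execfile(fn, config_env): compile Python source
    # and execute it into a namespace dictionary.
    source = "GREETING = 'loaded from a custom functions file'\n"
    config_env = {}
    code = compile(source, '<custom_functions>', 'exec')
    six.exec_(code, config_env)
    print(config_env['GREETING'])
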