Add initial callback plugin to handle log streaming

This is basically the default callback plugin from ansible cleaned up
for pep8 and plumbed in. Changes we make here should directly affect
formatting and whatnot.

Change-Id: Ic807a80a13ad14e080e50307f6f78619235dfe7f
diff --git a/zuul/ansible/callback/ b/zuul/ansible/callback/
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/callback/
diff --git a/zuul/ansible/callback/ b/zuul/ansible/callback/
new file mode 100644
index 0000000..919574c
--- /dev/null
+++ b/zuul/ansible/callback/
@@ -0,0 +1,342 @@
+# (c) 2012-2014, Michael DeHaan <>
+# This file is part of Ansible
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with Ansible.  If not, see <>.
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+from ansible import constants as C
+from ansible.plugins import callback
+from ansible.utils.color import colorize, hostcolor
+class CallbackModule(callback.CallbackBase):
+    '''
+    This is the Zuul streaming callback. It's based on the default
+    callback plugin, but streams results from shell commands.
+    '''
+    CALLBACK_TYPE = 'stdout'
+    CALLBACK_NAME = 'zuul_stream'
+    def __init__(self):
+        self._play = None
+        self._last_task_banner = None
+        super(CallbackModule, self).__init__()
+    def _should_verbose(self, result, level=0):
+        return ((self._display.verbosity > level
+                 or '_ansible_verbose_always' in result._result)
+                and '_ansible_verbose_override' not in result._result)
+    def v2_runner_on_failed(self, result, ignore_errors=False):
+        if (self._play.strategy == 'free'
+                and self._last_task_banner != result._task._uuid):
+            self._print_task_banner(result._task)
+        delegated_vars = result._result.get('_ansible_delegated_vars', None)
+        if 'exception' in result._result:
+            if self._display.verbosity < 3:
+                # extract just the actual error message from the exception text
+                error = result._result['exception'].strip().split('\n')[-1]
+                msg = ("An exception occurred during task execution. To see"
+                       " the full traceback, use -vvv."
+                       " The error was: %s" % error)
+            else:
+                msg = ("An exception occurred during task execution. The full"
+                       " traceback is:\n" + result._result['exception'])
+            self._display.display(msg, color=C.COLOR_ERROR)
+        self._handle_warnings(result._result)
+        if result._task.loop and 'results' in result._result:
+            self._process_items(result)
+        else:
+            if delegated_vars:
+                self._display.display(
+                    "fatal: [%s -> %s]: FAILED! => %s" % (
+                        result._host.get_name(),
+                        delegated_vars['ansible_host'],
+                        self._dump_results(result._result)),
+                    color=C.COLOR_ERROR)
+            else:
+                self._display.display(
+                    "fatal: [%s]: FAILED! => %s" % (
+                        result._host.get_name(),
+                        self._dump_results(result._result)),
+                    color=C.COLOR_ERROR)
+        if ignore_errors:
+            self._display.display("...ignoring", color=C.COLOR_SKIP)
+    def v2_runner_on_ok(self, result):
+        if (self._play.strategy == 'free'
+                and self._last_task_banner != result._task._uuid):
+            self._print_task_banner(result._task)
+        self._clean_results(result._result, result._task.action)
+        delegated_vars = result._result.get('_ansible_delegated_vars', None)
+        self._clean_results(result._result, result._task.action)
+        if result._task.action in ('include', 'include_role'):
+            return
+        elif result._result.get('changed', False):
+            if delegated_vars:
+                msg = "changed: [%s -> %s]" % (
+                    result._host.get_name(), delegated_vars['ansible_host'])
+            else:
+                msg = "changed: [%s]" % result._host.get_name()
+            color = C.COLOR_CHANGED
+        else:
+            if delegated_vars:
+                msg = "ok: [%s -> %s]" % (
+                    result._host.get_name(), delegated_vars['ansible_host'])
+            else:
+                msg = "ok: [%s]" % result._host.get_name()
+            color = C.COLOR_OK
+        self._handle_warnings(result._result)
+        if result._task.loop and 'results' in result._result:
+            self._process_items(result)
+        else:
+            if self._should_verbose(result):
+                msg += " => %s" % (self._dump_results(result._result),)
+            self._display.display(msg, color=color)
+    def v2_runner_on_skipped(self, result):
+            if (self._play.strategy == 'free'
+                    and self._last_task_banner != result._task._uuid):
+                self._print_task_banner(result._task)
+            if result._task.loop and 'results' in result._result:
+                self._process_items(result)
+            else:
+                msg = "skipping: [%s]" % result._host.get_name()
+                if self._should_verbose(result):
+                    msg += " => %s" % self._dump_results(result._result)
+                self._display.display(msg, color=C.COLOR_SKIP)
+    def v2_runner_on_unreachable(self, result):
+        if (self._play.strategy == 'free'
+                and self._last_task_banner != result._task._uuid):
+            self._print_task_banner(result._task)
+        delegated_vars = result._result.get('_ansible_delegated_vars', None)
+        if delegated_vars:
+            self._display.display(
+                "fatal: [%s -> %s]: UNREACHABLE! => %s" % (
+                    result._host.get_name(),
+                    delegated_vars['ansible_host'],
+                    self._dump_results(result._result)),
+                color=C.COLOR_UNREACHABLE)
+        else:
+            self._display.display(
+                "fatal: [%s]: UNREACHABLE! => %s" % (
+                    result._host.get_name(),
+                    self._dump_results(result._result)),
+                color=C.COLOR_UNREACHABLE)
+    def v2_playbook_on_no_hosts_matched(self):
+        self._display.display("skipping: no hosts matched", color=C.COLOR_SKIP)
+    def v2_playbook_on_no_hosts_remaining(self):
+        self._display.banner("NO MORE HOSTS LEFT")
+    def v2_playbook_on_task_start(self, task, is_conditional):
+        if self._play.strategy != 'free':
+            self._print_task_banner(task)
+    def _print_task_banner(self, task):
+        # args can be specified as no_log in several places: in the task or in
+        # the argument spec.  We can check whether the task is no_log but the
+        # argument spec can't be because that is only run on the target
+        # machine and we haven't run it thereyet at this time.
+        #
+        # So we give people a config option to affect display of the args so
+        # that they can secure this if they feel that their stdout is insecure
+        # (shoulder surfing, logging stdout straight to a file, etc).
+        args = ''
+        if not task.no_log and C.DISPLAY_ARGS_TO_STDOUT:
+            args = u', '.join(u'%s=%s' % a for a in task.args.items())
+            args = u' %s' % args
+        self._display.banner(u"TASK [%s%s]" % (task.get_name().strip(), args))
+        if self._display.verbosity >= 2:
+            path = task.get_path()
+            if path:
+                self._display.display(
+                    u"task path: %s" % path, color=C.COLOR_DEBUG)
+        self._last_task_banner = task._uuid
+    def v2_playbook_on_cleanup_task_start(self, task):
+        self._display.banner("CLEANUP TASK [%s]" % task.get_name().strip())
+    def v2_playbook_on_handler_task_start(self, task):
+        self._display.banner("RUNNING HANDLER [%s]" % task.get_name().strip())
+    def v2_playbook_on_play_start(self, play):
+        name = play.get_name().strip()
+        if not name:
+            msg = u"PLAY"
+        else:
+            msg = u"PLAY [%s]" % name
+        self._play = play
+        self._display.banner(msg)
+    def v2_on_file_diff(self, result):
+        if result._task.loop and 'results' in result._result:
+            for res in result._result['results']:
+                if 'diff' in res and res['diff'] and res.get('changed', False):
+                    diff = self._get_diff(res['diff'])
+                    if diff:
+                        self._display.display(diff)
+        elif ('diff' in result._result and result._result['diff']
+              and result._result.get('changed', False)):
+            diff = self._get_diff(result._result['diff'])
+            if diff:
+                self._display.display(diff)
+    def v2_runner_item_on_ok(self, result):
+        delegated_vars = result._result.get('_ansible_delegated_vars', None)
+        if result._task.action in ('include', 'include_role'):
+            return
+        elif result._result.get('changed', False):
+            msg = 'changed'
+            color = C.COLOR_CHANGED
+        else:
+            msg = 'ok'
+            color = C.COLOR_OK
+        if delegated_vars:
+            msg += ": [%s -> %s]" % (
+                result._host.get_name(), delegated_vars['ansible_host'])
+        else:
+            msg += ": [%s]" % result._host.get_name()
+        msg += " => (item=%s)" % (self._get_item(result._result),)
+        if self._should_verbose(result):
+            msg += " => %s" % self._dump_results(result._result)
+        self._display.display(msg, color=color)
+    def v2_runner_item_on_failed(self, result):
+        delegated_vars = result._result.get('_ansible_delegated_vars', None)
+        if 'exception' in result._result:
+            if self._display.verbosity < 3:
+                # extract just the actual error message from the exception text
+                error = result._result['exception'].strip().split('\n')[-1]
+                msg = ("An exception occurred during task execution."
+                       " To see the full traceback, use -vvv. The error was:"
+                       " %s") % error
+            else:
+                msg = ("An exception occurred during task execution. The full"
+                       "traceback is:\n" + result._result['exception'])
+            self._display.display(msg, color=C.COLOR_ERROR)
+        msg = "failed: "
+        if delegated_vars:
+            msg += "[%s -> %s]" % (
+                result._host.get_name(), delegated_vars['ansible_host'])
+        else:
+            msg += "[%s]" % (result._host.get_name())
+        self._handle_warnings(result._result)
+        self._display.display(
+            msg + " (item=%s) => %s" % (
+                self._get_item(result._result),
+                self._dump_results(result._result)), color=C.COLOR_ERROR)
+    def v2_runner_item_on_skipped(self, result):
+            msg = "skipping: [%s] => (item=%s) " % (
+                result._host.get_name(), self._get_item(result._result))
+            if self._should_verbose(result):
+                msg += " => %s" % self._dump_results(result._result)
+            self._display.display(msg, color=C.COLOR_SKIP)
+    def v2_playbook_on_include(self, included_file):
+        msg = 'included: %s for %s' % (
+            included_file._filename,
+            ", ".join([ for h in included_file._hosts]))
+        self._display.display(msg, color=C.COLOR_SKIP)
+    def v2_playbook_on_stats(self, stats):
+        self._display.banner("PLAY RECAP")
+        hosts = sorted(stats.processed.keys())
+        for h in hosts:
+            t = stats.summarize(h)
+            self._display.display(u"%s : %s %s %s %s" % (
+                hostcolor(h, t),
+                colorize(u'ok', t['ok'], C.COLOR_OK),
+                colorize(u'changed', t['changed'], C.COLOR_CHANGED),
+                colorize(
+                    u'unreachable', t['unreachable'], C.COLOR_UNREACHABLE),
+                colorize(u'failed', t['failures'], C.COLOR_ERROR)),
+                screen_only=True
+            )
+            self._display.display(u"%s : %s %s %s %s" % (
+                hostcolor(h, t, False),
+                colorize(u'ok', t['ok'], None),
+                colorize(u'changed', t['changed'], None),
+                colorize(u'unreachable', t['unreachable'], None),
+                colorize(u'failed', t['failures'], None)),
+                log_only=True
+            )
+        self._display.display("", screen_only=True)
+    def v2_playbook_on_start(self, playbook):
+        if self._display.verbosity > 1:
+            from os.path import basename
+            self._display.banner(
+                "PLAYBOOK: %s" % basename(playbook._file_name))
+        if self._display.verbosity > 3:
+            if self._options is not None:
+                for option in dir(self._options):
+                    if option.startswith('_') or option in [
+                            'read_file', 'ensure_value', 'read_module']:
+                        continue
+                    val = getattr(self._options, option)
+                    if val:
+                        self._display.vvvv('%s: %s' % (option, val))
+    def v2_runner_retry(self, result):
+        msg = "FAILED - RETRYING: %s (%d retries left)." % (
+            result._task,
+            result._result['retries'] - result._result['attempts'])
+        if self._should_verbose(result, level=2):
+            msg += "Result was: %s" % self._dump_results(result._result)
+        self._display.display(msg, color=C.COLOR_DEBUG)
diff --git a/zuul/launcher/ b/zuul/launcher/
index e42a716..d412bec 100644
--- a/zuul/launcher/
+++ b/zuul/launcher/
@@ -30,6 +30,7 @@
 import zuul.merger.merger
 import zuul.ansible.action
+import zuul.ansible.callback
 import zuul.ansible.library
 from zuul.lib import commandsocket
@@ -256,15 +257,25 @@
         if not os.path.exists(self.action_dir):
+        self.callback_dir = os.path.join(ansible_dir, 'callback')
+        if not os.path.exists(self.callback_dir):
+            os.makedirs(self.callback_dir)
         library_path = os.path.dirname(os.path.abspath(
         for fn in os.listdir(library_path):
             shutil.copy(os.path.join(library_path, fn), self.library_dir)
         action_path = os.path.dirname(os.path.abspath(
         for fn in os.listdir(action_path):
             shutil.copy(os.path.join(action_path, fn), self.action_dir)
+        callback_path = os.path.dirname(os.path.abspath(
+            zuul.ansible.callback.__file__))
+        for fn in os.listdir(callback_path):
+            shutil.copy(os.path.join(callback_path, fn), self.callback_dir)
         self.job_workers = {}
     def _getMerger(self, root):
@@ -800,6 +811,9 @@
             if self.jobdir.roles_path:
                 config.write('roles_path = %s\n' %
+            config.write('callback_plugins = %s\n'
+                         % self.launcher_server.callback_dir)
+            config.write('stdout_callback = zuul_stream\n')
             # bump the timeout because busy nodes may take more than
             # 10s to respond
             config.write('timeout = 30\n')