Merge "Fix py3 issue with command module" into feature/zuulv3
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 6a7007a..c6cc7ab 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -92,6 +92,8 @@
         self._play = None
         self._streamers = []
         self.configure_logger()
+        self._items_done = False
+        self._deferred_result = None
 
     def configure_logger(self):
         # ansible appends timestamp, user and pid to the log lines emitted
@@ -170,6 +172,9 @@
                 ip = play_vars[host].get(
                     'ansible_host', play_vars[host].get(
                         'ansible_inventory_host'))
+                if ip in ('localhost', '127.0.0.1'):
+                    # Don't try to stream from localhost
+                    continue
                 streamer = threading.Thread(
                     target=self._read_log, args=(
                         host, ip, log_id, task_name, hosts))
@@ -187,33 +192,50 @@
                 msg = "[Zuul] Log Stream did not terminate"
                 self._log(msg, job=True, executor=True)
 
-    def _process_result_for_localhost(self, result):
+    def _process_result_for_localhost(self, result, is_task=True):
+        result_dict = dict(result._result)
+        localhost_names = ('localhost', '127.0.0.1')
         is_localhost = False
-        delegated_vars = result._result.get('_ansible_delegated_vars', None)
+        delegated_vars = result_dict.get('_ansible_delegated_vars', None)
         if delegated_vars:
             delegated_host = delegated_vars['ansible_host']
-            if delegated_host in ('localhost', '127.0.0.1'):
+            if delegated_host in localhost_names:
+                is_localhost = True
+        else:
+            task_host = result._host.get_name()
+            task_hostvars = result._task._variable_manager._hostvars[task_host]
+            if task_hostvars['ansible_host'] in localhost_names:
                 is_localhost = True
 
-        if not is_localhost:
+        if not is_localhost and is_task:
             self._stop_streamers()
         if result._task.action in ('command', 'shell'):
-            stdout_lines = zuul_filter_result(result._result)
+            stdout_lines = zuul_filter_result(result_dict)
             if is_localhost:
                 for line in stdout_lines:
-                    self._log("localhost | %s " % line.strip())
+                    hostname = self._get_hostname(result)
+                    self._log("%s | %s " % (hostname, line.strip()))
 
     def v2_runner_on_failed(self, result, ignore_errors=False):
-        self._process_result_for_localhost(result)
-        self._handle_exception(result._result)
+        result_dict = dict(result._result)
 
-        if result._task.loop and 'results' in result._result:
-            self._process_items(result)
+        self._handle_exception(result_dict)
+
+        if result_dict.get('msg') == 'All items completed':
+            result_dict['status'] = 'ERROR'
+            self._deferred_result = result_dict
+            return
+
+        self._process_result_for_localhost(result)
+
+        if result._task.loop and 'results' in result_dict:
+            # items have their own events
+            pass
         else:
             self._log_message(
                 result=result,
                 msg="Results: => {results}".format(
-                    results=self._dump_results(result._result)),
+                    results=self._dump_results(result_dict)),
                 status='ERROR')
         if ignore_errors:
             self._log_message(result, "Ignoring Errors", status="ERROR")
@@ -223,35 +245,107 @@
                 and self._last_task_banner != result._task._uuid):
             self._print_task_banner(result._task)
 
-        self._clean_results(result._result, result._task.action)
-        self._process_result_for_localhost(result)
-
-        if result._task.action in ('include', 'include_role'):
+        if result._task.action in ('include', 'include_role', 'setup'):
             return
 
-        if result._result.get('changed', False):
+        result_dict = dict(result._result)
+
+        self._clean_results(result_dict, result._task.action)
+
+        if result_dict.get('changed', False):
             status = 'changed'
         else:
             status = 'ok'
 
-        if result._task.loop and 'results' in result._result:
-            self._process_items(result)
+        if (result_dict.get('msg') == 'All items completed'
+                and not self._items_done):
+            result_dict['status'] = status
+            self._deferred_result = result_dict
+            return
 
-        self._handle_warnings(result._result)
+        if not result._task.loop:
+            self._process_result_for_localhost(result)
+        else:
+            self._items_done = False
 
-        if result._task.loop and 'results' in result._result:
-            self._process_items(result)
+        self._handle_warnings(result_dict)
+
+        if result._task.loop and 'results' in result_dict:
+            # items have their own events
+            pass
+
         elif result._task.action not in ('command', 'shell'):
             self._log_message(
                 result=result,
                 msg="Results: => {results}".format(
-                    results=self._dump_results(result._result)),
+                    results=self._dump_results(result_dict)),
                 status=status)
+        elif 'results' in result_dict:
+            for res in result_dict['results']:
+                self._log_message(
+                    result,
+                    "Runtime: {delta} Start: {start} End: {end}".format(**res))
+        elif result_dict.get('msg') == 'All items completed':
+            self._log_message(result, result_dict['msg'])
         else:
             self._log_message(
                 result,
                 "Runtime: {delta} Start: {start} End: {end}".format(
-                    **result._result))
+                    **result_dict))
+
+    def v2_runner_item_on_ok(self, result):
+        result_dict = dict(result._result)
+        self._process_result_for_localhost(result, is_task=False)
+
+        if result_dict.get('changed', False):
+            status = 'changed'
+        else:
+            status = 'ok'
+
+        if result._task.action not in ('command', 'shell'):
+            self._log_message(
+                result=result,
+                msg="Item: {item} => {results}".format(
+                    item=result_dict['item'],
+                    results=self._dump_results(result_dict)),
+                status=status)
+        else:
+            self._log_message(
+                result,
+                "Item: {item} Runtime: {delta}"
+                " Start: {start} End: {end}".format(**result_dict))
+
+        if self._deferred_result:
+            self._process_deferred(result)
+
+    def v2_runner_item_on_failed(self, result):
+        result_dict = dict(result._result)
+        self._process_result_for_localhost(result, is_task=False)
+
+        if result._task.action not in ('command', 'shell'):
+            self._log_message(
+                result=result,
+                msg="Item: {item} => {results}".format(
+                    item=result_dict['item'],
+                    results=self._dump_results(result_dict)),
+                status='ERROR')
+        else:
+            self._log_message(
+                result,
+                "Item: {item} Runtime: {delta}"
+                " Start: {start} End: {end}".format(**result_dict))
+
+        if self._deferred_result:
+            self._process_deferred(result)
+
+    def _process_deferred(self, result):
+        self._items_done = True
+        result_dict = self._deferred_result
+        self._deferred_result = None
+
+        self._log_message(
+            result, "All items complete",
+            status=result_dict['status'])
 
     def _print_task_banner(self, task):
 
@@ -259,6 +353,10 @@
 
         args = ''
         task_args = task.args.copy()
+        if task.loop:
+            task_type = 'LOOP'
+        else:
+            task_type = 'TASK'
         is_shell = task_args.pop('_uses_shell', False)
         if is_shell and task_name == 'command':
             task_name = 'shell'
@@ -272,7 +370,8 @@
             args = u', '.join(u'%s=%s' % a for a in task_args.items())
             args = u' %s' % args
 
-        msg = "TASK [{task}{args}]".format(
+        msg = "{task_type} [{task}{args}]".format(
+            task_type=task_type,
             task=task_name,
             args=args)
         self._log(msg)