Merge "Fix py3 issue with command module" into feature/zuulv3
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 6a7007a..c6cc7ab 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -92,6 +92,8 @@
self._play = None
self._streamers = []
self.configure_logger()
+ self._items_done = False
+ self._deferred_result = None
def configure_logger(self):
# ansible appends timestamp, user and pid to the log lines emitted
@@ -170,6 +172,9 @@
ip = play_vars[host].get(
'ansible_host', play_vars[host].get(
'ansible_inventory_host'))
+ if ip in ('localhost', '127.0.0.1'):
+ # Don't try to stream from localhost
+ continue
streamer = threading.Thread(
target=self._read_log, args=(
host, ip, log_id, task_name, hosts))
@@ -187,33 +192,50 @@
msg = "[Zuul] Log Stream did not terminate"
self._log(msg, job=True, executor=True)
- def _process_result_for_localhost(self, result):
+ def _process_result_for_localhost(self, result, is_task=True):
+ result_dict = dict(result._result)
+ localhost_names = ('localhost', '127.0.0.1')
is_localhost = False
- delegated_vars = result._result.get('_ansible_delegated_vars', None)
+ delegated_vars = result_dict.get('_ansible_delegated_vars', None)
if delegated_vars:
delegated_host = delegated_vars['ansible_host']
- if delegated_host in ('localhost', '127.0.0.1'):
+ if delegated_host in localhost_names:
+ is_localhost = True
+ else:
+ task_host = result._host.get_name()
+ task_hostvars = result._task._variable_manager._hostvars[task_host]
+ if task_hostvars['ansible_host'] in localhost_names:
is_localhost = True
- if not is_localhost:
+ if not is_localhost and is_task:
self._stop_streamers()
if result._task.action in ('command', 'shell'):
- stdout_lines = zuul_filter_result(result._result)
+ stdout_lines = zuul_filter_result(result_dict)
if is_localhost:
for line in stdout_lines:
- self._log("localhost | %s " % line.strip())
+ hostname = self._get_hostname(result)
+ self._log("%s | %s " % (hostname, line.strip()))
def v2_runner_on_failed(self, result, ignore_errors=False):
- self._process_result_for_localhost(result)
- self._handle_exception(result._result)
+ result_dict = dict(result._result)
- if result._task.loop and 'results' in result._result:
- self._process_items(result)
+ self._handle_exception(result_dict)
+
+ if result_dict.get('msg') == 'All items completed':
+ result_dict['status'] = 'ERROR'
+ self._deferred_result = result_dict
+ return
+
+ self._process_result_for_localhost(result)
+
+ if result._task.loop and 'results' in result_dict:
+ # items have their own events
+ pass
else:
self._log_message(
result=result,
msg="Results: => {results}".format(
- results=self._dump_results(result._result)),
+ results=self._dump_results(result_dict)),
status='ERROR')
if ignore_errors:
self._log_message(result, "Ignoring Errors", status="ERROR")
@@ -223,35 +245,107 @@
and self._last_task_banner != result._task._uuid):
self._print_task_banner(result._task)
- self._clean_results(result._result, result._task.action)
- self._process_result_for_localhost(result)
-
- if result._task.action in ('include', 'include_role'):
+ if result._task.action in ('include', 'include_role', 'setup'):
return
- if result._result.get('changed', False):
+ result_dict = dict(result._result)
+
+ self._clean_results(result_dict, result._task.action)
+
+ if result_dict.get('changed', False):
status = 'changed'
else:
status = 'ok'
- if result._task.loop and 'results' in result._result:
- self._process_items(result)
+ if (result_dict.get('msg') == 'All items completed'
+ and not self._items_done):
+ result_dict['status'] = status
+ self._deferred_result = result_dict
+ return
- self._handle_warnings(result._result)
+ if not result._task.loop:
+ self._process_result_for_localhost(result)
+ else:
+ self._items_done = False
- if result._task.loop and 'results' in result._result:
- self._process_items(result)
+ self._handle_warnings(result_dict)
+
+ if result._task.loop and 'results' in result_dict:
+ # items have their own events
+ pass
+
elif result._task.action not in ('command', 'shell'):
self._log_message(
result=result,
msg="Results: => {results}".format(
- results=self._dump_results(result._result)),
+ results=self._dump_results(result_dict)),
status=status)
+ elif 'results' in result_dict:
+ for res in result_dict['results']:
+ self._log_message(
+ result,
+ "Runtime: {delta} Start: {start} End: {end}".format(**res))
+ elif result_dict.get('msg') == 'All items completed':
+ self._log_message(result, result_dict['msg'])
else:
self._log_message(
result,
"Runtime: {delta} Start: {start} End: {end}".format(
- **result._result))
+ **result_dict))
+
+ def v2_runner_item_on_ok(self, result):
+ result_dict = dict(result._result)
+ self._process_result_for_localhost(result, is_task=False)
+
+ if result_dict.get('changed', False):
+ status = 'changed'
+ else:
+ status = 'ok'
+
+ if result._task.action not in ('command', 'shell'):
+ self._log_message(
+ result=result,
+ msg="Item: {item} => {results}".format(
+ item=result_dict['item'],
+ results=self._dump_results(result_dict)),
+ status=status)
+ else:
+ self._log_message(
+ result,
+ "Item: {item} Runtime: {delta}"
+ " Start: {start} End: {end}".format(**result_dict))
+
+ if self._deferred_result:
+ self._process_deferred(result)
+
+ def v2_runner_item_on_failed(self, result):
+ result_dict = dict(result._result)
+ self._process_result_for_localhost(result, is_task=False)
+
+ if result._task.action not in ('command', 'shell'):
+ self._log_message(
+ result=result,
+ msg="Item: {item} => {results}".format(
+ item=result_dict['item'],
+ results=self._dump_results(result_dict)),
+ status='ERROR')
+ else:
+ self._log_message(
+ result,
+ "Item: {item} Runtime: {delta}"
+ " Start: {start} End: {end}".format(**result_dict))
+
+ if self._deferred_result:
+ self._process_deferred(result)
+
+ def _process_deferred(self, result):
+ self._items_done = True
+ result_dict = self._deferred_result
+ self._deferred_result = None
+
+ self._log_message(
+ result, "All items complete",
+ status=result_dict['status'])
def _print_task_banner(self, task):
@@ -259,6 +353,10 @@
args = ''
task_args = task.args.copy()
+ if task.loop:
+ task_type = 'LOOP'
+ else:
+ task_type = 'TASK'
is_shell = task_args.pop('_uses_shell', False)
if is_shell and task_name == 'command':
task_name = 'shell'
@@ -272,7 +370,8 @@
args = u', '.join(u'%s=%s' % a for a in task_args.items())
args = u' %s' % args
- msg = "TASK [{task}{args}]".format(
+ msg = "{task_type} [{task}{args}]".format(
+ task_type=task_type,
task=task_name,
args=args)
self._log(msg)