Do late decoding of log stream buffer

The log stream is read in fixed-size chunks. If the stream contains
multi-byte UTF-8 characters, a single character can be split across
two chunks. Decoding each chunk on its own then fails with an
exception [1]. Fix this by keeping the buffer as bytes and decoding
each line only after it has been split off.
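
As an illustration of the failure mode, here is a minimal sketch that is
not taken from zuul_stream.py. The payload and the helper names
decode_early and decode_late are hypothetical, and the chunks are
simulated with slices instead of a real socket:

```python
# Hypothetical payload: one line ending in a 3-byte UTF-8 character.
data = "status: ok \u2713\n".encode("utf-8")

# Simulate a chunk boundary that falls inside the multi-byte character.
chunk1, chunk2 = data[:-2], data[-2:]


def decode_early(chunks):
    # Decodes every chunk on its own; raises UnicodeDecodeError when a
    # chunk ends in the middle of a character.
    return "".join(c.decode("utf-8") for c in chunks)


def decode_late(chunks):
    # Accumulates raw bytes and decodes only complete lines.
    buff = b""
    lines = []
    for chunk in chunks:
        buff += chunk
        while b"\n" in buff:
            line, buff = buff.split(b"\n", 1)
            lines.append(line.decode("utf-8"))
    if buff:
        # Trailing data without a final newline.
        lines.append(buff.decode("utf-8"))
    return lines
```

With these chunks, decode_early raises UnicodeDecodeError while
decode_late returns the complete line, because a newline byte can never
be part of a multi-byte UTF-8 sequence.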

Furthermore, we must expect the stream to contain arbitrary binary
data. To cope with this, harden the final decoding with the
'backslashreplace' error handler, which replaces every undecodable
byte with an escape sequence. This way we retain all of the
information (even binary) and decoding no longer raises an
exception.
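
A small sketch of what the error handler does, using the same bytes as
the test fixture in this change. The variable names are only for
illustration:

```python
# 0x80 is not a valid UTF-8 start byte, so strict decoding would fail.
raw = b"\x80abc"

# With backslashreplace the invalid byte becomes a literal escape sequence.
text = raw.decode("utf-8", "backslashreplace")

# Valid multi-byte sequences are still decoded normally.
valid = "caf\u00e9".encode("utf-8").decode("utf-8", "backslashreplace")

print(text)
print(valid)
```

Here text is the seven-character string consisting of a backslash
followed by x80abc, which is what the functional test greps for in
job-output.txt.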

[1]: Log output
Ansible output: b'Exception in thread Thread-10:'
Ansible output: b'Traceback (most recent call last):'
Ansible output: b'  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner'
Ansible output: b'    self.run()'
Ansible output: b'  File "/usr/lib/python3.5/threading.py", line 862, in run'
Ansible output: b'    self._target(*self._args, **self._kwargs)'
Ansible output: b'  File "/var/lib/zuul/ansible/zuul/ansible/callback/zuul_stream.py", line 140, in _read_log'
Ansible output: b'    more = s.recv(4096).decode("utf-8")'
Ansible output: b"UnicodeDecodeError: 'utf-8' codec can't decode bytes in position 4094-4095: unexpected end of data"
Ansible output: b''

Change-Id: I568ede2a2a4a64fd3a98480cebcbc2e86c54a2cf
diff --git a/playbooks/zuul-stream/fixtures/test-stream.yaml b/playbooks/zuul-stream/fixtures/test-stream.yaml
index fd28757..c4946e8 100644
--- a/playbooks/zuul-stream/fixtures/test-stream.yaml
+++ b/playbooks/zuul-stream/fixtures/test-stream.yaml
@@ -46,3 +46,6 @@
       args:
         chdir: /itemloop/somewhere/that/does/not/exist
       failed_when: false
+
+    - name: Print binary data
+      command: echo -e '\x80abc'
diff --git a/playbooks/zuul-stream/functional.yaml b/playbooks/zuul-stream/functional.yaml
index 6b67b05..779a102 100644
--- a/playbooks/zuul-stream/functional.yaml
+++ b/playbooks/zuul-stream/functional.yaml
@@ -58,3 +58,8 @@
       shell: |
         egrep "^.+\| node1 \| OSError.+\/failure-itemloop\/" job-output.txt
         egrep "^.+\| node2 \| OSError.+\/failure-itemloop\/" job-output.txt
+
+    - name: Validate output - binary data
+      shell: |
+        egrep "^.*\| node1 \| \\\\x80abc" job-output.txt
+        egrep "^.*\| node2 \| \\\\x80abc" job-output.txt
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 8ba3b86..8845e9b 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -128,22 +128,29 @@
                 continue
             msg = "%s\n" % log_id
             s.send(msg.encode("utf-8"))
-            buff = s.recv(4096).decode("utf-8")
+            buff = s.recv(4096)
             buffering = True
             while buffering:
-                if "\n" in buff:
-                    (line, buff) = buff.split("\n", 1)
-                    done = self._log_streamline(host, line)
+                if b'\n' in buff:
+                    (line, buff) = buff.split(b'\n', 1)
+                    # We can potentially get binary data here. To handle
+                    # that, decode with the backslashreplace error
+                    # handler. It turns bytes that are not valid UTF-8
+                    # into escape sequences that exactly represent the
+                    # original data instead of raising a decoding error.
+                    done = self._log_streamline(
+                        host, line.decode("utf-8", "backslashreplace"))
                     if done:
                         return
                 else:
-                    more = s.recv(4096).decode("utf-8")
+                    more = s.recv(4096)
                     if not more:
                         buffering = False
                     else:
                         buff += more
             if buff:
-                self._log_streamline(host, line)
+                self._log_streamline(
+                    host, buff.decode("utf-8", "backslashreplace"))
 
     def _log_streamline(self, host, line):
         if "[Zuul] Task exit code" in line: