Merge "Do late decoding of log stream buffer" into feature/zuulv3
diff --git a/playbooks/zuul-stream/fixtures/test-stream.yaml b/playbooks/zuul-stream/fixtures/test-stream.yaml
index fd28757..c4946e8 100644
--- a/playbooks/zuul-stream/fixtures/test-stream.yaml
+++ b/playbooks/zuul-stream/fixtures/test-stream.yaml
@@ -46,3 +46,6 @@
       args:
         chdir: /itemloop/somewhere/that/does/not/exist
       failed_when: false
+
+    - name: Print binary data
+      command: echo -e '\x80abc'
diff --git a/playbooks/zuul-stream/functional.yaml b/playbooks/zuul-stream/functional.yaml
index 6b67b05..779a102 100644
--- a/playbooks/zuul-stream/functional.yaml
+++ b/playbooks/zuul-stream/functional.yaml
@@ -58,3 +58,8 @@
       shell: |
         egrep "^.+\| node1 \| OSError.+\/failure-itemloop\/" job-output.txt
         egrep "^.+\| node2 \| OSError.+\/failure-itemloop\/" job-output.txt
+
+    - name: Validate output - binary data
+      shell: |
+        egrep "^.*\| node1 \| \\\\x80abc" job-output.txt
+        egrep "^.*\| node2 \| \\\\x80abc" job-output.txt
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 8ba3b86..8845e9b 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -128,22 +128,29 @@
                 continue
             msg = "%s\n" % log_id
             s.send(msg.encode("utf-8"))
-            buff = s.recv(4096).decode("utf-8")
+            buff = s.recv(4096)
             buffering = True
             while buffering:
-                if "\n" in buff:
-                    (line, buff) = buff.split("\n", 1)
-                    done = self._log_streamline(host, line)
+                if b'\n' in buff:
+                    (line, buff) = buff.split(b'\n', 1)
+                    # We can potentially get binary data here. To be able
+                    # to handle that, use the backslashreplace error
+                    # handler. It decodes invalid utf-8 byte sequences into
+                    # escape sequences (e.g. \x80) that exactly represent the
+                    # original bytes without raising a decoding exception.
+                    done = self._log_streamline(
+                        host, line.decode("utf-8", "backslashreplace"))
                     if done:
                         return
                 else:
-                    more = s.recv(4096).decode("utf-8")
+                    more = s.recv(4096)
                     if not more:
                         buffering = False
                     else:
                         buff += more
             if buff:
-                self._log_streamline(host, line)
+                self._log_streamline(
+                    host, line.decode("utf-8", "backslashreplace"))
 
     def _log_streamline(self, host, line):
         if "[Zuul] Task exit code" in line: