Add log streaming test

Test that our finger log streamer will correctly find the log file
and stream its contents to a client.

Change-Id: Ia153e2d436855dcce256d5fc91d0fd4c47116042
diff --git a/tests/fixtures/config/streamer/git/common-config/playbooks/python27.yaml b/tests/fixtures/config/streamer/git/common-config/playbooks/python27.yaml
new file mode 100644
index 0000000..753e7e2
--- /dev/null
+++ b/tests/fixtures/config/streamer/git/common-config/playbooks/python27.yaml
@@ -0,0 +1,11 @@
+# NOTE(Shrews): Do not run any tasks that will need zuul_console to stream
+# output because that will not work. Since we just need any output in our
+# ansible log, the test coordination tasks should be sufficient.
+- hosts: all
+  tasks:
+    - debug: var=waitpath
+
+    # Do not finish until the test creates the flag file
+    - wait_for:
+        state: present
+        path: "{{waitpath}}"
diff --git a/tests/fixtures/config/streamer/git/common-config/zuul.yaml b/tests/fixtures/config/streamer/git/common-config/zuul.yaml
new file mode 100644
index 0000000..d8df96a
--- /dev/null
+++ b/tests/fixtures/config/streamer/git/common-config/zuul.yaml
@@ -0,0 +1,17 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+- job:
+    name: python27
+    vars:
+      waitpath: '{{zuul._test.test_root}}/{{zuul.uuid}}/test_wait'
diff --git a/tests/fixtures/config/streamer/git/org_project/.zuul.yaml b/tests/fixtures/config/streamer/git/org_project/.zuul.yaml
new file mode 100644
index 0000000..89a5674
--- /dev/null
+++ b/tests/fixtures/config/streamer/git/org_project/.zuul.yaml
@@ -0,0 +1,5 @@
+- project:
+    name: org/project
+    check:
+      jobs:
+        - python27
diff --git a/tests/fixtures/config/streamer/git/org_project/README b/tests/fixtures/config/streamer/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/streamer/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/streamer/main.yaml b/tests/fixtures/config/streamer/main.yaml
new file mode 100644
index 0000000..208e274
--- /dev/null
+++ b/tests/fixtures/config/streamer/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py
index 3ea5a8e..f422e97 100644
--- a/tests/unit/test_log_streamer.py
+++ b/tests/unit/test_log_streamer.py
@@ -14,9 +14,12 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import logging
+import os
+import os.path
 import socket
 import tempfile
+import threading
+import time
 
 import zuul.lib.log_streamer
 import tests.base
@@ -24,8 +27,6 @@
 
 class TestLogStreamer(tests.base.BaseTestCase):
 
-    log = logging.getLogger("zuul.test.cloner")
-
     def setUp(self):
         super(TestLogStreamer, self).setUp()
         self.host = '0.0.0.0'
@@ -51,3 +52,146 @@
         self.addCleanup(s.close)
         self.assertNotEqual(0, s.connect_ex((self.host, port)))
         s.close()
+
+
+class TestStreaming(tests.base.AnsibleZuulTestCase):
+    """Check that the log streamer sends a running build's log to a client."""
+
+    tenant_config_file = 'config/streamer/main.yaml'
+
+    def setUp(self):
+        super(TestStreaming, self).setUp()
+        self.host = '0.0.0.0'
+        self.streamer = None
+        # Set by stopStreamer() to make the client loop exit.
+        self.stop_streamer = False
+        # Text received from the streamer, filled in by the client thread.
+        self.streaming_data = ''
+        # Set once the client has sent its request (or has given up).
+        self.request_sent = threading.Event()
+
+    def stopStreamer(self):
+        """Ask the receive loop of the client thread to exit."""
+        self.stop_streamer = True
+
+    def _collectStream(self, port, build_uuid):
+        """Request the log of build_uuid and gather everything received."""
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            s.connect((self.host, port))
+
+            req = '%s\n' % build_uuid
+            s.sendall(req.encode('utf-8'))
+            self.request_sent.set()
+
+            # Wake up periodically so that stop_streamer is noticed even
+            # while the server has nothing to send.
+            s.settimeout(1.0)
+
+            # Keep the raw bytes and decode once at the end: a multi-byte
+            # UTF-8 character may be split across two recv() calls.
+            chunks = []
+            while not self.stop_streamer:
+                try:
+                    data = s.recv(2048)
+                except socket.timeout:
+                    continue
+                if not data:
+                    break
+                chunks.append(data)
+            self.streaming_data += b''.join(chunks).decode('utf-8')
+        finally:
+            try:
+                s.shutdown(socket.SHUT_RDWR)
+            except socket.error:
+                # Never connected, or already torn down by the peer.
+                pass
+            s.close()
+
+    def startStreamer(self, port, build_uuid, root=None):
+        """Run a LogStreamer and stream one build's log from it.
+
+        Meant to be used as a thread target. The decoded stream is
+        appended to self.streaming_data when the connection ends.
+
+        :param port: Port given to the LogStreamer and used by the client.
+        :param build_uuid: UUID sent to the streamer as the request line.
+        :param root: Directory handed to the LogStreamer; defaults to the
+            system temporary directory.
+        """
+        if not root:
+            root = tempfile.gettempdir()
+        try:
+            self.streamer = zuul.lib.log_streamer.LogStreamer(
+                None, self.host, port, root)
+            try:
+                self._collectStream(port, build_uuid)
+            finally:
+                # Stop the streamer even if the client side failed.
+                self.streamer.stop()
+        finally:
+            # Never leave the test blocked on a request that cannot be sent.
+            self.request_sent.set()
+
+    def test_streaming(self):
+        """Stream a running job's log and compare it to the log file."""
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        # We don't have any real synchronization for the ansible jobs, so
+        # just wait until we get our running build.
+        while not self.builds:
+            time.sleep(0.1)
+        build = self.builds[0]
+        self.assertEqual(build.name, 'python27')
+
+        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+        while not os.path.exists(build_dir):
+            time.sleep(0.1)
+
+        # Need to wait to make sure that jobdir gets set
+        while build.jobdir is None:
+            time.sleep(0.1)
+            build = self.builds[0]
+
+        # Wait for the job to begin running and create the ansible log file.
+        # The job waits to complete until the flag file exists, so we can
+        # safely access the log here. We only open it (to force a file handle
+        # to be kept open for it after the job finishes) but wait to read the
+        # contents until the job is done.
+        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+        while not os.path.exists(ansible_log):
+            time.sleep(0.1)
+        logfile = open(ansible_log, 'r')
+        self.addCleanup(logfile.close)
+
+        # Create a thread to stream the log. We need this to be happening
+        # before we create the flag file to tell the job to complete.
+        port = 7901
+        streamer_thread = threading.Thread(
+            target=self.startStreamer,
+            args=(port, build.uuid, self.executor_server.jobdir_root,)
+        )
+        # Register the stop request first so the thread is always told to
+        # exit, then wait until the client has sent its request (or failed).
+        self.addCleanup(self.stopStreamer)
+        streamer_thread.start()
+        self.request_sent.wait()
+
+        # Allow the job to complete, which should close the streaming
+        # connection (and terminate the thread) as well since the log file
+        # gets closed/deleted.
+        flag_file = os.path.join(build_dir, 'test_wait')
+        open(flag_file, 'w').close()
+        self.waitUntilSettled()
+        streamer_thread.join()
+
+        # Now that the job is finished, the log file has been closed by the
+        # job and deleted. However, we still have a file handle to it, so we
+        # can make sure that we read the entire contents at this point.
+        orig = logfile.read()
+        logfile.close()
+
+        self.log.debug("\n\nFile contents: %s\n\n", orig)
+        self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
+        self.assertEqual(orig, self.streaming_data)