Add log streaming test
Test that our finger log streamer will correctly find the log file
and stream its contents to a client.
Change-Id: Ia153e2d436855dcce256d5fc91d0fd4c47116042
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py
index 3ea5a8e..f422e97 100644
--- a/tests/unit/test_log_streamer.py
+++ b/tests/unit/test_log_streamer.py
@@ -14,9 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
+import os
+import os.path
import socket
import tempfile
+import threading
+import time
import zuul.lib.log_streamer
import tests.base
@@ -24,8 +27,6 @@
class TestLogStreamer(tests.base.BaseTestCase):
- log = logging.getLogger("zuul.test.cloner")
-
def setUp(self):
super(TestLogStreamer, self).setUp()
self.host = '0.0.0.0'
@@ -51,3 +52,144 @@
self.addCleanup(s.close)
self.assertNotEqual(0, s.connect_ex((self.host, port)))
s.close()
+
+
+class TestStreaming(tests.base.AnsibleZuulTestCase):
+    """End-to-end check that a build's log can be streamed to a client.
+
+    A job is started and held open, a client thread connects to a
+    LogStreamer and requests the build's log by UUID, and the streamed
+    text is compared with the contents of the job's log file.
+    """
+
+    tenant_config_file = 'config/streamer/main.yaml'
+
+    def setUp(self):
+        super(TestStreaming, self).setUp()
+        self.host = '0.0.0.0'
+        self.streamer = None
+        # Set by stopStreamer() to ask the client loop in startStreamer()
+        # to exit.
+        self.stop_streamer = False
+        # Decoded text received from the streamer; written by the client
+        # thread and read by the test after that thread has been joined.
+        self.streaming_data = ''
+
+    def stopStreamer(self):
+        """Ask the streaming client loop to stop."""
+        self.stop_streamer = True
+
+    def startStreamer(self, port, build_uuid, root=None):
+        """Start a LogStreamer and read one build's log from it.
+
+        Intended to run in its own thread: it blocks until the server
+        closes the connection or stopStreamer() is called. The received
+        text is stored in self.streaming_data.
+
+        :param port: TCP port given to the LogStreamer and connected to.
+        :param build_uuid: Build UUID sent as the request line.
+        :param root: Root directory given to the LogStreamer; defaults
+            to the system temporary directory.
+        """
+        if not root:
+            root = tempfile.gettempdir()
+        self.streamer = zuul.lib.log_streamer.LogStreamer(None, self.host,
+                                                          port, root)
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        # Collect raw bytes and decode once at the end: decoding each
+        # recv() chunk on its own fails when a multi-byte UTF-8 character
+        # is split across two chunks.
+        chunks = []
+        try:
+            s.connect((self.host, port))
+
+            req = '%s\n' % build_uuid
+            s.sendall(req.encode('utf-8'))
+
+            # Use a receive timeout so the stop flag is re-checked
+            # periodically instead of blocking in recv() indefinitely.
+            s.settimeout(1.0)
+            while not self.stop_streamer:
+                try:
+                    data = s.recv(2048)
+                except socket.timeout:
+                    continue
+                if not data:
+                    break
+                chunks.append(data)
+
+            self.streaming_data = b''.join(chunks).decode('utf-8')
+
+            try:
+                s.shutdown(socket.SHUT_RDWR)
+            except socket.error:
+                # The peer may already have torn the connection down.
+                pass
+        finally:
+            # Always release the socket and stop the streamer, even if
+            # the client side failed part-way through.
+            s.close()
+            self.streamer.stop()
+
+    def test_streaming(self):
+        """Stream a held build's log and compare it with the log file."""
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        # We don't have any real synchronization for the ansible jobs, so
+        # just wait until we get our running build.
+        while not len(self.builds):
+            time.sleep(0.1)
+        build = self.builds[0]
+        self.assertEqual(build.name, 'python27')
+
+        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+        while not os.path.exists(build_dir):
+            time.sleep(0.1)
+
+        # Need to wait to make sure that jobdir gets set
+        while build.jobdir is None:
+            time.sleep(0.1)
+            build = self.builds[0]
+
+        # Wait for the job to begin running and create the ansible log file.
+        # The job waits to complete until the flag file exists, so we can
+        # safely access the log here. We only open it (to force a file handle
+        # to be kept open for it after the job finishes) but wait to read the
+        # contents until the job is done.
+        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+        while not os.path.exists(ansible_log):
+            time.sleep(0.1)
+        # Open in binary mode so the file is decoded exactly like the
+        # streamed bytes (UTF-8, no newline translation) instead of with
+        # the locale's default text encoding.
+        logfile = open(ansible_log, 'rb')
+        self.addCleanup(logfile.close)
+
+        # Create a thread to stream the log. We need this to be happening
+        # before we create the flag file to tell the job to complete.
+        port = 7901
+        streamer_thread = threading.Thread(
+            target=self.startStreamer,
+            args=(port, build.uuid, self.executor_server.jobdir_root,)
+        )
+        streamer_thread.start()
+        self.addCleanup(self.stopStreamer)
+
+        # Allow the job to complete, which should close the streaming
+        # connection (and terminate the thread) as well since the log file
+        # gets closed/deleted.
+        flag_file = os.path.join(build_dir, 'test_wait')
+        open(flag_file, 'w').close()
+        self.waitUntilSettled()
+        streamer_thread.join()
+
+        # Now that the job is finished, the log file has been closed by the
+        # job and deleted. However, we still have a file handle to it, so we
+        # can make sure that we read the entire contents at this point.
+        orig = logfile.read().decode('utf-8')
+        logfile.close()
+
+        self.log.debug("\n\nFile contents: %s\n\n", orig)
+        self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
+        self.assertEqual(orig, self.streaming_data)