Add web-based console log streaming

zuul now provides socket-based console streaming, which is super cool.
In order to have jenkins parity with web streaming, we need to provide a
websocket (javascript in browsers can't really connect to random ports
on servers)

After surveying the existing python websocket options, basically all of
them are based around twisted, eventlet, gevent or asyncio. It's not
just a thing we can easily deal with from our current webob/paste
structure, because it is a change to the fundamental HTTP handling.
While we could write our own websocket server implementation that was
threaded like the rest of zuul, that's a pretty giant amount of work.

Instead, we can run an async-based server that's just for the
websockets, so that we're not all of a sudden putting async code into
the rest of zuul and winding up frankensteined. Since this is new code,
using asyncio and python3 seems like an excellent starting place.

aiohttp supports running a websocket server in a thread. It also
supports doing other HTTP/REST calls, so by going aiohttp we can set
ourselves up for a single answer for the HTTP tier.

In order to keep us from being an open socket relay, we'll expect two
parameters as the first message on the websocket - what's the zuul build
uuid, and what log file do we want to stream. (the second thing,
multiple log files, isn't supported yet by the rest of zuul, but one can
imagine a future where we'd like to support that too, so it's in the
protocol) The websocket server will then ask zuul over gearman for the
IP and port associated with the build and logfile and will start
streaming it to the socket.

Ultimately we'll want the status page to make links of the form:

  /console.html?uuid=<uuid>&logfile=console.log

and we'll want to have apache map the websocket server to something like
/console.

Co-Authored-By: Monty Taylor <mordred@inaugust.com>

Change-Id: Idd0d3f9259e81fa9a60d7540664ce8d5ad2c298f
diff --git a/tests/base.py b/tests/base.py
index ff1f531..bc6fea8 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1226,7 +1226,6 @@
         self.build_history = []
         self.fail_tests = {}
         self.job_builds = {}
-        self.hostname = 'zl.example.com'
 
     def failJob(self, name, change):
         """Instruct the executor to report matching builds as failures.
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index fcfaf5d..8d9d127 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -121,7 +121,8 @@
         self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
         self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
         self.assertEqual(
-            'finger://zl.example.com/{uuid}'.format(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=buildset0_builds[0]['uuid']),
             buildset0_builds[0]['log_url'])
         self.assertEqual('check', buildset1['pipeline'])
@@ -144,7 +145,8 @@
         self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
         self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
         self.assertEqual(
-            'finger://zl.example.com/{uuid}'.format(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=buildset1_builds[-2]['uuid']),
             buildset1_builds[-2]['log_url'])
 
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py
index b0ef2c2..f47a8c8 100644
--- a/tests/unit/test_log_streamer.py
+++ b/tests/unit/test_log_streamer.py
@@ -14,6 +14,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import aiohttp
+import asyncio
+import logging
+import json
 import os
 import os.path
 import socket
@@ -21,6 +25,7 @@
 import threading
 import time
 
+import zuul.web
 import zuul.lib.log_streamer
 import tests.base
 
@@ -57,6 +62,7 @@
 class TestStreaming(tests.base.AnsibleZuulTestCase):
 
     tenant_config_file = 'config/streamer/main.yaml'
+    log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
 
     def setUp(self):
         super(TestStreaming, self).setUp()
@@ -146,9 +152,116 @@
         # job and deleted. However, we still have a file handle to it, so we
         # can make sure that we read the entire contents at this point.
         # Compact the returned lines into a single string for easy comparison.
-        file_contents = ''.join(logfile.readlines())
+        file_contents = logfile.read()
         logfile.close()
 
         self.log.debug("\n\nFile contents: %s\n\n", file_contents)
         self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
         self.assertEqual(file_contents, self.streaming_data)
+
+    def runWSClient(self, build_uuid, event):
+        async def client(loop, build_uuid, event):
+            uri = 'http://127.0.0.1:9000/console-stream'
+            try:
+                session = aiohttp.ClientSession(loop=loop)
+                async with session.ws_connect(uri) as ws:
+                    req = {'uuid': build_uuid, 'logfile': None}
+                    ws.send_str(json.dumps(req))
+                    event.set()  # notify we are connected and req sent
+                    async for msg in ws:
+                        if msg.type == aiohttp.WSMsgType.TEXT:
+                            self.ws_client_results += msg.data
+                        elif msg.type == aiohttp.WSMsgType.CLOSED:
+                            break
+                        elif msg.type == aiohttp.WSMsgType.ERROR:
+                            break
+                session.close()
+            except Exception as e:
+                self.log.exception("client exception:")
+
+        loop = asyncio.new_event_loop()
+        loop.set_debug(True)
+        loop.run_until_complete(client(loop, build_uuid, event))
+        loop.close()
+
+    def test_websocket_streaming(self):
+        # Need to set the streaming port before submitting the job
+        finger_port = 7902
+        self.executor_server.log_streaming_port = finger_port
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        # We don't have any real synchronization for the ansible jobs, so
+        # just wait until we get our running build.
+        while not len(self.builds):
+            time.sleep(0.1)
+        build = self.builds[0]
+        self.assertEqual(build.name, 'python27')
+
+        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+        while not os.path.exists(build_dir):
+            time.sleep(0.1)
+
+        # Need to wait to make sure that jobdir gets set
+        while build.jobdir is None:
+            time.sleep(0.1)
+            build = self.builds[0]
+
+        # Wait for the job to begin running and create the ansible log file.
+        # The job waits to complete until the flag file exists, so we can
+        # safely access the log here. We only open it (to force a file handle
+        # to be kept open for it after the job finishes) but wait to read the
+        # contents until the job is done.
+        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+        while not os.path.exists(ansible_log):
+            time.sleep(0.1)
+        logfile = open(ansible_log, 'r')
+        self.addCleanup(logfile.close)
+
+        # Start the finger streamer daemon
+        streamer = zuul.lib.log_streamer.LogStreamer(
+            None, self.host, finger_port, self.executor_server.jobdir_root)
+        self.addCleanup(streamer.stop)
+
+        # Start the web server
+        web_server = zuul.web.ZuulWeb(
+            listen_address='127.0.0.1', listen_port=9000,
+            gear_server='127.0.0.1', gear_port=self.gearman_server.port)
+        loop = asyncio.new_event_loop()
+        loop.set_debug(True)
+        ws_thread = threading.Thread(target=web_server.run, args=(loop,))
+        ws_thread.start()
+        self.addCleanup(loop.close)
+        self.addCleanup(ws_thread.join)
+        self.addCleanup(web_server.stop)
+
+        # Wait until web server is started
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            while s.connect_ex((self.host, 9000)):
+                time.sleep(0.1)
+
+        # Start a thread with the websocket client
+        ws_client_event = threading.Event()
+        self.ws_client_results = ''
+        ws_client_thread = threading.Thread(
+            target=self.runWSClient, args=(build.uuid, ws_client_event)
+        )
+        ws_client_thread.start()
+        ws_client_event.wait()
+
+        # Allow the job to complete
+        flag_file = os.path.join(build_dir, 'test_wait')
+        open(flag_file, 'w').close()
+
+        # Wait for the websocket client to complete, which it should when
+        # it's received the full log.
+        ws_client_thread.join()
+
+        self.waitUntilSettled()
+
+        file_contents = logfile.read()
+        logfile.close()
+        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
+        self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
+        self.assertEqual(file_contents, self.ws_client_results)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index e402342..c3cbf6d 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2289,22 +2289,40 @@
                             status_jobs.append(job)
         self.assertEqual('project-merge', status_jobs[0]['name'])
         # TODO(mordred) pull uuids from self.builds
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
-                         status_jobs[0]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[0]['uuid']),
+            status_jobs[0]['url'])
         # TOOD(mordred) configure a success-url on the base job
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
-                         status_jobs[0]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[0]['uuid']),
+            status_jobs[0]['report_url'])
         self.assertEqual('project-test1', status_jobs[1]['name'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
-                         status_jobs[1]['url'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
-                         status_jobs[1]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[1]['uuid']),
+            status_jobs[1]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[1]['uuid']),
+            status_jobs[1]['report_url'])
 
         self.assertEqual('project-test2', status_jobs[2]['name'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
-                         status_jobs[2]['url'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
-                         status_jobs[2]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[2]['uuid']),
+            status_jobs[2]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[2]['uuid']),
+            status_jobs[2]['report_url'])
 
     def test_live_reconfiguration(self):
         "Test that live reconfiguration works"
@@ -3577,8 +3595,11 @@
                 self.assertEqual('project-merge', job['name'])
                 self.assertEqual('gate', job['pipeline'])
                 self.assertEqual(False, job['retry'])
-                self.assertEqual('finger://zl.example.com/%s' % job['uuid'],
-                                 job['url'])
+                self.assertEqual(
+                    'finger://{hostname}/{uuid}'.format(
+                        hostname=self.executor_server.hostname,
+                        uuid=job['uuid']),
+                    job['url'])
                 self.assertEqual(2, len(job['worker']))
                 self.assertEqual(False, job['canceled'])
                 self.assertEqual(True, job['voting'])
@@ -4674,7 +4695,8 @@
 
         # NOTE: This default URL is currently hard-coded in executor/server.py
         self.assertIn(
-            '- docs-draft-test2 finger://zl.example.com/{uuid}'.format(
+            '- docs-draft-test2 finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=uuid_test2),
             body[3])