Add html based websocket client for console stream

This adds a static html page to zuul-web that a browser can be pointed
to for streaming log files. To leverage this in the status UI the
scheduler sets the build url to this html page adding build uuid,
logfile and optionally a different url for accessing the websocket.

Tobias has to run his websocket streamer on a different domain than
the other things, via proxy things - so the url zuul-web is serving
for the static file isn't the same as what it is for the websocket
from a consumer perspective. So introduce a config variable for
zuul-web that allows setting an explicit url for that. If it's not
set, use the relative path from static/stream.html to console-stream.

Further to not throwing away the finger url retail this as additional
field in status.json. With this a later change to the status ui could
let the user choose between html and finger log streaming.

Co-Authored-By: David Shrewsbury <shrewsbury.dave@gmail.com>
Co-Authored-By: Monty Taylor <mordred@inaugust.com>
Co-Authored-By: Tobias Henkel <tobias.henkel@bmw.de>
Change-Id: I2da7979f448934abe3d41f3a5e50d09004fcccc2
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index a0de922..cc9d181 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -385,6 +385,10 @@
 
      port=9000
 
+**websocket_url**
+  Base URL on which the websocket service is exposed, if different than the
+  base URL of the web app.
+
 Operation
 ~~~~~~~~~
 
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index ac2a779..d9cf839 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2290,10 +2290,14 @@
         self.assertEqual('project-merge', status_jobs[0]['name'])
         # TODO(mordred) pull uuids from self.builds
         self.assertEqual(
+            'static/stream.html?uuid={uuid}&logfile=console.log'.format(
+                uuid=status_jobs[0]['uuid']),
+            status_jobs[0]['url'])
+        self.assertEqual(
             'finger://{hostname}/{uuid}'.format(
                 hostname=self.executor_server.hostname,
                 uuid=status_jobs[0]['uuid']),
-            status_jobs[0]['url'])
+            status_jobs[0]['finger_url'])
         # TOOD(mordred) configure a success-url on the base job
         self.assertEqual(
             'finger://{hostname}/{uuid}'.format(
@@ -2302,10 +2306,14 @@
             status_jobs[0]['report_url'])
         self.assertEqual('project-test1', status_jobs[1]['name'])
         self.assertEqual(
+            'static/stream.html?uuid={uuid}&logfile=console.log'.format(
+                uuid=status_jobs[1]['uuid']),
+            status_jobs[1]['url'])
+        self.assertEqual(
             'finger://{hostname}/{uuid}'.format(
                 hostname=self.executor_server.hostname,
                 uuid=status_jobs[1]['uuid']),
-            status_jobs[1]['url'])
+            status_jobs[1]['finger_url'])
         self.assertEqual(
             'finger://{hostname}/{uuid}'.format(
                 hostname=self.executor_server.hostname,
@@ -2314,10 +2322,14 @@
 
         self.assertEqual('project-test2', status_jobs[2]['name'])
         self.assertEqual(
+            'static/stream.html?uuid={uuid}&logfile=console.log'.format(
+                uuid=status_jobs[2]['uuid']),
+            status_jobs[2]['url'])
+        self.assertEqual(
             'finger://{hostname}/{uuid}'.format(
                 hostname=self.executor_server.hostname,
                 uuid=status_jobs[2]['uuid']),
-            status_jobs[2]['url'])
+            status_jobs[2]['finger_url'])
         self.assertEqual(
             'finger://{hostname}/{uuid}'.format(
                 hostname=self.executor_server.hostname,
@@ -3607,10 +3619,13 @@
                 self.assertEqual('gate', job['pipeline'])
                 self.assertEqual(False, job['retry'])
                 self.assertEqual(
+                    'static/stream.html?uuid={uuid}&logfile=console.log'
+                    .format(uuid=job['uuid']), job['url'])
+                self.assertEqual(
                     'finger://{hostname}/{uuid}'.format(
                         hostname=self.executor_server.hostname,
                         uuid=job['uuid']),
-                    job['url'])
+                    job['finger_url'])
                 self.assertEqual(2, len(job['worker']))
                 self.assertEqual(False, job['canceled'])
                 self.assertEqual(True, job['voting'])
diff --git a/zuul/model.py b/zuul/model.py
index 1df70db..ef67828 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -164,7 +164,7 @@
             items.extend(shared_queue.queue)
         return items
 
-    def formatStatusJSON(self):
+    def formatStatusJSON(self, websocket_url=None):
         j_pipeline = dict(name=self.name,
                           description=self.description)
         j_queues = []
@@ -181,7 +181,7 @@
                     if j_changes:
                         j_queue['heads'].append(j_changes)
                     j_changes = []
-                j_changes.append(e.formatJSON())
+                j_changes.append(e.formatJSON(websocket_url))
                 if (len(j_changes) > 1 and
                         (j_changes[-2]['remaining_time'] is not None) and
                         (j_changes[-1]['remaining_time'] is not None)):
@@ -1673,7 +1673,7 @@
             url = default_url or build.url or job.name
         return (result, url)
 
-    def formatJSON(self):
+    def formatJSON(self, websocket_url=None):
         ret = {}
         ret['active'] = self.active
         ret['live'] = self.live
@@ -1710,11 +1710,20 @@
             remaining = None
             result = None
             build_url = None
+            finger_url = None
             report_url = None
             worker = None
             if build:
                 result = build.result
-                build_url = build.url
+                finger_url = build.url
+                # TODO(tobiash): add support for custom web root
+                urlformat = 'static/stream.html?' \
+                            'uuid={build.uuid}&' \
+                            'logfile=console.log'
+                if websocket_url:
+                    urlformat += '&websocket_url={websocket_url}'
+                build_url = urlformat.format(
+                    build=build, websocket_url=websocket_url)
                 (unused, report_url) = self.formatJobResult(job)
                 if build.start_time:
                     if build.end_time:
@@ -1740,6 +1749,7 @@
                 'elapsed_time': elapsed,
                 'remaining_time': remaining,
                 'url': build_url,
+                'finger_url': finger_url,
                 'report_url': report_url,
                 'result': result,
                 'voting': job.voting,
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e5e7f87..2217b0b 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -889,6 +889,7 @@
         data = {}
 
         data['zuul_version'] = self.zuul_version
+        websocket_url = get_default(self.config, 'web', 'websocket_url', None)
 
         if self._pause:
             ret = '<p><b>Queue only mode:</b> preparing to '
@@ -912,5 +913,5 @@
         data['pipelines'] = pipelines
         tenant = self.abide.tenants.get(tenant_name)
         for pipeline in tenant.layout.pipelines.values():
-            pipelines.append(pipeline.formatStatusJSON())
+            pipelines.append(pipeline.formatStatusJSON(websocket_url))
         return json.dumps(data)
diff --git a/zuul/web.py b/zuul/web/__init__.py
similarity index 93%
rename from zuul/web.py
rename to zuul/web/__init__.py
index ab16e11..faf22b5 100644
--- a/zuul/web.py
+++ b/zuul/web/__init__.py
@@ -18,6 +18,7 @@
 import asyncio
 import json
 import logging
+import os
 import uvloop
 
 import aiohttp
@@ -25,6 +26,8 @@
 
 import zuul.rpcclient
 
+STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
+
 
 class LogStreamingHandler(object):
     log = logging.getLogger("zuul.web.LogStreamingHandler")
@@ -39,11 +42,11 @@
         self.ssl_ca = ssl_ca
 
     def _getPortLocation(self, job_uuid):
-        '''
+        """
         Query Gearman for the executor running the given job.
 
         :param str job_uuid: The job UUID we want to stream.
-        '''
+        """
         # TODO: Fetch the entire list of uuid/file/server/ports once and
         #       share that, and fetch a new list on cache misses perhaps?
         # TODO: Avoid recreating a client for each request.
@@ -55,14 +58,14 @@
         return ret
 
     async def _fingerClient(self, ws, server, port, job_uuid):
-        '''
+        """
         Create a client to connect to the finger streamer and pull results.
 
         :param aiohttp.web.WebSocketResponse ws: The websocket response object.
         :param str server: The executor server running the job.
         :param str port: The executor server port.
         :param str job_uuid: The job UUID to stream.
-        '''
+        """
         self.log.debug("Connecting to finger server %s:%s", server, port)
         reader, writer = await asyncio.open_connection(host=server, port=port,
                                                        loop=self.event_loop)
@@ -82,12 +85,12 @@
                 return
 
     async def _streamLog(self, ws, request):
-        '''
+        """
         Stream the log for the requested job back to the client.
 
         :param aiohttp.web.WebSocketResponse ws: The websocket response object.
         :param dict request: The client request parameters.
-        '''
+        """
         for key in ('uuid', 'logfile'):
             if key not in request:
                 return (4000, "'{key}' missing from request payload".format(
@@ -112,11 +115,11 @@
         return (1000, "No more data")
 
     async def processRequest(self, request):
-        '''
+        """
         Handle a client websocket request for log streaming.
 
         :param aiohttp.web.Request request: The client request.
-        '''
+        """
         try:
             ws = web.WebSocketResponse()
             await ws.prepare(request)
@@ -161,6 +164,8 @@
         self.ssl_key = ssl_key
         self.ssl_cert = ssl_cert
         self.ssl_ca = ssl_ca
+        self.event_loop = None
+        self.term = None
 
     async def _handleWebsocket(self, request):
         handler = LogStreamingHandler(self.event_loop,
@@ -169,7 +174,7 @@
         return await handler.processRequest(request)
 
     def run(self, loop=None):
-        '''
+        """
         Run the websocket daemon.
 
         Because this method can be the target of a new thread, we need to
@@ -178,9 +183,9 @@
         :param loop: The event loop to use. If not supplied, the default main
             thread event loop is used. This should be supplied if ZuulWeb
             is run within a separate (non-main) thread.
-        '''
+        """
         routes = [
-            ('GET', '/console-stream', self._handleWebsocket)
+            ('GET', '/console-stream', self._handleWebsocket),
         ]
 
         self.log.debug("ZuulWeb starting")
@@ -195,6 +200,7 @@
         app = web.Application()
         for method, path, handler in routes:
             app.router.add_route(method, path, handler)
+        app.router.add_static('/static', STATIC_DIR)
         handler = app.make_handler(loop=self.event_loop)
 
         # create the server
@@ -224,7 +230,8 @@
             loop.close()
 
     def stop(self):
-        self.event_loop.call_soon_threadsafe(self.term.set_result, True)
+        if self.event_loop and self.term:
+            self.event_loop.call_soon_threadsafe(self.term.set_result, True)
 
 
 if __name__ == "__main__":
diff --git a/zuul/web/static/stream.html b/zuul/web/static/stream.html
new file mode 100644
index 0000000..dbeb66b
--- /dev/null
+++ b/zuul/web/static/stream.html
@@ -0,0 +1,114 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
+   "http://www.w3.org/TR/html4/strict.dtd">
+<html>
+  <head>
+      <style type="text/css">
+
+        body {
+          font-family: monospace;
+          background-color: black;
+          color: lightgrey;
+        }
+
+        #overlay {
+            position: fixed;
+            top: 5px;
+            right: 5px;
+            background-color: darkgrey;
+            color: black;
+        }
+
+        pre {
+            white-space: pre;
+            margin: 0px 10px;
+        }
+      </style>
+
+    <script type="text/javascript">
+
+      function escapeLog(text) {
+          var pattern = /[<>&"']/g;
+
+          return text.replace(pattern, function(match) {
+              return '&#' + match.charCodeAt(0) + ';';
+          });
+      }
+
+      window.onload = function() {
+
+          pageUpdateInMS = 250;
+          var receiveBuffer = "";
+          var websocket_url = null
+
+          setInterval(function() {
+              console.log("autoScroll");
+              if (receiveBuffer != "") {
+                  document.getElementById('pagecontent').innerHTML += receiveBuffer;
+                  receiveBuffer = "";
+                  if (document.getElementById('autoscroll').checked) {
+                      window.scrollTo(0, document.body.scrollHeight);
+                  }
+              }
+          }, pageUpdateInMS);
+
+          var url = new URL(window.location);
+
+          var params = {
+              uuid: url.searchParams.get('uuid')
+          }
+          document.getElementById('pagetitle').innerHTML = params['uuid'];
+          if (url.searchParams.has('logfile')) {
+              params['logfile'] = url.searchParams.get('logfile');
+              var logfile_suffix = "(" + params['logfile'] + ")";
+              document.getElementById('pagetitle').innerHTML += logfile_suffix;
+          }
+          if (url.searchParams.has('websocket_url')) {
+              params['websocket_url'] = url.searchParams.get('websocket_url');
+          } else {
+              // Websocket doesn't accept relative urls so construct an
+              // absolute one.
+              var protocol = '';
+              if (url['protocol'] == 'https:') {
+                  protocol = 'wss://';
+              } else {
+                  protocol = 'ws://';
+              }
+              path = url['pathname'].replace(/static\/.*$/g, '') + 'console-stream';
+              params['websocket_url'] = protocol + url['host'] + path;
+          }
+          var ws = new WebSocket(params['websocket_url']);
+
+          ws.onmessage = function(event) {
+              console.log("onmessage");
+              receiveBuffer = receiveBuffer + escapeLog(event.data);
+          };
+
+          ws.onopen = function(event) {
+              console.log("onopen");
+              ws.send(JSON.stringify(params));
+          };
+
+          ws.onclose = function(event) {
+              console.log("onclose");
+              receiveBuffer = receiveBuffer + "\n--- END OF STREAM ---\n";
+          };
+
+      };
+
+    </script>
+
+    <title id="pagetitle"></title>
+  </head>
+
+  <body>
+
+    <div id="overlay">
+      <form>
+        <input type="checkbox" id="autoscroll" checked> autoscroll
+      </form>
+    </div>
+
+    <pre id="pagecontent"></pre>
+
+  </body>
+</html>