Merge "Add html based websocket client for console stream" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index a0de922..cc9d181 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -385,6 +385,10 @@
port=9000
+**websocket_url**
+ Base URL on which the websocket service is exposed, if different from the
+ base URL of the web app.
+
Operation
~~~~~~~~~
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index ac2a779..d9cf839 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2290,10 +2290,14 @@
self.assertEqual('project-merge', status_jobs[0]['name'])
# TODO(mordred) pull uuids from self.builds
self.assertEqual(
+ 'static/stream.html?uuid={uuid}&logfile=console.log'.format(
+ uuid=status_jobs[0]['uuid']),
+ status_jobs[0]['url'])
+ self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[0]['uuid']),
- status_jobs[0]['url'])
+ status_jobs[0]['finger_url'])
# TOOD(mordred) configure a success-url on the base job
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
@@ -2302,10 +2306,14 @@
status_jobs[0]['report_url'])
self.assertEqual('project-test1', status_jobs[1]['name'])
self.assertEqual(
+ 'static/stream.html?uuid={uuid}&logfile=console.log'.format(
+ uuid=status_jobs[1]['uuid']),
+ status_jobs[1]['url'])
+ self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[1]['uuid']),
- status_jobs[1]['url'])
+ status_jobs[1]['finger_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
@@ -2314,10 +2322,14 @@
self.assertEqual('project-test2', status_jobs[2]['name'])
self.assertEqual(
+ 'static/stream.html?uuid={uuid}&logfile=console.log'.format(
+ uuid=status_jobs[2]['uuid']),
+ status_jobs[2]['url'])
+ self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[2]['uuid']),
- status_jobs[2]['url'])
+ status_jobs[2]['finger_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
@@ -3607,10 +3619,13 @@
self.assertEqual('gate', job['pipeline'])
self.assertEqual(False, job['retry'])
self.assertEqual(
+ 'static/stream.html?uuid={uuid}&logfile=console.log'
+ .format(uuid=job['uuid']), job['url'])
+ self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=job['uuid']),
- job['url'])
+ job['finger_url'])
self.assertEqual(2, len(job['worker']))
self.assertEqual(False, job['canceled'])
self.assertEqual(True, job['voting'])
diff --git a/zuul/model.py b/zuul/model.py
index 1df70db..ef67828 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -164,7 +164,7 @@
items.extend(shared_queue.queue)
return items
- def formatStatusJSON(self):
+ def formatStatusJSON(self, websocket_url=None):
j_pipeline = dict(name=self.name,
description=self.description)
j_queues = []
@@ -181,7 +181,7 @@
if j_changes:
j_queue['heads'].append(j_changes)
j_changes = []
- j_changes.append(e.formatJSON())
+ j_changes.append(e.formatJSON(websocket_url))
if (len(j_changes) > 1 and
(j_changes[-2]['remaining_time'] is not None) and
(j_changes[-1]['remaining_time'] is not None)):
@@ -1673,7 +1673,7 @@
url = default_url or build.url or job.name
return (result, url)
- def formatJSON(self):
+ def formatJSON(self, websocket_url=None):
ret = {}
ret['active'] = self.active
ret['live'] = self.live
@@ -1710,11 +1710,20 @@
remaining = None
result = None
build_url = None
+ finger_url = None
report_url = None
worker = None
if build:
result = build.result
- build_url = build.url
+ finger_url = build.url
+ # TODO(tobiash): add support for custom web root
+ urlformat = 'static/stream.html?' \
+ 'uuid={build.uuid}&' \
+ 'logfile=console.log'
+ if websocket_url:
+ urlformat += '&websocket_url={websocket_url}'
+ build_url = urlformat.format(
+ build=build, websocket_url=websocket_url)
(unused, report_url) = self.formatJobResult(job)
if build.start_time:
if build.end_time:
@@ -1740,6 +1749,7 @@
'elapsed_time': elapsed,
'remaining_time': remaining,
'url': build_url,
+ 'finger_url': finger_url,
'report_url': report_url,
'result': result,
'voting': job.voting,
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e5e7f87..2217b0b 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -889,6 +889,7 @@
data = {}
data['zuul_version'] = self.zuul_version
+ websocket_url = get_default(self.config, 'web', 'websocket_url', None)
if self._pause:
ret = '<p><b>Queue only mode:</b> preparing to '
@@ -912,5 +913,5 @@
data['pipelines'] = pipelines
tenant = self.abide.tenants.get(tenant_name)
for pipeline in tenant.layout.pipelines.values():
- pipelines.append(pipeline.formatStatusJSON())
+ pipelines.append(pipeline.formatStatusJSON(websocket_url))
return json.dumps(data)
diff --git a/zuul/web.py b/zuul/web/__init__.py
similarity index 93%
rename from zuul/web.py
rename to zuul/web/__init__.py
index ab16e11..faf22b5 100644
--- a/zuul/web.py
+++ b/zuul/web/__init__.py
@@ -18,6 +18,7 @@
import asyncio
import json
import logging
+import os
import uvloop
import aiohttp
@@ -25,6 +26,8 @@
import zuul.rpcclient
+STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
+
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
@@ -39,11 +42,11 @@
self.ssl_ca = ssl_ca
def _getPortLocation(self, job_uuid):
- '''
+ """
Query Gearman for the executor running the given job.
:param str job_uuid: The job UUID we want to stream.
- '''
+ """
# TODO: Fetch the entire list of uuid/file/server/ports once and
# share that, and fetch a new list on cache misses perhaps?
# TODO: Avoid recreating a client for each request.
@@ -55,14 +58,14 @@
return ret
async def _fingerClient(self, ws, server, port, job_uuid):
- '''
+ """
Create a client to connect to the finger streamer and pull results.
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param str server: The executor server running the job.
:param str port: The executor server port.
:param str job_uuid: The job UUID to stream.
- '''
+ """
self.log.debug("Connecting to finger server %s:%s", server, port)
reader, writer = await asyncio.open_connection(host=server, port=port,
loop=self.event_loop)
@@ -82,12 +85,12 @@
return
async def _streamLog(self, ws, request):
- '''
+ """
Stream the log for the requested job back to the client.
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param dict request: The client request parameters.
- '''
+ """
for key in ('uuid', 'logfile'):
if key not in request:
return (4000, "'{key}' missing from request payload".format(
@@ -112,11 +115,11 @@
return (1000, "No more data")
async def processRequest(self, request):
- '''
+ """
Handle a client websocket request for log streaming.
:param aiohttp.web.Request request: The client request.
- '''
+ """
try:
ws = web.WebSocketResponse()
await ws.prepare(request)
@@ -161,6 +164,8 @@
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
+ self.event_loop = None
+ self.term = None
async def _handleWebsocket(self, request):
handler = LogStreamingHandler(self.event_loop,
@@ -169,7 +174,7 @@
return await handler.processRequest(request)
def run(self, loop=None):
- '''
+ """
Run the websocket daemon.
Because this method can be the target of a new thread, we need to
@@ -178,9 +183,9 @@
:param loop: The event loop to use. If not supplied, the default main
thread event loop is used. This should be supplied if ZuulWeb
is run within a separate (non-main) thread.
- '''
+ """
routes = [
- ('GET', '/console-stream', self._handleWebsocket)
+ ('GET', '/console-stream', self._handleWebsocket),
]
self.log.debug("ZuulWeb starting")
@@ -195,6 +200,7 @@
app = web.Application()
for method, path, handler in routes:
app.router.add_route(method, path, handler)
+ app.router.add_static('/static', STATIC_DIR)
handler = app.make_handler(loop=self.event_loop)
# create the server
@@ -224,7 +230,8 @@
loop.close()
def stop(self):
- self.event_loop.call_soon_threadsafe(self.term.set_result, True)
+ if self.event_loop and self.term:
+ self.event_loop.call_soon_threadsafe(self.term.set_result, True)
if __name__ == "__main__":
diff --git a/zuul/web/static/stream.html b/zuul/web/static/stream.html
new file mode 100644
index 0000000..dbeb66b
--- /dev/null
+++ b/zuul/web/static/stream.html
@@ -0,0 +1,114 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
+ "http://www.w3.org/TR/html4/strict.dtd">
+<html>
+ <head>
+ <style type="text/css">
+
+ body {
+ font-family: monospace;
+ background-color: black;
+ color: lightgrey;
+ }
+
+ #overlay {
+ position: fixed;
+ top: 5px;
+ right: 5px;
+ background-color: darkgrey;
+ color: black;
+ }
+
+ pre {
+ white-space: pre;
+ margin: 0px 10px;
+ }
+ </style>
+
+ <script type="text/javascript">
+
+ function escapeLog(text) {
+   // Turn each HTML-significant character (< > & " ') into its numeric
+   // character reference so the log text is displayed literally.
+   return text.replace(/[<>&"']/g, function(ch) {
+     return '&#' + ch.charCodeAt(0) + ';';
+   });
+ }
+
+ window.onload = function() {
+   // Page entry point: read the build parameters from the page URL, open the
+   // websocket and periodically flush received log text into #pagecontent.
+   var pageUpdateInMS = 250;
+   var receiveBuffer = "";
+   var content = document.getElementById('pagecontent');
+
+   setInterval(function() {
+     if (receiveBuffer !== "") {
+       // Append only the new markup; "innerHTML +=" would serialize and
+       // re-parse the whole log on every flush.
+       content.insertAdjacentHTML('beforeend', receiveBuffer);
+       receiveBuffer = "";
+       if (document.getElementById('autoscroll').checked) {
+         window.scrollTo(0, document.body.scrollHeight);
+       }
+     }
+   }, pageUpdateInMS);
+
+   var url = new URL(window.location);
+
+   var params = {
+     uuid: url.searchParams.get('uuid')
+   };
+   // Query values come from the URL, so set the title as text, not markup.
+   var title = document.getElementById('pagetitle');
+   title.textContent = params['uuid'];
+   if (url.searchParams.has('logfile')) {
+     params['logfile'] = url.searchParams.get('logfile');
+     title.textContent += "(" + params['logfile'] + ")";
+   }
+   if (url.searchParams.has('websocket_url')) {
+     params['websocket_url'] = url.searchParams.get('websocket_url');
+   } else {
+     // Websocket doesn't accept relative urls so construct an
+     // absolute one.
+     var protocol = (url['protocol'] == 'https:') ? 'wss://' : 'ws://';
+     var path = url['pathname'].replace(/static\/.*$/g, '') + 'console-stream';
+     params['websocket_url'] = protocol + url['host'] + path;
+   }
+   var ws = new WebSocket(params['websocket_url']);
+
+   ws.onmessage = function(event) {
+     console.log("onmessage");
+     // Escape before buffering: the buffer is later inserted as HTML.
+     receiveBuffer = receiveBuffer + escapeLog(event.data);
+   };
+
+   ws.onopen = function(event) {
+     console.log("onopen");
+     // Send the request parameters as JSON once the socket is open.
+     ws.send(JSON.stringify(params));
+   };
+
+   ws.onclose = function(event) {
+     console.log("onclose");
+     receiveBuffer = receiveBuffer + "\n--- END OF STREAM ---\n";
+   };
+
+ };
+
+ </script>
+
+ <title id="pagetitle"></title>
+ </head>
+
+ <body>
+
+ <div id="overlay">
+ <form>
+ <input type="checkbox" id="autoscroll" checked> autoscroll
+ </form>
+ </div>
+
+ <pre id="pagecontent"></pre>
+
+ </body>
+</html>