Add web-based console log streaming

zuul now provides socket-based console streaming, which is super cool.
In order to have jenkins parity with web streaming, we need to provide a
websocket (javascript in browsers can't really connect to random ports
on servers)

After surveying the existing python websocket options, basically all of
them are based around twisted, eventlet, gevent or asyncio. It's not
just a thing we can easily deal with from our current webob/paste
structure, because it is a change to the fundamental HTTP handling.
While we could write our own websocket server implementation that was
threaded like the rest of zuul, that's a pretty giant amount of work.

Instead, we can run an async-based server that's just for the
websockets, so that we're not all of a sudden putting async code into
the rest of zuul and winding up frankensteined. Since this is new code,
using asyncio and python3 seems like an excellent starting place.

aiohttp supports running a websocket server in a thread. It also
supports doing other HTTP/REST calls, so by going aiohttp we can set
ourselves up for a single answer for the HTTP tier.

In order to keep us from being an open socket relay, we'll expect two
parameters as the first message on the websocket - what's the zuul build
uuid, and what log file do we want to stream. (the second thing,
multiple log files, isn't supported yet by the rest of zuul, but one can
imagine a future where we'd like to support that too, so it's in the
protocol) The websocket server will then ask zuul over gearman for the
IP and port associated with the build and logfile and will start
streaming it to the socket.

Ultimately we'll want the status page to make links of the form:

  /console.html?uuid=<uuid>&logfile=console.log

and we'll want to have apache map the websocket server to something like
/console.

Co-Authored-By: Monty Taylor <mordred@inaugust.com>

Change-Id: Idd0d3f9259e81fa9a60d7540664ce8d5ad2c298f
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index 2909ea6..4685f71 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -29,6 +29,10 @@
 trusted_ro_dirs=/opt/zuul-scripts:/var/cache
 trusted_rw_dirs=/opt/zuul-logs
 
+[web]
+listen_address=127.0.0.1
+port=9000
+
 [webapp]
 listen_address=0.0.0.0
 port=8001
diff --git a/requirements.txt b/requirements.txt
index 5caa1b5..69509d0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,3 +24,5 @@
 cachecontrol
 pyjwt
 iso8601
+aiohttp
+uvloop;python_version>='3.5'
diff --git a/setup.cfg b/setup.cfg
index 0d22cb1..ce7a40e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -26,6 +26,7 @@
     zuul-cloner = zuul.cmd.cloner:main
     zuul-executor = zuul.cmd.executor:main
     zuul-bwrap = zuul.driver.bubblewrap:main
+    zuul-web = zuul.cmd.web:main
 
 [build_sphinx]
 source-dir = doc/source
diff --git a/tests/base.py b/tests/base.py
index ff1f531..bc6fea8 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1226,7 +1226,6 @@
         self.build_history = []
         self.fail_tests = {}
         self.job_builds = {}
-        self.hostname = 'zl.example.com'
 
     def failJob(self, name, change):
         """Instruct the executor to report matching builds as failures.
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index fcfaf5d..8d9d127 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -121,7 +121,8 @@
         self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
         self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
         self.assertEqual(
-            'finger://zl.example.com/{uuid}'.format(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=buildset0_builds[0]['uuid']),
             buildset0_builds[0]['log_url'])
         self.assertEqual('check', buildset1['pipeline'])
@@ -144,7 +145,8 @@
         self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
         self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
         self.assertEqual(
-            'finger://zl.example.com/{uuid}'.format(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=buildset1_builds[-2]['uuid']),
             buildset1_builds[-2]['log_url'])
 
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py
index b0ef2c2..f47a8c8 100644
--- a/tests/unit/test_log_streamer.py
+++ b/tests/unit/test_log_streamer.py
@@ -14,6 +14,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import aiohttp
+import asyncio
+import logging
+import json
 import os
 import os.path
 import socket
@@ -21,6 +25,7 @@
 import threading
 import time
 
+import zuul.web
 import zuul.lib.log_streamer
 import tests.base
 
@@ -57,6 +62,7 @@
 class TestStreaming(tests.base.AnsibleZuulTestCase):
 
     tenant_config_file = 'config/streamer/main.yaml'
+    log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
 
     def setUp(self):
         super(TestStreaming, self).setUp()
@@ -146,9 +152,116 @@
         # job and deleted. However, we still have a file handle to it, so we
         # can make sure that we read the entire contents at this point.
         # Compact the returned lines into a single string for easy comparison.
-        file_contents = ''.join(logfile.readlines())
+        file_contents = logfile.read()
         logfile.close()
 
         self.log.debug("\n\nFile contents: %s\n\n", file_contents)
         self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
         self.assertEqual(file_contents, self.streaming_data)
+
+    def runWSClient(self, build_uuid, event):
+        async def client(loop, build_uuid, event):
+            uri = 'http://127.0.0.1:9000/console-stream'
+            try:
+                session = aiohttp.ClientSession(loop=loop)
+                async with session.ws_connect(uri) as ws:
+                    req = {'uuid': build_uuid, 'logfile': None}
+                    ws.send_str(json.dumps(req))
+                    event.set()  # notify we are connected and req sent
+                    async for msg in ws:
+                        if msg.type == aiohttp.WSMsgType.TEXT:
+                            self.ws_client_results += msg.data
+                        elif msg.type == aiohttp.WSMsgType.CLOSED:
+                            break
+                        elif msg.type == aiohttp.WSMsgType.ERROR:
+                            break
+                session.close()
+            except Exception as e:
+                self.log.exception("client exception:")
+
+        loop = asyncio.new_event_loop()
+        loop.set_debug(True)
+        loop.run_until_complete(client(loop, build_uuid, event))
+        loop.close()
+
+    def test_websocket_streaming(self):
+        # Need to set the streaming port before submitting the job
+        finger_port = 7902
+        self.executor_server.log_streaming_port = finger_port
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        # We don't have any real synchronization for the ansible jobs, so
+        # just wait until we get our running build.
+        while not len(self.builds):
+            time.sleep(0.1)
+        build = self.builds[0]
+        self.assertEqual(build.name, 'python27')
+
+        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+        while not os.path.exists(build_dir):
+            time.sleep(0.1)
+
+        # Need to wait to make sure that jobdir gets set
+        while build.jobdir is None:
+            time.sleep(0.1)
+            build = self.builds[0]
+
+        # Wait for the job to begin running and create the ansible log file.
+        # The job waits to complete until the flag file exists, so we can
+        # safely access the log here. We only open it (to force a file handle
+        # to be kept open for it after the job finishes) but wait to read the
+        # contents until the job is done.
+        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+        while not os.path.exists(ansible_log):
+            time.sleep(0.1)
+        logfile = open(ansible_log, 'r')
+        self.addCleanup(logfile.close)
+
+        # Start the finger streamer daemon
+        streamer = zuul.lib.log_streamer.LogStreamer(
+            None, self.host, finger_port, self.executor_server.jobdir_root)
+        self.addCleanup(streamer.stop)
+
+        # Start the web server
+        web_server = zuul.web.ZuulWeb(
+            listen_address='127.0.0.1', listen_port=9000,
+            gear_server='127.0.0.1', gear_port=self.gearman_server.port)
+        loop = asyncio.new_event_loop()
+        loop.set_debug(True)
+        ws_thread = threading.Thread(target=web_server.run, args=(loop,))
+        ws_thread.start()
+        self.addCleanup(loop.close)
+        self.addCleanup(ws_thread.join)
+        self.addCleanup(web_server.stop)
+
+        # Wait until web server is started
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            while s.connect_ex((self.host, 9000)):
+                time.sleep(0.1)
+
+        # Start a thread with the websocket client
+        ws_client_event = threading.Event()
+        self.ws_client_results = ''
+        ws_client_thread = threading.Thread(
+            target=self.runWSClient, args=(build.uuid, ws_client_event)
+        )
+        ws_client_thread.start()
+        ws_client_event.wait()
+
+        # Allow the job to complete
+        flag_file = os.path.join(build_dir, 'test_wait')
+        open(flag_file, 'w').close()
+
+        # Wait for the websocket client to complete, which it should when
+        # it's received the full log.
+        ws_client_thread.join()
+
+        self.waitUntilSettled()
+
+        file_contents = logfile.read()
+        logfile.close()
+        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
+        self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
+        self.assertEqual(file_contents, self.ws_client_results)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index e402342..c3cbf6d 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2289,22 +2289,40 @@
                             status_jobs.append(job)
         self.assertEqual('project-merge', status_jobs[0]['name'])
         # TODO(mordred) pull uuids from self.builds
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
-                         status_jobs[0]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[0]['uuid']),
+            status_jobs[0]['url'])
         # TOOD(mordred) configure a success-url on the base job
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
-                         status_jobs[0]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[0]['uuid']),
+            status_jobs[0]['report_url'])
         self.assertEqual('project-test1', status_jobs[1]['name'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
-                         status_jobs[1]['url'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
-                         status_jobs[1]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[1]['uuid']),
+            status_jobs[1]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[1]['uuid']),
+            status_jobs[1]['report_url'])
 
         self.assertEqual('project-test2', status_jobs[2]['name'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
-                         status_jobs[2]['url'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
-                         status_jobs[2]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[2]['uuid']),
+            status_jobs[2]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[2]['uuid']),
+            status_jobs[2]['report_url'])
 
     def test_live_reconfiguration(self):
         "Test that live reconfiguration works"
@@ -3577,8 +3595,11 @@
                 self.assertEqual('project-merge', job['name'])
                 self.assertEqual('gate', job['pipeline'])
                 self.assertEqual(False, job['retry'])
-                self.assertEqual('finger://zl.example.com/%s' % job['uuid'],
-                                 job['url'])
+                self.assertEqual(
+                    'finger://{hostname}/{uuid}'.format(
+                        hostname=self.executor_server.hostname,
+                        uuid=job['uuid']),
+                    job['url'])
                 self.assertEqual(2, len(job['worker']))
                 self.assertEqual(False, job['canceled'])
                 self.assertEqual(True, job['voting'])
@@ -4674,7 +4695,8 @@
 
         # NOTE: This default URL is currently hard-coded in executor/server.py
         self.assertIn(
-            '- docs-draft-test2 finger://zl.example.com/{uuid}'.format(
+            '- docs-draft-test2 finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=uuid_test2),
             body[3])
 
diff --git a/zuul/cmd/web.py b/zuul/cmd/web.py
new file mode 100755
index 0000000..9869a2c
--- /dev/null
+++ b/zuul/cmd/web.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import asyncio
+import daemon
+import extras
+import logging
+import signal
+import sys
+import threading
+
+import zuul.cmd
+import zuul.web
+
+from zuul.lib.config import get_default
+
+# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
+# instead it depends on lockfile-0.9.1 which uses pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+
+class WebServer(zuul.cmd.ZuulApp):
+
+    def parse_arguments(self):
+        parser = argparse.ArgumentParser(description='Zuul Web Server.')
+        parser.add_argument('-c', dest='config',
+                            help='specify the config file')
+        parser.add_argument('-d', dest='nodaemon', action='store_true',
+                            help='do not run as a daemon')
+        parser.add_argument('--version', dest='version', action='version',
+                            version=self._get_version(),
+                            help='show zuul version')
+        self.args = parser.parse_args()
+
+    def exit_handler(self, signum, frame):
+        self.web.stop()
+
+    def _main(self):
+        params = dict()
+
+        params['listen_address'] = get_default(self.config,
+                                               'web', 'listen_address',
+                                               '127.0.0.1')
+        params['listen_port'] = get_default(self.config, 'web', 'port', 9000)
+        params['gear_server'] = get_default(self.config, 'gearman', 'server')
+        params['gear_port'] = get_default(self.config, 'gearman', 'port', 4730)
+        params['ssl_key'] = get_default(self.config, 'gearman', 'ssl_key')
+        params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
+        params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
+
+        try:
+            self.web = zuul.web.ZuulWeb(**params)
+        except Exception as e:
+            self.log.exception("Error creating ZuulWeb:")
+            sys.exit(1)
+
+        loop = asyncio.get_event_loop()
+        signal.signal(signal.SIGUSR1, self.exit_handler)
+        signal.signal(signal.SIGTERM, self.exit_handler)
+
+        self.log.info('Zuul Web Server starting')
+        self.thread = threading.Thread(target=self.web.run,
+                                       args=(loop,),
+                                       name='web')
+        self.thread.start()
+
+        try:
+            signal.pause()
+        except KeyboardInterrupt:
+            print("Ctrl + C: asking web server to exit nicely...\n")
+            self.exit_handler(signal.SIGINT, None)
+
+        self.thread.join()
+        loop.stop()
+        loop.close()
+        self.log.info("Zuul Web Server stopped")
+
+    def main(self):
+        self.setup_logging('web', 'log_config')
+        self.log = logging.getLogger("zuul.WebServer")
+
+        try:
+            self._main()
+        except Exception:
+            self.log.exception("Exception from WebServer:")
+
+
+def main():
+    server = WebServer()
+    server.parse_arguments()
+    server.read_config()
+
+    pid_fn = get_default(server.config, 'web', 'pidfile',
+                         '/var/run/zuul-web/zuul-web.pid', expand_user=True)
+
+    pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+    if server.args.nodaemon:
+        server.main()
+    else:
+        with daemon.DaemonContext(pidfile=pid):
+            server.main()
diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py
index c76b057..a1c3aeb 100644
--- a/zuul/lib/log_streamer.py
+++ b/zuul/lib/log_streamer.py
@@ -15,6 +15,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import logging
 import os
 import os.path
 import pwd
@@ -210,6 +211,8 @@
     '''
 
     def __init__(self, user, host, port, jobdir_root):
+        self.log = logging.getLogger('zuul.lib.LogStreamer')
+        self.log.debug("LogStreamer starting on port %s", port)
         self.server = CustomForkingTCPServer((host, port),
                                              RequestHandler,
                                              user=user,
@@ -225,3 +228,4 @@
         if self.thd.isAlive():
             self.server.shutdown()
             self.server.server_close()
+            self.log.debug("LogStreamer stopped")
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 6f0d34b..fd3517f 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -86,3 +86,11 @@
 
     def shutdown(self):
         self.gearman.shutdown()
+
+    def get_job_log_stream_address(self, uuid, logfile='console.log'):
+        data = {'uuid': uuid, 'logfile': logfile}
+        job = self.submitJob('zuul:get_job_log_stream_address', data)
+        if job.failure:
+            return False
+        else:
+            return json.loads(job.data[0])
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index be3b7d1..6543c91 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -53,6 +53,7 @@
         self.worker.registerFunction("zuul:enqueue_ref")
         self.worker.registerFunction("zuul:promote")
         self.worker.registerFunction("zuul:get_running_jobs")
+        self.worker.registerFunction("zuul:get_job_log_stream_address")
 
     def stop(self):
         self.log.debug("Stopping")
@@ -173,3 +174,29 @@
                         running_items.append(item.formatJSON())
 
         job.sendWorkComplete(json.dumps(running_items))
+
+    def handle_get_job_log_stream_address(self, job):
+        # TODO: map log files to ports. Currently there is only one
+        #       log stream for a given job. But many jobs produce many
+        #       log files, so this is forwards compatible with a future
+        #       where there are more logs to potentially request than
+        #       "console.log"
+        def find_build(uuid):
+            for tenant in self.sched.abide.tenants.values():
+                for pipeline_name, pipeline in tenant.layout.pipelines.items():
+                    for queue in pipeline.queues:
+                        for item in queue.queue:
+                            for bld in item.current_build_set.getBuilds():
+                                if bld.uuid == uuid:
+                                    return bld
+            return None
+
+        args = json.loads(job.arguments)
+        uuid = args['uuid']
+        # TODO: logfile = args['logfile']
+        job_log_stream_address = {}
+        build = find_build(uuid)
+        if build:
+            job_log_stream_address['server'] = build.worker.hostname
+            job_log_stream_address['port'] = build.worker.log_port
+        job.sendWorkComplete(json.dumps(job_log_stream_address))
diff --git a/zuul/web.py b/zuul/web.py
new file mode 100644
index 0000000..2ef65fe
--- /dev/null
+++ b/zuul/web.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python
+# Copyright (c) 2017 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import json
+import logging
+import uvloop
+
+import aiohttp
+from aiohttp import web
+
+import zuul.rpcclient
+
+
+class LogStreamingHandler(object):
+    log = logging.getLogger("zuul.web.LogStreamingHandler")
+
+    def __init__(self, loop, gear_server, gear_port,
+                 ssl_key=None, ssl_cert=None, ssl_ca=None):
+        self.event_loop = loop
+        self.gear_server = gear_server
+        self.gear_port = gear_port
+        self.ssl_key = ssl_key
+        self.ssl_cert = ssl_cert
+        self.ssl_ca = ssl_ca
+
+    def _getPortLocation(self, job_uuid):
+        '''
+        Query Gearman for the executor running the given job.
+
+        :param str job_uuid: The job UUID we want to stream.
+        '''
+        # TODO: Fetch the entire list of uuid/file/server/ports once and
+        #       share that, and fetch a new list on cache misses perhaps?
+        # TODO: Avoid recreating a client for each request.
+        rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
+                                       self.ssl_key, self.ssl_cert,
+                                       self.ssl_ca)
+        ret = rpc.get_job_log_stream_address(job_uuid)
+        rpc.shutdown()
+        return ret
+
+    async def _fingerClient(self, ws, server, port, job_uuid):
+        '''
+        Create a client to connect to the finger streamer and pull results.
+
+        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
+        :param str server: The executor server running the job.
+        :param str port: The executor server port.
+        :param str job_uuid: The job UUID to stream.
+        '''
+        self.log.debug("Connecting to finger server %s:%s", server, port)
+        reader, writer = await asyncio.open_connection(host=server, port=port,
+                                                       loop=self.event_loop)
+
+        self.log.debug("Sending finger request for %s", job_uuid)
+        msg = "%s\n" % job_uuid    # Must have a trailing newline!
+
+        writer.write(msg.encode('utf8'))
+        await writer.drain()
+
+        while True:
+            data = await reader.read(1024)
+            if data:
+                await ws.send_str(data.decode('utf8'))
+            else:
+                writer.close()
+                return
+
+    async def _streamLog(self, ws, request):
+        '''
+        Stream the log for the requested job back to the client.
+
+        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
+        :param dict request: The client request parameters.
+        '''
+        for key in ('uuid', 'logfile'):
+            if key not in request:
+                return (4000, "'{key}' missing from request payload".format(
+                        key=key))
+
+        # Schedule the blocking gearman work in an Executor
+        gear_task = self.event_loop.run_in_executor(
+            None, self._getPortLocation, request['uuid'])
+
+        try:
+            port_location = await asyncio.wait_for(gear_task, 10)
+        except asyncio.TimeoutError:
+            return (4010, "Gearman timeout")
+
+        if not port_location:
+            return (4011, "Error with Gearman")
+
+        await self._fingerClient(
+            ws, port_location['server'], port_location['port'], request['uuid']
+        )
+
+        return (1000, "No more data")
+
+    async def processRequest(self, request):
+        '''
+        Handle a client websocket request for log streaming.
+
+        :param aiohttp.web.Request request: The client request.
+        '''
+        try:
+            ws = web.WebSocketResponse()
+            await ws.prepare(request)
+            async for msg in ws:
+                if msg.type == aiohttp.WSMsgType.TEXT:
+                    req = json.loads(msg.data)
+                    self.log.debug("Websocket request: %s", req)
+                    code, msg = await self._streamLog(ws, req)
+
+                    # We expect to process only a single message. I.e., we
+                    # can stream only a single file at a time.
+                    await ws.close(code=code, message=msg)
+                    break
+                elif msg.type == aiohttp.WSMsgType.ERROR:
+                    self.log.error(
+                        "Websocket connection closed with exception %s",
+                        ws.exception()
+                    )
+                    break
+                elif msg.type == aiohttp.WSMsgType.CLOSED:
+                    break
+        except Exception as e:
+            self.log.exception("Websocket exception:")
+            await ws.close(code=4009, message=str(e).encode('utf-8'))
+        return ws
+
+
+class ZuulWeb(object):
+
+    log = logging.getLogger("zuul.web.ZuulWeb")
+
+    def __init__(self, listen_address, listen_port,
+                 gear_server, gear_port,
+                 ssl_key=None, ssl_cert=None, ssl_ca=None):
+        self.listen_address = listen_address
+        self.listen_port = listen_port
+        self.gear_server = gear_server
+        self.gear_port = gear_port
+        self.ssl_key = ssl_key
+        self.ssl_cert = ssl_cert
+        self.ssl_ca = ssl_ca
+
+    async def _handleWebsocket(self, request):
+        handler = LogStreamingHandler(self.event_loop,
+                                      self.gear_server, self.gear_port,
+                                      self.ssl_key, self.ssl_cert, self.ssl_ca)
+        return await handler.processRequest(request)
+
+    def run(self, loop=None):
+        '''
+        Run the websocket daemon.
+
+        Because this method can be the target of a new thread, we need to
+        set the thread event loop here, rather than in __init__().
+
+        :param loop: The event loop to use. If not supplied, the default main
+            thread event loop is used. This should be supplied if ZuulWeb
+            is run within a separate (non-main) thread.
+        '''
+        routes = [
+            ('GET', '/console-stream', self._handleWebsocket)
+        ]
+
+        self.log.debug("ZuulWeb starting")
+        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+        user_supplied_loop = loop is not None
+        if not loop:
+            loop = asyncio.get_event_loop()
+        asyncio.set_event_loop(loop)
+
+        self.event_loop = loop
+
+        app = web.Application()
+        for method, path, handler in routes:
+            app.router.add_route(method, path, handler)
+        handler = app.make_handler(loop=self.event_loop)
+
+        # create the server
+        coro = self.event_loop.create_server(handler,
+                                             self.listen_address,
+                                             self.listen_port)
+        self.server = self.event_loop.run_until_complete(coro)
+
+        self.term = asyncio.Future()
+
+        # start the server
+        self.event_loop.run_until_complete(self.term)
+
+        # cleanup
+        self.log.debug("ZuulWeb stopping")
+        self.server.close()
+        self.event_loop.run_until_complete(self.server.wait_closed())
+        self.event_loop.run_until_complete(app.shutdown())
+        self.event_loop.run_until_complete(handler.shutdown(60.0))
+        self.event_loop.run_until_complete(app.cleanup())
+        self.log.debug("ZuulWeb stopped")
+
+        # Only run these if we are controlling the loop - they need to be
+        # run from the main thread
+        if not user_supplied_loop:
+            loop.stop()
+            loop.close()
+
+    def stop(self):
+        self.event_loop.call_soon_threadsafe(self.term.set_result, True)
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.DEBUG)
+    loop = asyncio.get_event_loop()
+    loop.set_debug(True)
+    z = ZuulWeb()
+    z.run(loop)