Add finger gateway

This adds the zuul-fingergw app that should be run as root (so that
it can connect to the standard finger port 79), but changes user privs
immediately after binding that port.

Common streaming functions have been moved to streamer_utils.py to
be shared among modules.

Support for CommandSocket has been included.

Change-Id: Ia35492fe951e7b9367eeab0b145d96189d72c364
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 86b01ef..68dbf92 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -287,7 +287,7 @@
 
 .. attr:: merger
 
-   ,, attr:: command_socket
+   .. attr:: command_socket
       :default: /var/lib/zuul/merger.socket
 
       Path to command socket file for the merger process.
@@ -627,3 +627,65 @@
 
 To start the web server, run ``zuul-web``.  To stop it, kill the
 PID which was saved in the pidfile specified in the configuration.
+
+Finger Gateway
+--------------
+
+The Zuul finger gateway connects to the standard finger port (79) and listens
+for finger requests specifying a build UUID for which it should stream log
+results. The gateway will determine which executor is currently running that
+build and query that executor for the log stream.
+
+This is intended to be used with the standard finger command line client.
+For example::
+
+    finger UUID@zuul.example.com
+
+The above would stream the logs for the build identified by `UUID`.
+
+Configuration
+~~~~~~~~~~~~~
+
+In addition to the common configuration sections, the following
+sections of ``zuul.conf`` are used by the finger gateway:
+
+.. attr:: fingergw
+
+   .. attr:: command_socket
+      :default: /var/lib/zuul/fingergw.socket
+
+      Path to command socket file for the executor process.
+
+   .. attr:: listen_address
+      :default: all addresses
+
+      IP address or domain name on which to listen.
+
+   .. attr:: log_config
+
+      Path to log config file for the finger gateway process.
+
+   .. attr:: pidfile
+      :default: /var/run/zuul-fingergw/zuul-fingergw.pid
+
+      Path to PID lock file for the finger gateway process.
+
+   .. attr:: port
+      :default: 79
+
+      Port to use for the finger gateway. Note that since command line
+      finger clients cannot usually specify the port, leaving this set to
+      the default value is highly recommended.
+
+   .. attr:: user
+      :default: zuul
+
+      User ID for the zuul-fingergw process. In normal operation as a
+      daemon, the finger gateway should be started as the ``root`` user, but
+      it will drop privileges to this user during startup.
+
+Operation
+~~~~~~~~~
+
+To start the finger gateway, run ``zuul-fingergw``.  To stop it, kill the
+PID which was saved in the pidfile specified in the configuration.
diff --git a/setup.cfg b/setup.cfg
index 63ff562..dea3158 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -28,6 +28,7 @@
     zuul-bwrap = zuul.driver.bubblewrap:main
     zuul-web = zuul.cmd.web:main
     zuul-migrate = zuul.cmd.migrate:main
+    zuul-fingergw = zuul.cmd.fingergw:main
 
 [build_sphinx]
 source-dir = doc/source
diff --git a/tests/base.py b/tests/base.py
index ea01d20..69d9f55 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2421,7 +2421,7 @@
                      'pydevd.CommandThread',
                      'pydevd.Reader',
                      'pydevd.Writer',
-                     'FingerStreamer',
+                     'socketserver_Thread',
                      ]
         threads = [t for t in threading.enumerate()
                    if t.name not in whitelist]
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_streaming.py
similarity index 68%
rename from tests/unit/test_log_streamer.py
rename to tests/unit/test_streaming.py
index 27368e3..4bb541a 100644
--- a/tests/unit/test_log_streamer.py
+++ b/tests/unit/test_streaming.py
@@ -28,6 +28,7 @@
 
 import zuul.web
 import zuul.lib.log_streamer
+import zuul.lib.fingergw
 import tests.base
 
 
@@ -60,7 +61,7 @@
 class TestStreaming(tests.base.AnsibleZuulTestCase):
 
     tenant_config_file = 'config/streamer/main.yaml'
-    log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
+    log = logging.getLogger("zuul.test_streaming")
 
     def setUp(self):
         super(TestStreaming, self).setUp()
@@ -181,9 +182,38 @@
         loop.run_until_complete(client(loop, build_uuid, event))
         loop.close()
 
+    def runFingerClient(self, build_uuid, gateway_address, event):
+        # Wait until the gateway is started
+        while True:
+            try:
+                # NOTE(Shrews): This causes the gateway to begin to handle
+                # a request for which it never receives data, and thus
+                # causes the getCommand() method to timeout (seen in the
+                # test results, but is harmless).
+                with socket.create_connection(gateway_address) as s:
+                    break
+            except ConnectionRefusedError:
+                time.sleep(0.1)
+
+        with socket.create_connection(gateway_address) as s:
+            msg = "%s\n" % build_uuid
+            s.sendall(msg.encode('utf-8'))
+            event.set()  # notify we are connected and req sent
+            while True:
+                data = s.recv(1024)
+                if not data:
+                    break
+                self.streaming_data += data.decode('utf-8')
+            s.shutdown(socket.SHUT_RDWR)
+
     def test_websocket_streaming(self):
+        # Start the finger streamer daemon
+        streamer = zuul.lib.log_streamer.LogStreamer(
+            None, self.host, 0, self.executor_server.jobdir_root)
+        self.addCleanup(streamer.stop)
+
         # Need to set the streaming port before submitting the job
-        finger_port = 7902
+        finger_port = streamer.server.socket.getsockname()[1]
         self.executor_server.log_streaming_port = finger_port
 
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -216,11 +246,6 @@
         logfile = open(ansible_log, 'r')
         self.addCleanup(logfile.close)
 
-        # Start the finger streamer daemon
-        streamer = zuul.lib.log_streamer.LogStreamer(
-            None, self.host, finger_port, self.executor_server.jobdir_root)
-        self.addCleanup(streamer.stop)
-
         # Start the web server
         web_server = zuul.web.ZuulWeb(
             listen_address='::', listen_port=9000,
@@ -265,3 +290,83 @@
         self.log.debug("\n\nFile contents: %s\n\n", file_contents)
         self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
         self.assertEqual(file_contents, self.ws_client_results)
+
+    def test_finger_gateway(self):
+        # Start the finger streamer daemon
+        streamer = zuul.lib.log_streamer.LogStreamer(
+            None, self.host, 0, self.executor_server.jobdir_root)
+        self.addCleanup(streamer.stop)
+        finger_port = streamer.server.socket.getsockname()[1]
+
+        # Need to set the streaming port before submitting the job
+        self.executor_server.log_streaming_port = finger_port
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        # We don't have any real synchronization for the ansible jobs, so
+        # just wait until we get our running build.
+        while not len(self.builds):
+            time.sleep(0.1)
+        build = self.builds[0]
+        self.assertEqual(build.name, 'python27')
+
+        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+        while not os.path.exists(build_dir):
+            time.sleep(0.1)
+
+        # Need to wait to make sure that jobdir gets set
+        while build.jobdir is None:
+            time.sleep(0.1)
+            build = self.builds[0]
+
+        # Wait for the job to begin running and create the ansible log file.
+        # The job waits to complete until the flag file exists, so we can
+        # safely access the log here. We only open it (to force a file handle
+        # to be kept open for it after the job finishes) but wait to read the
+        # contents until the job is done.
+        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+        while not os.path.exists(ansible_log):
+            time.sleep(0.1)
+        logfile = open(ansible_log, 'r')
+        self.addCleanup(logfile.close)
+
+        # Start the finger gateway daemon
+        gateway = zuul.lib.fingergw.FingerGateway(
+            ('127.0.0.1', self.gearman_server.port, None, None, None),
+            (self.host, 0),
+            user=None,
+            command_socket=None,
+            pid_file=None
+        )
+        gateway.start()
+        self.addCleanup(gateway.stop)
+
+        gateway_port = gateway.server.socket.getsockname()[1]
+        gateway_address = (self.host, gateway_port)
+
+        # Start a thread with the finger client
+        finger_client_event = threading.Event()
+        self.finger_client_results = ''
+        finger_client_thread = threading.Thread(
+            target=self.runFingerClient,
+            args=(build.uuid, gateway_address, finger_client_event)
+        )
+        finger_client_thread.start()
+        finger_client_event.wait()
+
+        # Allow the job to complete
+        flag_file = os.path.join(build_dir, 'test_wait')
+        open(flag_file, 'w').close()
+
+        # Wait for the finger client to complete, which it should when
+        # it's received the full log.
+        finger_client_thread.join()
+
+        self.waitUntilSettled()
+
+        file_contents = logfile.read()
+        logfile.close()
+        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
+        self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
+        self.assertEqual(file_contents, self.streaming_data)
diff --git a/zuul/cmd/fingergw.py b/zuul/cmd/fingergw.py
new file mode 100644
index 0000000..920eed8
--- /dev/null
+++ b/zuul/cmd/fingergw.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import signal
+import sys
+
+import zuul.cmd
+import zuul.lib.fingergw
+
+from zuul.lib.config import get_default
+
+
+class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
+    '''
+    Class for the daemon that will distribute any finger requests to the
+    appropriate Zuul executor handling the specified build UUID.
+    '''
+    app_name = 'fingergw'
+    app_description = 'The Zuul finger gateway.'
+
+    def __init__(self):
+        super(FingerGatewayApp, self).__init__()
+        self.gateway = None
+
+    def createParser(self):
+        parser = super(FingerGatewayApp, self).createParser()
+        parser.add_argument('command',
+                            choices=zuul.lib.fingergw.COMMANDS,
+                            nargs='?')
+        return parser
+
+    def parseArguments(self, args=None):
+        super(FingerGatewayApp, self).parseArguments()
+        if self.args.command:
+            self.args.nodaemon = True
+
+    def run(self):
+        '''
+        Main entry point for the FingerGatewayApp.
+
+        Called by the main() method of the parent class.
+        '''
+        if self.args.command in zuul.lib.fingergw.COMMANDS:
+            self.send_command(self.args.command)
+            sys.exit(0)
+
+        self.setup_logging('fingergw', 'log_config')
+        self.log = logging.getLogger('zuul.fingergw')
+
+        # Get values from configuration file
+        host = get_default(self.config, 'fingergw', 'listen_address', '::')
+        port = int(get_default(self.config, 'fingergw', 'port', 79))
+        user = get_default(self.config, 'fingergw', 'user', 'zuul')
+        cmdsock = get_default(
+            self.config, 'fingergw', 'command_socket',
+            '/var/lib/zuul/%s.socket' % self.app_name)
+        gear_server = get_default(self.config, 'gearman', 'server')
+        gear_port = get_default(self.config, 'gearman', 'port', 4730)
+        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+
+        self.gateway = zuul.lib.fingergw.FingerGateway(
+            (gear_server, gear_port, ssl_key, ssl_cert, ssl_ca),
+            (host, port),
+            user,
+            cmdsock,
+            self.getPidFile(),
+        )
+
+        self.log.info('Starting Zuul finger gateway app')
+        self.gateway.start()
+
+        if self.args.nodaemon:
+            # NOTE(Shrews): When running in non-daemon mode, although sending
+            # the 'stop' command via the command socket will shutdown the
+            # gateway, it's still necessary to Ctrl+C to stop the app.
+            while True:
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    print("Ctrl + C: asking gateway to exit nicely...\n")
+                    self.stop()
+                    break
+        else:
+            self.gateway.wait()
+
+        self.log.info('Stopped Zuul finger gateway app')
+
+    def stop(self):
+        if self.gateway:
+            self.gateway.stop()
+
+
+def main():
+    FingerGatewayApp().main()
diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py
new file mode 100644
index 0000000..c89ed0f
--- /dev/null
+++ b/zuul/lib/fingergw.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+import logging
+import socket
+import threading
+
+import zuul.rpcclient
+
+from zuul.lib import commandsocket
+from zuul.lib import streamer_utils
+
+
+COMMANDS = ['stop']
+
+
+class RequestHandler(streamer_utils.BaseFingerRequestHandler):
+    '''
+    Class implementing the logic for handling a single finger request.
+    '''
+
+    log = logging.getLogger("zuul.fingergw")
+
+    def __init__(self, *args, **kwargs):
+        self.rpc = kwargs.pop('rpc')
+        super(RequestHandler, self).__init__(*args, **kwargs)
+
+    def _fingerClient(self, server, port, build_uuid):
+        '''
+        Open a finger connection and return all streaming results.
+
+        :param server: The remote server.
+        :param port: The remote port.
+        :param build_uuid: The build UUID to stream.
+
+        Both IPv4 and IPv6 are supported.
+        '''
+        with socket.create_connection((server, port), timeout=10) as s:
+            msg = "%s\n" % build_uuid    # Must have a trailing newline!
+            s.sendall(msg.encode('utf-8'))
+            while True:
+                data = s.recv(1024)
+                if data:
+                    self.request.sendall(data)
+                else:
+                    break
+
+    def handle(self):
+        '''
+        This method is called by the socketserver framework to handle an
+        incoming request.
+        '''
+        try:
+            build_uuid = self.getCommand()
+            port_location = self.rpc.get_job_log_stream_address(build_uuid)
+            self._fingerClient(
+                port_location['server'],
+                port_location['port'],
+                build_uuid,
+            )
+        except Exception:
+            self.log.exception('Finger request handling exception:')
+            msg = 'Internal streaming error'
+            self.request.sendall(msg.encode('utf-8'))
+            return
+
+
+class FingerGateway(object):
+    '''
+    Class implementing the finger multiplexing/gateway logic.
+
+    For each incoming finger request, a new thread is started that will
+    be responsible for finding which Zuul executor is executing the
+    requested build (by asking Gearman), forwarding the request to that
+    executor, and streaming the results back to our client.
+    '''
+
+    log = logging.getLogger("zuul.fingergw")
+
+    def __init__(self, gearman, address, user, command_socket, pid_file):
+        '''
+        Initialize the finger gateway.
+
+        :param tuple gearman: Gearman connection information. This should
+            include the server, port, SSL key, SSL cert, and SSL CA.
+        :param tuple address: The address and port to bind to for our gateway.
+        :param str user: The user to which we should drop privileges after
+            binding to our address.
+        :param str command_socket: Path to the daemon command socket.
+        :param str pid_file: Path to the daemon PID file.
+        '''
+        self.gear_server = gearman[0]
+        self.gear_port = gearman[1]
+        self.gear_ssl_key = gearman[2]
+        self.gear_ssl_cert = gearman[3]
+        self.gear_ssl_ca = gearman[4]
+        self.address = address
+        self.user = user
+        self.pid_file = pid_file
+
+        self.rpc = None
+        self.server = None
+        self.server_thread = None
+
+        self.command_thread = None
+        self.command_running = False
+        self.command_socket = command_socket
+
+        self.command_map = dict(
+            stop=self.stop,
+        )
+
+    def _runCommand(self):
+        while self.command_running:
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':
+                    self.command_map[command]()
+                else:
+                    return
+            except Exception:
+                self.log.exception("Exception while processing command")
+
+    def _run(self):
+        try:
+            self.server.serve_forever()
+        except Exception:
+            self.log.exception('Abnormal termination:')
+            raise
+
+    def start(self):
+        self.rpc = zuul.rpcclient.RPCClient(
+            self.gear_server,
+            self.gear_port,
+            self.gear_ssl_key,
+            self.gear_ssl_cert,
+            self.gear_ssl_ca)
+
+        self.server = streamer_utils.CustomThreadingTCPServer(
+            self.address,
+            functools.partial(RequestHandler, rpc=self.rpc),
+            user=self.user,
+            pid_file=self.pid_file)
+
+        # Start the command processor after the server and privilege drop
+        if self.command_socket:
+            self.log.debug("Starting command processor")
+            self.command_socket = commandsocket.CommandSocket(
+                self.command_socket)
+            self.command_socket.start()
+            self.command_running = True
+            self.command_thread = threading.Thread(
+                target=self._runCommand, name='command')
+            self.command_thread.daemon = True
+            self.command_thread.start()
+
+        # The socketserver shutdown() call will hang unless the call
+        # to server_forever() happens in another thread. So let's do that.
+        self.server_thread = threading.Thread(target=self._run)
+        self.server_thread.daemon = True
+        self.server_thread.start()
+        self.log.info("Finger gateway is started")
+
+    def stop(self):
+        if self.command_socket:
+            self.command_running = False
+            try:
+                self.command_socket.stop()
+            except Exception:
+                self.log.exception("Error stopping command socket:")
+
+        if self.server:
+            try:
+                self.server.shutdown()
+                self.server.server_close()
+                self.server = None
+            except Exception:
+                self.log.exception("Error stopping TCP server:")
+
+        if self.rpc:
+            try:
+                self.rpc.shutdown()
+                self.rpc = None
+            except Exception:
+                self.log.exception("Error stopping RCP client:")
+
+        self.log.info("Finger gateway is stopped")
+
+    def wait(self):
+        '''
+        Wait on the gateway to shutdown.
+        '''
+        self.server_thread.join()
diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py
index 1906be7..5c894b4 100644
--- a/zuul/lib/log_streamer.py
+++ b/zuul/lib/log_streamer.py
@@ -18,14 +18,13 @@
 import logging
 import os
 import os.path
-import pwd
 import re
 import select
-import socket
-import socketserver
 import threading
 import time
 
+from zuul.lib import streamer_utils
+
 
 class Log(object):
 
@@ -38,7 +37,7 @@
         self.size = self.stat.st_size
 
 
-class RequestHandler(socketserver.BaseRequestHandler):
+class RequestHandler(streamer_utils.BaseFingerRequestHandler):
     '''
     Class to handle a single log streaming request.
 
@@ -46,47 +45,13 @@
     the (class/method/attribute) names were changed to protect the innocent.
     '''
 
-    MAX_REQUEST_LEN = 1024
-    REQUEST_TIMEOUT = 10
-
-    # NOTE(Shrews): We only use this to log exceptions since a new process
-    # is used per-request (and having multiple processes write to the same
-    # log file constantly is bad).
-    log = logging.getLogger("zuul.log_streamer.RequestHandler")
-
-    def get_command(self):
-        poll = select.poll()
-        bitmask = (select.POLLIN | select.POLLERR |
-                   select.POLLHUP | select.POLLNVAL)
-        poll.register(self.request, bitmask)
-        buffer = b''
-        ret = None
-        start = time.time()
-        while True:
-            elapsed = time.time() - start
-            timeout = max(self.REQUEST_TIMEOUT - elapsed, 0)
-            if not timeout:
-                raise Exception("Timeout while waiting for input")
-            for fd, event in poll.poll(timeout):
-                if event & select.POLLIN:
-                    buffer += self.request.recv(self.MAX_REQUEST_LEN)
-                else:
-                    raise Exception("Received error event")
-            if len(buffer) >= self.MAX_REQUEST_LEN:
-                raise Exception("Request too long")
-            try:
-                ret = buffer.decode('utf-8')
-                x = ret.find('\n')
-                if x > 0:
-                    return ret[:x]
-            except UnicodeDecodeError:
-                pass
+    log = logging.getLogger("zuul.log_streamer")
 
     def handle(self):
         try:
-            build_uuid = self.get_command()
+            build_uuid = self.getCommand()
         except Exception:
-            self.log.exception("Failure during get_command:")
+            self.log.exception("Failure during getCommand:")
             msg = 'Internal streaming error'
             self.request.sendall(msg.encode("utf-8"))
             return
@@ -182,59 +147,11 @@
                     return False
 
 
-class CustomThreadingTCPServer(socketserver.ThreadingTCPServer):
-    '''
-    Custom version that allows us to drop privileges after port binding.
-    '''
-    address_family = socket.AF_INET6
+class LogStreamerServer(streamer_utils.CustomThreadingTCPServer):
 
     def __init__(self, *args, **kwargs):
-        self.user = kwargs.pop('user')
         self.jobdir_root = kwargs.pop('jobdir_root')
-        # For some reason, setting custom attributes does not work if we
-        # call the base class __init__ first. Wha??
-        socketserver.ThreadingTCPServer.__init__(self, *args, **kwargs)
-
-    def change_privs(self):
-        '''
-        Drop our privileges to the zuul user.
-        '''
-        if os.getuid() != 0:
-            return
-        pw = pwd.getpwnam(self.user)
-        os.setgroups([])
-        os.setgid(pw.pw_gid)
-        os.setuid(pw.pw_uid)
-        os.umask(0o022)
-
-    def server_bind(self):
-        self.allow_reuse_address = True
-        socketserver.ThreadingTCPServer.server_bind(self)
-        if self.user:
-            self.change_privs()
-
-    def server_close(self):
-        '''
-        Overridden from base class to shutdown the socket immediately.
-        '''
-        try:
-            self.socket.shutdown(socket.SHUT_RD)
-            self.socket.close()
-        except socket.error as e:
-            # If it's already closed, don't error.
-            if e.errno == socket.EBADF:
-                return
-            raise
-
-    def process_request(self, request, client_address):
-        '''
-        Overridden from the base class to name the thread.
-        '''
-        t = threading.Thread(target=self.process_request_thread,
-                             name='FingerStreamer',
-                             args=(request, client_address))
-        t.daemon = self.daemon_threads
-        t.start()
+        super(LogStreamerServer, self).__init__(*args, **kwargs)
 
 
 class LogStreamer(object):
@@ -243,12 +160,12 @@
     '''
 
     def __init__(self, user, host, port, jobdir_root):
-        self.log = logging.getLogger('zuul.lib.LogStreamer')
+        self.log = logging.getLogger('zuul.log_streamer')
         self.log.debug("LogStreamer starting on port %s", port)
-        self.server = CustomThreadingTCPServer((host, port),
-                                               RequestHandler,
-                                               user=user,
-                                               jobdir_root=jobdir_root)
+        self.server = LogStreamerServer((host, port),
+                                        RequestHandler,
+                                        user=user,
+                                        jobdir_root=jobdir_root)
 
         # We start the actual serving within a thread so we can return to
         # the owner.
diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py
new file mode 100644
index 0000000..985f3c3
--- /dev/null
+++ b/zuul/lib/streamer_utils.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''
+This file contains code common to finger log streaming functionality.
+The log streamer process within each executor, the finger gateway service,
+and the web interface will all make use of this module.
+'''
+
+import os
+import pwd
+import select
+import socket
+import socketserver
+import threading
+import time
+
+
+class BaseFingerRequestHandler(socketserver.BaseRequestHandler):
+    '''
+    Base class for common methods for handling finger requests.
+    '''
+
+    MAX_REQUEST_LEN = 1024
+    REQUEST_TIMEOUT = 10
+
+    def getCommand(self):
+        poll = select.poll()
+        bitmask = (select.POLLIN | select.POLLERR |
+                   select.POLLHUP | select.POLLNVAL)
+        poll.register(self.request, bitmask)
+        buffer = b''
+        ret = None
+        start = time.time()
+        while True:
+            elapsed = time.time() - start
+            timeout = max(self.REQUEST_TIMEOUT - elapsed, 0)
+            if not timeout:
+                raise Exception("Timeout while waiting for input")
+            for fd, event in poll.poll(timeout):
+                if event & select.POLLIN:
+                    buffer += self.request.recv(self.MAX_REQUEST_LEN)
+                else:
+                    raise Exception("Received error event")
+            if len(buffer) >= self.MAX_REQUEST_LEN:
+                raise Exception("Request too long")
+            try:
+                ret = buffer.decode('utf-8')
+                x = ret.find('\n')
+                if x > 0:
+                    return ret[:x]
+            except UnicodeDecodeError:
+                pass
+
+
+class CustomThreadingTCPServer(socketserver.ThreadingTCPServer):
+    '''
+    Custom version that allows us to drop privileges after port binding.
+    '''
+
+    address_family = socket.AF_INET6
+
+    def __init__(self, *args, **kwargs):
+        self.user = kwargs.pop('user')
+        self.pid_file = kwargs.pop('pid_file', None)
+        socketserver.ThreadingTCPServer.__init__(self, *args, **kwargs)
+
+    def change_privs(self):
+        '''
+        Drop our privileges to another user.
+        '''
+        if os.getuid() != 0:
+            return
+
+        pw = pwd.getpwnam(self.user)
+
+        # Change owner on our pid file so it can be removed by us after
+        # dropping privileges. May not exist if not a daemon.
+        if self.pid_file and os.path.exists(self.pid_file):
+            os.chown(self.pid_file, pw.pw_uid, pw.pw_gid)
+
+        os.setgroups([])
+        os.setgid(pw.pw_gid)
+        os.setuid(pw.pw_uid)
+        os.umask(0o022)
+
+    def server_bind(self):
+        '''
+        Overridden from the base class to allow address reuse and to drop
+        privileges after binding to the listening socket.
+        '''
+        self.allow_reuse_address = True
+        socketserver.ThreadingTCPServer.server_bind(self)
+        if self.user:
+            self.change_privs()
+
+    def server_close(self):
+        '''
+        Overridden from base class to shutdown the socket immediately.
+        '''
+        try:
+            self.socket.shutdown(socket.SHUT_RD)
+            self.socket.close()
+        except socket.error as e:
+            # If it's already closed, don't error.
+            if e.errno == socket.EBADF:
+                return
+            raise
+
+    def process_request(self, request, client_address):
+        '''
+        Overridden from the base class to name the thread.
+        '''
+        t = threading.Thread(target=self.process_request_thread,
+                             name='socketserver_Thread',
+                             args=(request, client_address))
+        t.daemon = self.daemon_threads
+        t.start()
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index e4a3612..3c0b855 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -42,17 +42,6 @@
     def setEventLoop(self, event_loop):
         self.event_loop = event_loop
 
-    def _getPortLocation(self, job_uuid):
-        """
-        Query Gearman for the executor running the given job.
-
-        :param str job_uuid: The job UUID we want to stream.
-        """
-        # TODO: Fetch the entire list of uuid/file/server/ports once and
-        #       share that, and fetch a new list on cache misses perhaps?
-        ret = self.rpc.get_job_log_stream_address(job_uuid)
-        return ret
-
     async def _fingerClient(self, ws, server, port, job_uuid):
         """
         Create a client to connect to the finger streamer and pull results.
@@ -94,7 +83,10 @@
 
         # Schedule the blocking gearman work in an Executor
         gear_task = self.event_loop.run_in_executor(
-            None, self._getPortLocation, request['uuid'])
+            None,
+            self.rpc.get_job_log_stream_address,
+            request['uuid'],
+        )
 
         try:
             port_location = await asyncio.wait_for(gear_task, 10)