Add a finger protocol log streamer

This will be started along side the the executor process, similar
to how the scheduler and gearman server are linked.

This will require starting the process as the root user so that we
can grab the finger port properly. The process listening on the
finger port will drop its privileges to the designated user after
grabbing the socket. The executor will also drop its privileges
to the same user after starting the log streamer.

Change-Id: Ib52585cafbd073ccdb7f87432888ce15c7a66f67
diff --git a/tests/unit/ b/tests/unit/
new file mode 100644
index 0000000..3ea5a8e
--- /dev/null
+++ b/tests/unit/
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import logging
+import socket
+import tempfile
+import zuul.lib.log_streamer
+import tests.base
+class TestLogStreamer(tests.base.BaseTestCase):
+    log = logging.getLogger("zuul.test.cloner")
+    def setUp(self):
+        super(TestLogStreamer, self).setUp()
+ = ''
+    def startStreamer(self, port, root=None):
+        if not root:
+            root = tempfile.gettempdir()
+        return zuul.lib.log_streamer.LogStreamer(None,, port, root)
+    def test_start_stop(self):
+        port = 7900
+        streamer = self.startStreamer(port)
+        self.addCleanup(streamer.stop)
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.addCleanup(s.close)
+        self.assertEqual(0, s.connect_ex((, port)))
+        s.close()
+        streamer.stop()
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.addCleanup(s.close)
+        self.assertNotEqual(0, s.connect_ex((, port)))
+        s.close()
diff --git a/zuul/cmd/ b/zuul/cmd/
index 4f5b61c..1124d68 100755
--- a/zuul/cmd/
+++ b/zuul/cmd/
@@ -24,9 +24,11 @@
 import logging
 import os
+import pwd
 import socket
 import sys
 import signal
+import tempfile
 import zuul.cmd
 import zuul.executor.server
@@ -37,6 +39,12 @@
 # Similar situation with gear and statsd.
+# TODO(Shrews): Get this from the config file
+USER = 'zuul'
 class Executor(zuul.cmd.ZuulApp):
     def parse_arguments(self):
@@ -72,21 +80,58 @@
+    def start_log_streamer(self):
+        pipe_read, pipe_write = os.pipe()
+        child_pid = os.fork()
+        if child_pid == 0:
+            os.close(pipe_write)
+            import zuul.lib.log_streamer
+  "Starting log streamer")
+            streamer = zuul.lib.log_streamer.LogStreamer(
+                USER, '', FINGER_PORT, self.jobdir_root)
+            # Keep running until the parent dies:
+            pipe_read = os.fdopen(pipe_read)
+  "Stopping log streamer")
+            streamer.stop()
+            os._exit(0)
+        else:
+            os.close(pipe_read)
+            self.log_streamer_pid = child_pid
+    def change_privs(self):
+        '''
+        Drop our privileges to the zuul user.
+        '''
+        if os.getuid() != 0:
+            return
+        pw = pwd.getpwnam(USER)
+        os.setgroups([])
+        os.setgid(pw.pw_gid)
+        os.setuid(pw.pw_uid)
+        os.umask(0o022)
     def main(self, daemon=True):
         # See comment at top of file about zuul imports
-        self.setup_logging('executor', 'log_config')
+        self.jobroot_dir = None
+        if self.config.has_option('zuul', 'jobroot_dir'):
+            self.jobroot_dir = os.path.expanduser(
+                self.config.get('zuul', 'jobroot_dir'))
+        else:
+            self.jobdir_root = tempfile.gettempdir()
+        self.setup_logging('executor', 'log_config')
         self.log = logging.getLogger("zuul.Executor")
-        jobroot_dir = None
-        if self.config.has_option('zuul', 'jobroot_dir'):
-            jobroot_dir = os.path.expanduser(
-                self.config.get('zuul', 'jobroot_dir'))
+        self.start_log_streamer()
+        self.change_privs()
         ExecutorServer = zuul.executor.server.ExecutorServer
         self.executor = ExecutorServer(self.config, self.connections,
-                                       jobdir_root=jobroot_dir,
+                                       jobdir_root=self.jobroot_dir,
diff --git a/zuul/lib/ b/zuul/lib/
new file mode 100644
index 0000000..9764237
--- /dev/null
+++ b/zuul/lib/
@@ -0,0 +1,196 @@
+#!/usr/bin/env python
+# Copyright (c) 2016 IBM Corp.
+# Copyright 2017 Red Hat, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import os
+import os.path
+import pwd
+import re
+import select
+import socket
+import threading
+import time
+    import SocketServer as ss  # python 2.x
+except ImportError:
+    import socketserver as ss  # python 3
+class Log(object):
+    def __init__(self, path):
+        self.path = path
+        self.file = open(path)
+        self.stat = os.stat(path)
+        self.size = self.stat.st_size
+class RequestHandler(ss.BaseRequestHandler):
+    '''
+    Class to handle a single log streaming request.
+    The log streaming code was blatantly stolen from Only
+    the (class/method/attribute) names were changed to protect the innocent.
+    '''
+    def handle(self):
+        build_uuid = self.request.recv(1024)
+        build_uuid = build_uuid.rstrip()
+        # validate build ID
+        if not re.match("[0-9A-Fa-f]+$", build_uuid):
+            self.request.sendall('Build ID %s is not valid' % build_uuid)
+            return
+        job_dir = os.path.join(self.server.jobdir_root, build_uuid)
+        if not os.path.exists(job_dir):
+            self.request.sendall('Build ID %s not found' % build_uuid)
+            return
+        # check if log file exists
+        log_file = os.path.join(job_dir, 'ansible', 'ansible_log.txt')
+        if not os.path.exists(log_file):
+            self.request.sendall('Log not found for build ID %s' % build_uuid)
+            return
+        self.stream_log(log_file)
+    def stream_log(self, log_file):
+        log = None
+        while True:
+            if log is not None:
+                try:
+                    log.file.close()
+                except:
+                    pass
+            while True:
+                log = self.chunk_log(log_file)
+                if log:
+                    break
+                time.sleep(0.5)
+            while True:
+                if self.follow_log(log):
+                    break
+                else:
+                    return
+    def chunk_log(self, log_file):
+        try:
+            log = Log(log_file)
+        except Exception:
+            return
+        while True:
+            chunk =
+            if not chunk:
+                break
+            self.request.send(chunk)
+        return log
+    def follow_log(self, log):
+        while True:
+            # As long as we have unread data, keep reading/sending
+            while True:
+                chunk =
+                if chunk:
+                    self.request.send(chunk)
+                else:
+                    break
+            # At this point, we are waiting for more data to be written
+            time.sleep(0.5)
+            # Check to see if the remote end has sent any data, if so,
+            # discard
+            r, w, e =[self.request], [], [self.request], 0)
+            if self.request in e:
+                return False
+            if self.request in r:
+                ret = self.request.recv(1024)
+                # Discard anything read, if input is eof, it has
+                # disconnected.
+                if not ret:
+                    return False
+            # See if the file has been truncated
+            try:
+                st = os.stat(log.path)
+                if (st.st_ino != log.stat.st_ino or
+                    st.st_size < log.size):
+                    return True
+            except Exception:
+                return True
+            log.size = st.st_size
+class CustomForkingTCPServer(ss.ForkingTCPServer):
+    '''
+    Custom version that allows us to drop privileges after port binding.
+    '''
+    def __init__(self, *args, **kwargs):
+        self.user = kwargs.pop('user')
+        self.jobdir_root = kwargs.pop('jobdir_root')
+        # For some reason, setting custom attributes does not work if we
+        # call the base class __init__ first. Wha??
+        ss.ForkingTCPServer.__init__(self, *args, **kwargs)
+    def change_privs(self):
+        '''
+        Drop our privileges to the zuul user.
+        '''
+        if os.getuid() != 0:
+            return
+        pw = pwd.getpwnam(self.user)
+        os.setgroups([])
+        os.setgid(pw.pw_gid)
+        os.setuid(pw.pw_uid)
+        os.umask(0o022)
+    def server_bind(self):
+        self.allow_reuse_address = True
+        ss.ForkingTCPServer.server_bind(self)
+        if self.user:
+            self.change_privs()
+    def server_close(self):
+        '''
+        Overridden from base class to shutdown the socket immediately.
+        '''
+        self.socket.shutdown(socket.SHUT_RD)
+        self.socket.close()
+class LogStreamer(object):
+    '''
+    Class implementing log streaming over the finger daemon port.
+    '''
+    def __init__(self, user, host, port, jobdir_root):
+        self.server = CustomForkingTCPServer((host, port),
+                                             RequestHandler,
+                                             user=user,
+                                             jobdir_root=jobdir_root)
+        # We start the actual serving within a thread so we can return to
+        # the owner.
+ = threading.Thread(target=self.server.serve_forever)
+ = True
+    def stop(self):
+        if
+            self.server.shutdown()
+            self.server.server_close()