Merge "Send open CORS header for jobs and builds" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 86b01ef..3bec28a 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -287,7 +287,7 @@
.. attr:: merger
- ,, attr:: command_socket
+ .. attr:: command_socket
:default: /var/lib/zuul/merger.socket
Path to command socket file for the merger process.
@@ -627,3 +627,65 @@
To start the web server, run ``zuul-web``. To stop it, kill the
PID which was saved in the pidfile specified in the configuration.
+
+Finger Gateway
+--------------
+
+The Zuul finger gateway listens on the standard finger port (79) for
+finger requests specifying a build UUID for which it should stream log
+results. The gateway will determine which executor is currently running that
+build and query that executor for the log stream.
+
+This is intended to be used with the standard finger command line client.
+For example::
+
+ finger UUID@zuul.example.com
+
+The above would stream the logs for the build identified by ``UUID``.
+
+Configuration
+~~~~~~~~~~~~~
+
+In addition to the common configuration sections, the following
+sections of ``zuul.conf`` are used by the finger gateway:
+
+.. attr:: fingergw
+
+ .. attr:: command_socket
+ :default: /var/lib/zuul/fingergw.socket
+
+ Path to command socket file for the finger gateway process.
+
+ .. attr:: listen_address
+ :default: all addresses
+
+ IP address or domain name on which to listen.
+
+ .. attr:: log_config
+
+ Path to log config file for the finger gateway process.
+
+ .. attr:: pidfile
+ :default: /var/run/zuul-fingergw/zuul-fingergw.pid
+
+ Path to PID lock file for the finger gateway process.
+
+ .. attr:: port
+ :default: 79
+
+ Port to use for the finger gateway. Note that since command line
+ finger clients cannot usually specify the port, leaving this set to
+ the default value is highly recommended.
+
+ .. attr:: user
+ :default: zuul
+
+ User ID for the zuul-fingergw process. In normal operation as a
+ daemon, the finger gateway should be started as the ``root`` user, but
+ it will drop privileges to this user during startup.
+
+Operation
+~~~~~~~~~
+
+To start the finger gateway, run ``zuul-fingergw``. To stop it, kill the
+PID which was saved in the pidfile specified in the configuration.
diff --git a/doc/source/user/encryption.rst b/doc/source/user/encryption.rst
index 7ced589..d45195f 100644
--- a/doc/source/user/encryption.rst
+++ b/doc/source/user/encryption.rst
@@ -15,9 +15,8 @@
which can be used by anyone to encrypt a secret and only Zuul is able
to decrypt it. Zuul serves each project's public key using its
build-in webserver. They can be fetched at the path
-``/keys/<source>/<project>.pub`` where ``<project>`` is the name of a
-project and ``<source>`` is the name of that project's connection in
-the main Zuul configuration file.
+``/<tenant>/<project>.pub`` where ``<project>`` is the canonical name
+of a project and ``<tenant>`` is the name of a tenant with that project.
Zuul currently supports one encryption scheme, PKCS#1 with OAEP, which
can not store secrets longer than the 3760 bits (derived from the key
diff --git a/setup.cfg b/setup.cfg
index 63ff562..dea3158 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -28,6 +28,7 @@
zuul-bwrap = zuul.driver.bubblewrap:main
zuul-web = zuul.cmd.web:main
zuul-migrate = zuul.cmd.migrate:main
+ zuul-fingergw = zuul.cmd.fingergw:main
[build_sphinx]
source-dir = doc/source
diff --git a/tests/base.py b/tests/base.py
index ea01d20..69d9f55 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2421,7 +2421,7 @@
'pydevd.CommandThread',
'pydevd.Reader',
'pydevd.Writer',
- 'FingerStreamer',
+ 'socketserver_Thread',
]
threads = [t for t in threading.enumerate()
if t.name not in whitelist]
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_streaming.py
similarity index 68%
rename from tests/unit/test_log_streamer.py
rename to tests/unit/test_streaming.py
index 27368e3..4bb541a 100644
--- a/tests/unit/test_log_streamer.py
+++ b/tests/unit/test_streaming.py
@@ -28,6 +28,7 @@
import zuul.web
import zuul.lib.log_streamer
+import zuul.lib.fingergw
import tests.base
@@ -60,7 +61,7 @@
class TestStreaming(tests.base.AnsibleZuulTestCase):
tenant_config_file = 'config/streamer/main.yaml'
- log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
+ log = logging.getLogger("zuul.test_streaming")
def setUp(self):
super(TestStreaming, self).setUp()
@@ -181,9 +182,38 @@
loop.run_until_complete(client(loop, build_uuid, event))
loop.close()
+ def runFingerClient(self, build_uuid, gateway_address, event):
+ # Wait until the gateway is started
+ while True:
+ try:
+ # NOTE(Shrews): This causes the gateway to begin to handle
+ # a request for which it never receives data, and thus
+ # causes the getCommand() method to timeout (seen in the
+ # test results, but is harmless).
+ with socket.create_connection(gateway_address) as s:
+ break
+ except ConnectionRefusedError:
+ time.sleep(0.1)
+
+ with socket.create_connection(gateway_address) as s:
+ msg = "%s\n" % build_uuid
+ s.sendall(msg.encode('utf-8'))
+ event.set() # notify we are connected and req sent
+ while True:
+ data = s.recv(1024)
+ if not data:
+ break
+ self.streaming_data += data.decode('utf-8')
+ s.shutdown(socket.SHUT_RDWR)
+
def test_websocket_streaming(self):
+ # Start the finger streamer daemon
+ streamer = zuul.lib.log_streamer.LogStreamer(
+ None, self.host, 0, self.executor_server.jobdir_root)
+ self.addCleanup(streamer.stop)
+
# Need to set the streaming port before submitting the job
- finger_port = 7902
+ finger_port = streamer.server.socket.getsockname()[1]
self.executor_server.log_streaming_port = finger_port
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -216,11 +246,6 @@
logfile = open(ansible_log, 'r')
self.addCleanup(logfile.close)
- # Start the finger streamer daemon
- streamer = zuul.lib.log_streamer.LogStreamer(
- None, self.host, finger_port, self.executor_server.jobdir_root)
- self.addCleanup(streamer.stop)
-
# Start the web server
web_server = zuul.web.ZuulWeb(
listen_address='::', listen_port=9000,
@@ -265,3 +290,83 @@
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
self.assertEqual(file_contents, self.ws_client_results)
+
+ def test_finger_gateway(self):
+ # Start the finger streamer daemon
+ streamer = zuul.lib.log_streamer.LogStreamer(
+ None, self.host, 0, self.executor_server.jobdir_root)
+ self.addCleanup(streamer.stop)
+ finger_port = streamer.server.socket.getsockname()[1]
+
+ # Need to set the streaming port before submitting the job
+ self.executor_server.log_streaming_port = finger_port
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+ # We don't have any real synchronization for the ansible jobs, so
+ # just wait until we get our running build.
+ while not len(self.builds):
+ time.sleep(0.1)
+ build = self.builds[0]
+ self.assertEqual(build.name, 'python27')
+
+ build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+ while not os.path.exists(build_dir):
+ time.sleep(0.1)
+
+ # Need to wait to make sure that jobdir gets set
+ while build.jobdir is None:
+ time.sleep(0.1)
+ build = self.builds[0]
+
+ # Wait for the job to begin running and create the ansible log file.
+ # The job waits to complete until the flag file exists, so we can
+ # safely access the log here. We only open it (to force a file handle
+ # to be kept open for it after the job finishes) but wait to read the
+ # contents until the job is done.
+ ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+ while not os.path.exists(ansible_log):
+ time.sleep(0.1)
+ logfile = open(ansible_log, 'r')
+ self.addCleanup(logfile.close)
+
+ # Start the finger gateway daemon
+ gateway = zuul.lib.fingergw.FingerGateway(
+ ('127.0.0.1', self.gearman_server.port, None, None, None),
+ (self.host, 0),
+ user=None,
+ command_socket=None,
+ pid_file=None
+ )
+ gateway.start()
+ self.addCleanup(gateway.stop)
+
+ gateway_port = gateway.server.socket.getsockname()[1]
+ gateway_address = (self.host, gateway_port)
+
+ # Start a thread with the finger client
+ finger_client_event = threading.Event()
+ self.finger_client_results = ''
+ finger_client_thread = threading.Thread(
+ target=self.runFingerClient,
+ args=(build.uuid, gateway_address, finger_client_event)
+ )
+ finger_client_thread.start()
+ finger_client_event.wait()
+
+ # Allow the job to complete
+ flag_file = os.path.join(build_dir, 'test_wait')
+ open(flag_file, 'w').close()
+
+ # Wait for the finger client to complete, which it should when
+ # it's received the full log.
+ finger_client_thread.join()
+
+ self.waitUntilSettled()
+
+ file_contents = logfile.read()
+ logfile.close()
+ self.log.debug("\n\nFile contents: %s\n\n", file_contents)
+ self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
+ self.assertEqual(file_contents, self.streaming_data)
diff --git a/tools/encrypt_secret.py b/tools/encrypt_secret.py
index 9b52846..2a4ea1d 100755
--- a/tools/encrypt_secret.py
+++ b/tools/encrypt_secret.py
@@ -43,10 +43,7 @@
parser.add_argument('url',
help="The base URL of the zuul server and tenant. "
"E.g., https://zuul.example.com/tenant-name")
- # TODO(jeblair,mordred): When projects have canonical names, use that here.
# TODO(jeblair): Throw a fit if SSL is not used.
- parser.add_argument('source',
- help="The Zuul source of the project.")
parser.add_argument('project',
help="The name of the project.")
parser.add_argument('--infile',
@@ -61,8 +58,7 @@
"to standard output.")
args = parser.parse_args()
- req = Request("%s/keys/%s/%s.pub" % (
- args.url, args.source, args.project))
+ req = Request("%s/%s.pub" % (args.url, args.project))
pubkey = urlopen(req)
if args.infile:
diff --git a/zuul/cmd/fingergw.py b/zuul/cmd/fingergw.py
new file mode 100644
index 0000000..920eed8
--- /dev/null
+++ b/zuul/cmd/fingergw.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import signal
+import sys
+
+import zuul.cmd
+import zuul.lib.fingergw
+
+from zuul.lib.config import get_default
+
+
+class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
+ '''
+ Class for the daemon that will distribute any finger requests to the
+ appropriate Zuul executor handling the specified build UUID.
+ '''
+ app_name = 'fingergw'
+ app_description = 'The Zuul finger gateway.'
+
+ def __init__(self):
+ super(FingerGatewayApp, self).__init__()
+ self.gateway = None
+
+ def createParser(self):
+ parser = super(FingerGatewayApp, self).createParser()
+ parser.add_argument('command',
+ choices=zuul.lib.fingergw.COMMANDS,
+ nargs='?')
+ return parser
+
+ def parseArguments(self, args=None):
+ super(FingerGatewayApp, self).parseArguments()
+ if self.args.command:
+ self.args.nodaemon = True
+
+ def run(self):
+ '''
+ Main entry point for the FingerGatewayApp.
+
+ Called by the main() method of the parent class.
+ '''
+ if self.args.command in zuul.lib.fingergw.COMMANDS:
+ self.send_command(self.args.command)
+ sys.exit(0)
+
+ self.setup_logging('fingergw', 'log_config')
+ self.log = logging.getLogger('zuul.fingergw')
+
+ # Get values from configuration file
+ host = get_default(self.config, 'fingergw', 'listen_address', '::')
+ port = int(get_default(self.config, 'fingergw', 'port', 79))
+ user = get_default(self.config, 'fingergw', 'user', 'zuul')
+ cmdsock = get_default(
+ self.config, 'fingergw', 'command_socket',
+ '/var/lib/zuul/%s.socket' % self.app_name)
+ gear_server = get_default(self.config, 'gearman', 'server')
+ gear_port = get_default(self.config, 'gearman', 'port', 4730)
+ ssl_key = get_default(self.config, 'gearman', 'ssl_key')
+ ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
+ ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
+
+ self.gateway = zuul.lib.fingergw.FingerGateway(
+ (gear_server, gear_port, ssl_key, ssl_cert, ssl_ca),
+ (host, port),
+ user,
+ cmdsock,
+ self.getPidFile(),
+ )
+
+ self.log.info('Starting Zuul finger gateway app')
+ self.gateway.start()
+
+ if self.args.nodaemon:
+ # NOTE(Shrews): When running in non-daemon mode, although sending
+ # the 'stop' command via the command socket will shutdown the
+ # gateway, it's still necessary to Ctrl+C to stop the app.
+ while True:
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print("Ctrl + C: asking gateway to exit nicely...\n")
+ self.stop()
+ break
+ else:
+ self.gateway.wait()
+
+ self.log.info('Stopped Zuul finger gateway app')
+
+ def stop(self):
+ if self.gateway:
+ self.gateway.stop()
+
+
+def main():
+ FingerGatewayApp().main()
diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py
new file mode 100644
index 0000000..c89ed0f
--- /dev/null
+++ b/zuul/lib/fingergw.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+import logging
+import socket
+import threading
+
+import zuul.rpcclient
+
+from zuul.lib import commandsocket
+from zuul.lib import streamer_utils
+
+
+COMMANDS = ['stop']
+
+
+class RequestHandler(streamer_utils.BaseFingerRequestHandler):
+ '''
+ Class implementing the logic for handling a single finger request.
+ '''
+
+ log = logging.getLogger("zuul.fingergw")
+
+ def __init__(self, *args, **kwargs):
+ self.rpc = kwargs.pop('rpc')
+ super(RequestHandler, self).__init__(*args, **kwargs)
+
+ def _fingerClient(self, server, port, build_uuid):
+ '''
+ Open a finger connection and return all streaming results.
+
+ :param server: The remote server.
+ :param port: The remote port.
+ :param build_uuid: The build UUID to stream.
+
+ Both IPv4 and IPv6 are supported.
+ '''
+ with socket.create_connection((server, port), timeout=10) as s:
+ msg = "%s\n" % build_uuid # Must have a trailing newline!
+ s.sendall(msg.encode('utf-8'))
+ while True:
+ data = s.recv(1024)
+ if data:
+ self.request.sendall(data)
+ else:
+ break
+
+ def handle(self):
+ '''
+ This method is called by the socketserver framework to handle an
+ incoming request.
+ '''
+ try:
+ build_uuid = self.getCommand()
+ port_location = self.rpc.get_job_log_stream_address(build_uuid)
+ self._fingerClient(
+ port_location['server'],
+ port_location['port'],
+ build_uuid,
+ )
+ except Exception:
+ self.log.exception('Finger request handling exception:')
+ msg = 'Internal streaming error'
+ self.request.sendall(msg.encode('utf-8'))
+ return
+
+
+class FingerGateway(object):
+ '''
+ Class implementing the finger multiplexing/gateway logic.
+
+ For each incoming finger request, a new thread is started that will
+ be responsible for finding which Zuul executor is executing the
+ requested build (by asking Gearman), forwarding the request to that
+ executor, and streaming the results back to our client.
+ '''
+
+ log = logging.getLogger("zuul.fingergw")
+
+ def __init__(self, gearman, address, user, command_socket, pid_file):
+ '''
+ Initialize the finger gateway.
+
+ :param tuple gearman: Gearman connection information. This should
+ include the server, port, SSL key, SSL cert, and SSL CA.
+ :param tuple address: The address and port to bind to for our gateway.
+ :param str user: The user to which we should drop privileges after
+ binding to our address.
+ :param str command_socket: Path to the daemon command socket.
+ :param str pid_file: Path to the daemon PID file.
+ '''
+ self.gear_server = gearman[0]
+ self.gear_port = gearman[1]
+ self.gear_ssl_key = gearman[2]
+ self.gear_ssl_cert = gearman[3]
+ self.gear_ssl_ca = gearman[4]
+ self.address = address
+ self.user = user
+ self.pid_file = pid_file
+
+ self.rpc = None
+ self.server = None
+ self.server_thread = None
+
+ self.command_thread = None
+ self.command_running = False
+ self.command_socket = command_socket
+
+ self.command_map = dict(
+ stop=self.stop,
+ )
+
+ def _runCommand(self):
+ while self.command_running:
+ try:
+ command = self.command_socket.get().decode('utf8')
+ if command != '_stop':
+ self.command_map[command]()
+ else:
+ return
+ except Exception:
+ self.log.exception("Exception while processing command")
+
+ def _run(self):
+ try:
+ self.server.serve_forever()
+ except Exception:
+ self.log.exception('Abnormal termination:')
+ raise
+
+ def start(self):
+ self.rpc = zuul.rpcclient.RPCClient(
+ self.gear_server,
+ self.gear_port,
+ self.gear_ssl_key,
+ self.gear_ssl_cert,
+ self.gear_ssl_ca)
+
+ self.server = streamer_utils.CustomThreadingTCPServer(
+ self.address,
+ functools.partial(RequestHandler, rpc=self.rpc),
+ user=self.user,
+ pid_file=self.pid_file)
+
+ # Start the command processor after the server and privilege drop
+ if self.command_socket:
+ self.log.debug("Starting command processor")
+ self.command_socket = commandsocket.CommandSocket(
+ self.command_socket)
+ self.command_socket.start()
+ self.command_running = True
+ self.command_thread = threading.Thread(
+ target=self._runCommand, name='command')
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
+ # The socketserver shutdown() call will hang unless the call
+ # to serve_forever() happens in another thread. So let's do that.
+ self.server_thread = threading.Thread(target=self._run)
+ self.server_thread.daemon = True
+ self.server_thread.start()
+ self.log.info("Finger gateway is started")
+
+ def stop(self):
+ if self.command_socket:
+ self.command_running = False
+ try:
+ self.command_socket.stop()
+ except Exception:
+ self.log.exception("Error stopping command socket:")
+
+ if self.server:
+ try:
+ self.server.shutdown()
+ self.server.server_close()
+ self.server = None
+ except Exception:
+ self.log.exception("Error stopping TCP server:")
+
+ if self.rpc:
+ try:
+ self.rpc.shutdown()
+ self.rpc = None
+ except Exception:
+ self.log.exception("Error stopping RCP client:")
+
+ self.log.info("Finger gateway is stopped")
+
+ def wait(self):
+ '''
+ Wait on the gateway to shutdown.
+ '''
+ self.server_thread.join()
diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py
index 1906be7..5c894b4 100644
--- a/zuul/lib/log_streamer.py
+++ b/zuul/lib/log_streamer.py
@@ -18,14 +18,13 @@
import logging
import os
import os.path
-import pwd
import re
import select
-import socket
-import socketserver
import threading
import time
+from zuul.lib import streamer_utils
+
class Log(object):
@@ -38,7 +37,7 @@
self.size = self.stat.st_size
-class RequestHandler(socketserver.BaseRequestHandler):
+class RequestHandler(streamer_utils.BaseFingerRequestHandler):
'''
Class to handle a single log streaming request.
@@ -46,47 +45,13 @@
the (class/method/attribute) names were changed to protect the innocent.
'''
- MAX_REQUEST_LEN = 1024
- REQUEST_TIMEOUT = 10
-
- # NOTE(Shrews): We only use this to log exceptions since a new process
- # is used per-request (and having multiple processes write to the same
- # log file constantly is bad).
- log = logging.getLogger("zuul.log_streamer.RequestHandler")
-
- def get_command(self):
- poll = select.poll()
- bitmask = (select.POLLIN | select.POLLERR |
- select.POLLHUP | select.POLLNVAL)
- poll.register(self.request, bitmask)
- buffer = b''
- ret = None
- start = time.time()
- while True:
- elapsed = time.time() - start
- timeout = max(self.REQUEST_TIMEOUT - elapsed, 0)
- if not timeout:
- raise Exception("Timeout while waiting for input")
- for fd, event in poll.poll(timeout):
- if event & select.POLLIN:
- buffer += self.request.recv(self.MAX_REQUEST_LEN)
- else:
- raise Exception("Received error event")
- if len(buffer) >= self.MAX_REQUEST_LEN:
- raise Exception("Request too long")
- try:
- ret = buffer.decode('utf-8')
- x = ret.find('\n')
- if x > 0:
- return ret[:x]
- except UnicodeDecodeError:
- pass
+ log = logging.getLogger("zuul.log_streamer")
def handle(self):
try:
- build_uuid = self.get_command()
+ build_uuid = self.getCommand()
except Exception:
- self.log.exception("Failure during get_command:")
+ self.log.exception("Failure during getCommand:")
msg = 'Internal streaming error'
self.request.sendall(msg.encode("utf-8"))
return
@@ -182,59 +147,11 @@
return False
-class CustomThreadingTCPServer(socketserver.ThreadingTCPServer):
- '''
- Custom version that allows us to drop privileges after port binding.
- '''
- address_family = socket.AF_INET6
+class LogStreamerServer(streamer_utils.CustomThreadingTCPServer):
def __init__(self, *args, **kwargs):
- self.user = kwargs.pop('user')
self.jobdir_root = kwargs.pop('jobdir_root')
- # For some reason, setting custom attributes does not work if we
- # call the base class __init__ first. Wha??
- socketserver.ThreadingTCPServer.__init__(self, *args, **kwargs)
-
- def change_privs(self):
- '''
- Drop our privileges to the zuul user.
- '''
- if os.getuid() != 0:
- return
- pw = pwd.getpwnam(self.user)
- os.setgroups([])
- os.setgid(pw.pw_gid)
- os.setuid(pw.pw_uid)
- os.umask(0o022)
-
- def server_bind(self):
- self.allow_reuse_address = True
- socketserver.ThreadingTCPServer.server_bind(self)
- if self.user:
- self.change_privs()
-
- def server_close(self):
- '''
- Overridden from base class to shutdown the socket immediately.
- '''
- try:
- self.socket.shutdown(socket.SHUT_RD)
- self.socket.close()
- except socket.error as e:
- # If it's already closed, don't error.
- if e.errno == socket.EBADF:
- return
- raise
-
- def process_request(self, request, client_address):
- '''
- Overridden from the base class to name the thread.
- '''
- t = threading.Thread(target=self.process_request_thread,
- name='FingerStreamer',
- args=(request, client_address))
- t.daemon = self.daemon_threads
- t.start()
+ super(LogStreamerServer, self).__init__(*args, **kwargs)
class LogStreamer(object):
@@ -243,12 +160,12 @@
'''
def __init__(self, user, host, port, jobdir_root):
- self.log = logging.getLogger('zuul.lib.LogStreamer')
+ self.log = logging.getLogger('zuul.log_streamer')
self.log.debug("LogStreamer starting on port %s", port)
- self.server = CustomThreadingTCPServer((host, port),
- RequestHandler,
- user=user,
- jobdir_root=jobdir_root)
+ self.server = LogStreamerServer((host, port),
+ RequestHandler,
+ user=user,
+ jobdir_root=jobdir_root)
# We start the actual serving within a thread so we can return to
# the owner.
diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py
new file mode 100644
index 0000000..985f3c3
--- /dev/null
+++ b/zuul/lib/streamer_utils.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''
+This file contains code common to finger log streaming functionality.
+The log streamer process within each executor, the finger gateway service,
+and the web interface will all make use of this module.
+'''
+
+import os
+import pwd
+import select
+import socket
+import socketserver
+import threading
+import time
+
+
+class BaseFingerRequestHandler(socketserver.BaseRequestHandler):
+ '''
+ Base class for common methods for handling finger requests.
+ '''
+
+ MAX_REQUEST_LEN = 1024
+ REQUEST_TIMEOUT = 10
+
+ def getCommand(self):
+ poll = select.poll()
+ bitmask = (select.POLLIN | select.POLLERR |
+ select.POLLHUP | select.POLLNVAL)
+ poll.register(self.request, bitmask)
+ buffer = b''
+ ret = None
+ start = time.time()
+ while True:
+ elapsed = time.time() - start
+ timeout = max(self.REQUEST_TIMEOUT - elapsed, 0)
+ if not timeout:
+ raise Exception("Timeout while waiting for input")
+ for fd, event in poll.poll(timeout):
+ if event & select.POLLIN:
+ buffer += self.request.recv(self.MAX_REQUEST_LEN)
+ else:
+ raise Exception("Received error event")
+ if len(buffer) >= self.MAX_REQUEST_LEN:
+ raise Exception("Request too long")
+ try:
+ ret = buffer.decode('utf-8')
+ x = ret.find('\n')
+ if x > 0:
+ return ret[:x]
+ except UnicodeDecodeError:
+ pass
+
+
+class CustomThreadingTCPServer(socketserver.ThreadingTCPServer):
+ '''
+ Custom version that allows us to drop privileges after port binding.
+ '''
+
+ address_family = socket.AF_INET6
+
+ def __init__(self, *args, **kwargs):
+ self.user = kwargs.pop('user')
+ self.pid_file = kwargs.pop('pid_file', None)
+ socketserver.ThreadingTCPServer.__init__(self, *args, **kwargs)
+
+ def change_privs(self):
+ '''
+ Drop our privileges to another user.
+ '''
+ if os.getuid() != 0:
+ return
+
+ pw = pwd.getpwnam(self.user)
+
+ # Change owner on our pid file so it can be removed by us after
+ # dropping privileges. May not exist if not a daemon.
+ if self.pid_file and os.path.exists(self.pid_file):
+ os.chown(self.pid_file, pw.pw_uid, pw.pw_gid)
+
+ os.setgroups([])
+ os.setgid(pw.pw_gid)
+ os.setuid(pw.pw_uid)
+ os.umask(0o022)
+
+ def server_bind(self):
+ '''
+ Overridden from the base class to allow address reuse and to drop
+ privileges after binding to the listening socket.
+ '''
+ self.allow_reuse_address = True
+ socketserver.ThreadingTCPServer.server_bind(self)
+ if self.user:
+ self.change_privs()
+
+ def server_close(self):
+ '''
+ Overridden from base class to shutdown the socket immediately.
+ '''
+ try:
+ self.socket.shutdown(socket.SHUT_RD)
+ self.socket.close()
+ except socket.error as e:
+ # If it's already closed, don't error.
+ if e.errno == socket.EBADF:
+ return
+ raise
+
+ def process_request(self, request, client_address):
+ '''
+ Overridden from the base class to name the thread.
+ '''
+ t = threading.Thread(target=self.process_request_thread,
+ name='socketserver_Thread',
+ args=(request, client_address))
+ t.daemon = self.daemon_threads
+ t.start()
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index d40505e..e5016df 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -303,8 +303,7 @@
def handle_key_get(self, job):
args = json.loads(job.arguments)
- source_name, project_name = args.get("source"), args.get("project")
- source = self.sched.connections.getSource(source_name)
- project = source.getProject(project_name)
+ tenant = self.sched.abide.tenants.get(args.get("tenant"))
+ (trusted, project) = tenant.getProject(args.get("project"))
job.sendWorkComplete(
encryption.serialize_rsa_public_key(project.public_key))
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 4885243..1c45092 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -42,17 +42,6 @@
def setEventLoop(self, event_loop):
self.event_loop = event_loop
- def _getPortLocation(self, job_uuid):
- """
- Query Gearman for the executor running the given job.
-
- :param str job_uuid: The job UUID we want to stream.
- """
- # TODO: Fetch the entire list of uuid/file/server/ports once and
- # share that, and fetch a new list on cache misses perhaps?
- ret = self.rpc.get_job_log_stream_address(job_uuid)
- return ret
-
async def _fingerClient(self, ws, server, port, job_uuid):
"""
Create a client to connect to the finger streamer and pull results.
@@ -94,7 +83,10 @@
# Schedule the blocking gearman work in an Executor
gear_task = self.event_loop.run_in_executor(
- None, self._getPortLocation, request['uuid'])
+ None,
+ self.rpc.get_job_log_stream_address,
+ request['uuid'],
+ )
try:
port_location = await asyncio.wait_for(gear_task, 10)
@@ -195,9 +187,9 @@
return resp
def key_get(self, request):
- source = request.match_info["source"]
+ tenant = request.match_info["tenant"]
project = request.match_info["project"]
- job = self.rpc.submitJob('zuul:key_get', {'source': source,
+ job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
'project': project})
return web.Response(body=job.data[0])
@@ -378,7 +370,7 @@
('GET', '/{tenant}/status.json', self._handleStatusRequest),
('GET', '/{tenant}/jobs.json', self._handleJobsRequest),
('GET', '/{tenant}/console-stream', self._handleWebsocket),
- ('GET', '/{source}/{project}.pub', self._handleKeyRequest),
+ ('GET', '/{tenant}/{project}.pub', self._handleKeyRequest),
('GET', '/{tenant}/status.html', self._handleStaticRequest),
('GET', '/{tenant}/jobs.html', self._handleStaticRequest),
('GET', '/{tenant}/stream.html', self._handleStaticRequest),