#!/usr/bin/env python
# Copyright (c) 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import logging
import os
import time

import uvloop
import aiohttp
from aiohttp import web

import zuul.rpcclient
from zuul.web.handler import StaticHandler

STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')


class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")

    def __init__(self, rpc):
self.rpc = rpc

    def setEventLoop(self, event_loop):
self.event_loop = event_loop

    async def _fingerClient(self, ws, server, port, job_uuid):
"""
        Create a client to connect to the finger streamer and pull results.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param str server: The executor server running the job.
:param str port: The executor server port.
:param str job_uuid: The job UUID to stream.
"""
self.log.debug("Connecting to finger server %s:%s", server, port)
reader, writer = await asyncio.open_connection(host=server, port=port,
loop=self.event_loop)
self.log.debug("Sending finger request for %s", job_uuid)
msg = "%s\n" % job_uuid # Must have a trailing newline!
writer.write(msg.encode('utf8'))
await writer.drain()
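        # Relay everything the finger server sends to the websocket until
        # it signals EOF, then close the finger connection.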
while True:
data = await reader.read(1024)
if data:
await ws.send_str(data.decode('utf8'))
else:
writer.close()
return

    async def _streamLog(self, ws, request):
"""
        Stream the log for the requested job back to the client.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param dict request: The client request parameters.
"""
for key in ('uuid', 'logfile'):
if key not in request:
return (4000, "'{key}' missing from request payload".format(
key=key))
# Schedule the blocking gearman work in an Executor
gear_task = self.event_loop.run_in_executor(
None,
self.rpc.get_job_log_stream_address,
request['uuid'],
)
try:
port_location = await asyncio.wait_for(gear_task, 10)
except asyncio.TimeoutError:
return (4010, "Gearman timeout")
if not port_location:
return (4011, "Error with Gearman")
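        # The gearman job tells us which executor host and finger port are
        # serving this build's console stream.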
try:
await self._fingerClient(
ws, port_location['server'], port_location['port'],
request['uuid']
)
except Exception as e:
self.log.exception("Finger client exception:")
await ws.send_str("Failure from finger client: %s" % e)
return (1000, "No more data")

    async def processRequest(self, request):
"""
        Handle a client websocket request for log streaming.

        :param aiohttp.web.Request request: The client request.
"""
try:
ws = web.WebSocketResponse()
await ws.prepare(request)
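            # Each TEXT message is a JSON payload naming the build whose
            # log should be streamed over this websocket.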
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
req = json.loads(msg.data)
self.log.debug("Websocket request: %s", req)
code, msg = await self._streamLog(ws, req)
# We expect to process only a single message. I.e., we
# can stream only a single file at a time.
await ws.close(code=code, message=msg)
break
elif msg.type == aiohttp.WSMsgType.ERROR:
self.log.error(
"Websocket connection closed with exception %s",
ws.exception()
)
break
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
        except asyncio.CancelledError:
            self.log.debug("Websocket request handling cancelled")
except Exception as e:
self.log.exception("Websocket exception:")
await ws.close(code=4009, message=str(e).encode('utf-8'))
return ws


class GearmanHandler(object):
    log = logging.getLogger("zuul.web.GearmanHandler")

    # Tenant status cache expiry (seconds)
    cache_expiry = 1

    def __init__(self, rpc):
self.rpc = rpc
self.cache = {}
self.cache_time = {}
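        # Dispatch table mapping processRequest() actions to gearman-backed
        # handlers.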
self.controllers = {
'tenant_list': self.tenant_list,
'status_get': self.status_get,
'job_list': self.job_list,
'key_get': self.key_get,
}

    async def tenant_list(self, request):
job = self.rpc.submitJob('zuul:tenant_list', {})
return web.json_response(json.loads(job.data[0]))

    async def status_get(self, request):
tenant = request.match_info["tenant"]
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
job = self.rpc.submitJob('zuul:status_get', {'tenant': tenant})
self.cache[tenant] = json.loads(job.data[0])
self.cache_time[tenant] = time.time()
resp = web.json_response(self.cache[tenant])
resp.headers['Access-Control-Allow-Origin'] = '*'
resp.headers["Cache-Control"] = "public, max-age=%d" % \
self.cache_expiry
resp.last_modified = self.cache_time[tenant]
return resp

    async def job_list(self, request):
tenant = request.match_info["tenant"]
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
resp = web.json_response(json.loads(job.data[0]))
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp

    async def key_get(self, request):
tenant = request.match_info["tenant"]
project = request.match_info["project"]
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
'project': project})
return web.Response(body=job.data[0])

    async def processRequest(self, request, action):
        # Default resp to None so a cancelled request does not raise
        # UnboundLocalError when we fall through to the return below.
        resp = None
        try:
            resp = await self.controllers[action](request)
        except asyncio.CancelledError:
            self.log.debug("request handling cancelled")
        except Exception:
            self.log.exception("exception:")
            resp = web.json_response({'error_description': 'Internal error'},
                                     status=500)
        return resp


class ZuulWeb(object):
    log = logging.getLogger("zuul.web.ZuulWeb")

    def __init__(self, listen_address, listen_port,
gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None,
static_cache_expiry=3600,
connections=None):
self.listen_address = listen_address
self.listen_port = listen_port
self.event_loop = None
self.term = None
self.server = None
self.static_cache_expiry = static_cache_expiry
        # instantiate handlers
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
self.log_streaming_handler = LogStreamingHandler(self.rpc)
self.gearman_handler = GearmanHandler(self.rpc)
self._plugin_routes = [] # type: List[zuul.web.handler.BaseWebHandler]
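        # Let configured connections register any additional web handlers
        # they provide.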
connections = connections or []
for connection in connections:
self._plugin_routes.extend(connection.getWebHandlers(self))

    async def _handleWebsocket(self, request):
return await self.log_streaming_handler.processRequest(
request)

    async def _handleTenantsRequest(self, request):
return await self.gearman_handler.processRequest(request,
'tenant_list')

    async def _handleStatusRequest(self, request):
return await self.gearman_handler.processRequest(request, 'status_get')

    async def _handleJobsRequest(self, request):
return await self.gearman_handler.processRequest(request, 'job_list')

    async def _handleKeyRequest(self, request):
return await self.gearman_handler.processRequest(request, 'key_get')

    def run(self, loop=None):
"""
        Run the websocket daemon.

        Because this method can be the target of a new thread, we need to
        set the thread event loop here, rather than in __init__().

        :param loop: The event loop to use. If not supplied, the default main
thread event loop is used. This should be supplied if ZuulWeb
is run within a separate (non-main) thread.
"""
routes = [
('GET', '/tenants.json', self._handleTenantsRequest),
('GET', '/{tenant}/status.json', self._handleStatusRequest),
('GET', '/{tenant}/jobs.json', self._handleJobsRequest),
('GET', '/{tenant}/console-stream', self._handleWebsocket),
('GET', '/{tenant}/{project:.*}.pub', self._handleKeyRequest),
]
static_routes = [
StaticHandler(self, '/{tenant}/status.html'),
StaticHandler(self, '/{tenant}/jobs.html'),
StaticHandler(self, '/{tenant}/stream.html'),
StaticHandler(self, '/tenants.html', 'index.html'),
StaticHandler(self, '/', 'index.html'),
]
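        # Append the static pages and any plugin-provided routes to the
        # route table.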
for route in static_routes + self._plugin_routes:
routes.append((route.method, route.path, route.handleRequest))
self.log.debug("ZuulWeb starting")
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
user_supplied_loop = loop is not None
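        # If no loop was supplied, use the thread's default loop; in that
        # case we also own its shutdown (see the end of this method).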
if not loop:
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
self.event_loop = loop
self.log_streaming_handler.setEventLoop(loop)
app = web.Application()
for method, path, handler in routes:
app.router.add_route(method, path, handler)
app.router.add_static('/static', STATIC_DIR)
handler = app.make_handler(loop=self.event_loop)
# create the server
coro = self.event_loop.create_server(handler,
self.listen_address,
self.listen_port)
self.server = self.event_loop.run_until_complete(coro)
        self.term = asyncio.Future()

        # serve requests until stop() resolves the termination future
        self.event_loop.run_until_complete(self.term)
# cleanup
self.log.debug("ZuulWeb stopping")
self.server.close()
self.event_loop.run_until_complete(self.server.wait_closed())
self.event_loop.run_until_complete(app.shutdown())
self.event_loop.run_until_complete(handler.shutdown(60.0))
self.event_loop.run_until_complete(app.cleanup())
self.log.debug("ZuulWeb stopped")
# Only run these if we are controlling the loop - they need to be
# run from the main thread
if not user_supplied_loop:
loop.stop()
loop.close()
self.rpc.shutdown()

    def stop(self):
if self.event_loop and self.term:
self.event_loop.call_soon_threadsafe(self.term.set_result, True)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
z = ZuulWeb(listen_address="127.0.0.1", listen_port=9000,
gear_server="127.0.0.1", gear_port=4730)
z.run(loop)