| #!/usr/bin/env python |
| # Copyright (c) 2017 Red Hat |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
import asyncio
import codecs
import copy
import json
import logging
import os
import time

import aiohttp
from aiohttp import web
import uvloop

import zuul.model
import zuul.rpcclient
from zuul.web.handler import StaticHandler
| |
# Directory named 'static' that sits next to this module on disk.
_MODULE_DIR = os.path.dirname(__file__)
STATIC_DIR = os.path.join(_MODULE_DIR, 'static')
| |
| |
class LogStreamingHandler(object):
    """
    Relay the output of a finger log streamer to a websocket client.

    The address of the streamer is obtained by calling
    ``get_job_log_stream_address()`` on the rpc object given to the
    constructor; everything the streamer sends is forwarded to the
    websocket as text.
    """
    log = logging.getLogger("zuul.web.LogStreamingHandler")

    def __init__(self, rpc):
        """
        :param rpc: Object providing ``get_job_log_stream_address(uuid)``.
        """
        self.rpc = rpc

    def setEventLoop(self, event_loop):
        """
        Remember the event loop used to run the blocking address lookup.

        Must be called before a request is processed; ``_streamLog`` reads
        ``self.event_loop``.

        :param event_loop: The asyncio event loop serving the requests.
        """
        self.event_loop = event_loop

    async def _relayFingerOutput(self, reader, ws):
        """
        Forward everything read from the finger connection to the websocket.

        The byte stream is decoded incrementally, so a multi-byte UTF-8
        character that happens to be split across two reads is reassembled
        instead of raising UnicodeDecodeError.

        :param asyncio.StreamReader reader: The finger connection reader.
        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
        :raises UnicodeDecodeError: If the stream is not valid UTF-8 or ends
            in the middle of a character.
        """
        decoder = codecs.getincrementaldecoder('utf8')()
        while True:
            data = await reader.read(1024)
            if not data:
                # An empty read means the streamer closed the connection.
                break
            text = decoder.decode(data)
            # The decoder returns '' while it is holding back a partial
            # character; there is nothing to send in that case.
            if text:
                await ws.send_str(text)

        # Flush the decoder; this raises if the stream ended mid-character.
        text = decoder.decode(b'', final=True)
        if text:
            await ws.send_str(text)

    async def _fingerClient(self, ws, server, port, job_uuid):
        """
        Create a client to connect to the finger streamer and pull results.

        The connection is closed when the streamer finishes and also when
        sending to the websocket fails or the request is cancelled.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
        :param str server: The executor server running the job.
        :param str port: The executor server port.
        :param str job_uuid: The job UUID to stream.
        """
        self.log.debug("Connecting to finger server %s:%s", server, port)
        # No explicit loop argument: this coroutine already runs on the
        # serving loop, and the parameter no longer exists in Python 3.10+.
        reader, writer = await asyncio.open_connection(host=server, port=port)

        try:
            self.log.debug("Sending finger request for %s", job_uuid)
            msg = "%s\n" % job_uuid    # Must have a trailing newline!

            writer.write(msg.encode('utf8'))
            await writer.drain()

            await self._relayFingerOutput(reader, ws)
        finally:
            writer.close()

    async def _streamLog(self, ws, request):
        """
        Stream the log for the requested job back to the client.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
        :param dict request: The client request parameters.
        :returns: A ``(close_code, message)`` tuple for closing the websocket.
        """
        for key in ('uuid', 'logfile'):
            if key not in request:
                return (4000, "'{key}' missing from request payload".format(
                    key=key))

        # Schedule the blocking gearman work in an Executor
        gear_task = self.event_loop.run_in_executor(
            None,
            self.rpc.get_job_log_stream_address,
            request['uuid'],
        )

        try:
            port_location = await asyncio.wait_for(gear_task, 10)
        except asyncio.TimeoutError:
            return (4010, "Gearman timeout")

        if not port_location:
            return (4011, "Error with Gearman")

        try:
            await self._fingerClient(
                ws, port_location['server'], port_location['port'],
                request['uuid']
            )
        except Exception as e:
            # Report the failure in-band; the websocket is still closed
            # with the normal "No more data" result below.
            self.log.exception("Finger client exception:")
            await ws.send_str("Failure from finger client: %s" % e)

        return (1000, "No more data")

    async def processRequest(self, request):
        """
        Handle a client websocket request for log streaming.

        Only the first text message is processed; the websocket is closed
        once its log has been streamed.

        :param aiohttp.web.Request request: The client request.
        :returns: The websocket response object.
        """
        try:
            ws = web.WebSocketResponse()
            await ws.prepare(request)
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    req = json.loads(msg.data)
                    self.log.debug("Websocket request: %s", req)
                    code, reason = await self._streamLog(ws, req)

                    # We expect to process only a single message. I.e., we
                    # can stream only a single file at a time.
                    await ws.close(code=code, message=reason)
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    self.log.error(
                        "Websocket connection closed with exception %s",
                        ws.exception()
                    )
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break
        except asyncio.CancelledError:
            self.log.debug("Websocket request handling cancelled")
        except Exception as e:
            self.log.exception("Websocket exception:")
            await ws.close(code=4009, message=str(e).encode('utf-8'))
        return ws
| |
| |
class GearmanHandler(object):
    """
    Answer HTTP requests by submitting jobs through an RPC client.

    Each controller submits one ``zuul:*`` job via ``rpc.submitJob()`` and
    turns the first element of the job's ``data`` into a response.

    NOTE(review): submitJob() is called synchronously inside coroutines
    here, whereas LogStreamingHandler hands its RPC call to an executor.
    If submitJob() blocks, it stalls the event loop - confirm.
    """
    log = logging.getLogger("zuul.web.GearmanHandler")

    # Tenant status cache expiry, in seconds (compared with time.time()
    # deltas and sent as the Cache-Control max-age).
    cache_expiry = 1

    def __init__(self, rpc):
        """
        :param rpc: Object providing ``submitJob(name, data)``.
        """
        self.rpc = rpc
        # tenant name -> decoded status payload
        self.cache = {}
        # tenant name -> time.time() at which the payload was stored
        self.cache_time = {}
        self.controllers = {
            'tenant_list': self.tenant_list,
            'status_get': self.status_get,
            'job_list': self.job_list,
            'key_get': self.key_get,
        }

    def _pruneCache(self, now):
        """
        Drop every status cache entry that is older than ``cache_expiry``.

        Cache keys come from the request URL, so without eviction the cache
        would grow with every distinct tenant name ever requested. Expired
        entries are never served again, so removing them is not observable.

        :param float now: The reference timestamp (as from time.time()).
        """
        expired = [name for name, stamp in self.cache_time.items()
                   if (now - stamp) > self.cache_expiry]
        for name in expired:
            self.cache.pop(name, None)
            self.cache_time.pop(name, None)

    async def tenant_list(self, request, result_filter=None):
        """
        Return the decoded result of the 'zuul:tenant_list' job as JSON.

        :param request: The client request (unused).
        :param result_filter: Unused; accepted for a uniform signature.
        """
        job = self.rpc.submitJob('zuul:tenant_list', {})
        return web.json_response(json.loads(job.data[0]))

    async def status_get(self, request, result_filter=None):
        """
        Return the status of the tenant named in the request as JSON.

        The decoded 'zuul:status_get' result is cached per tenant for
        ``cache_expiry`` seconds. A payload whose 'code' is 404 yields an
        HTTPNotFound carrying the payload's 'message'.

        :param request: The client request; reads match_info["tenant"].
        :param result_filter: Optional object whose ``filterPayload()`` is
            applied to the payload before it is returned.
        """
        tenant = request.match_info["tenant"]
        now = time.time()
        if tenant not in self.cache or \
           (now - self.cache_time[tenant]) > self.cache_expiry:
            # Refreshing anyway: take the opportunity to evict stale
            # entries so the cache stays bounded.
            self._pruneCache(now)
            job = self.rpc.submitJob('zuul:status_get', {'tenant': tenant})
            self.cache[tenant] = json.loads(job.data[0])
            self.cache_time[tenant] = time.time()
        payload = self.cache[tenant]
        if payload.get('code') == 404:
            return web.HTTPNotFound(reason=payload['message'])
        if result_filter:
            payload = result_filter.filterPayload(payload)
        resp = web.json_response(payload)
        resp.headers['Access-Control-Allow-Origin'] = '*'
        resp.headers["Cache-Control"] = "public, max-age=%d" % \
            self.cache_expiry
        resp.last_modified = self.cache_time[tenant]
        return resp

    async def job_list(self, request, result_filter=None):
        """
        Return the decoded result of the 'zuul:job_list' job as JSON.

        :param request: The client request; reads match_info["tenant"].
        :param result_filter: Unused; accepted for a uniform signature.
        """
        tenant = request.match_info["tenant"]
        job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
        resp = web.json_response(json.loads(job.data[0]))
        resp.headers['Access-Control-Allow-Origin'] = '*'
        return resp

    async def key_get(self, request, result_filter=None):
        """
        Return the raw result of the 'zuul:key_get' job as the body.

        :param request: The client request; reads match_info["tenant"] and
            match_info["project"].
        :param result_filter: Unused; accepted for a uniform signature.
        """
        tenant = request.match_info["tenant"]
        project = request.match_info["project"]
        job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
                                                  'project': project})
        return web.Response(body=job.data[0])

    async def processRequest(self, request, action, result_filter=None):
        """
        Dispatch a request to the controller registered for ``action``.

        :param request: The client request, passed to the controller.
        :param str action: A key of ``self.controllers``.
        :param result_filter: Optional filter passed to the controller.
        :returns: The controller's response; a JSON 500 response if the
            controller (or the lookup of ``action``) raised; None if the
            handling was cancelled.
        """
        resp = None
        try:
            resp = await self.controllers[action](request, result_filter)
        except asyncio.CancelledError:
            self.log.debug("request handling cancelled")
        except Exception:
            self.log.exception("exception:")
            resp = web.json_response({'error_description': 'Internal error'},
                                     status=500)
        return resp
| |
| |
class ChangeFilter(object):
    """Pick out of a status payload the changes whose id matches."""

    def __init__(self, desired):
        """
        :param desired: The change id that wanted changes must equal.
        """
        self.desired = desired

    @staticmethod
    def _iterChanges(payload):
        """Yield every change found in every head of every change queue."""
        for pipeline in payload['pipelines']:
            for queue in pipeline['change_queues']:
                for head in queue['heads']:
                    yield from head

    def filterPayload(self, payload):
        """
        Return deep copies of all wanted changes found in the payload.

        :param dict payload: Status payload with the nesting
            pipelines -> change_queues -> heads -> changes.
        :returns: A list of copied change dicts, in payload order.
        """
        return [copy.deepcopy(candidate)
                for candidate in self._iterChanges(payload)
                if self.wantChange(candidate)]

    def wantChange(self, change):
        """Tell whether the change's 'id' equals the desired id."""
        return change['id'] == self.desired
| |
| |
class ZuulWeb(object):
    """
    The web server: wires the request handlers to URL routes and runs an
    aiohttp application on an asyncio event loop until stop() is called.
    """

    log = logging.getLogger("zuul.web.ZuulWeb")

    def __init__(self, listen_address, listen_port,
                 gear_server, gear_port,
                 ssl_key=None, ssl_cert=None, ssl_ca=None,
                 static_cache_expiry=3600,
                 connections=None,
                 info=None):
        """
        :param listen_address: Address the HTTP server binds to.
        :param listen_port: Port the HTTP server binds to.
        :param gear_server: Passed to the RPC client constructor.
        :param gear_port: Passed to the RPC client constructor.
        :param ssl_key: Passed to the RPC client constructor.
        :param ssl_cert: Passed to the RPC client constructor.
        :param ssl_ca: Passed to the RPC client constructor.
        :param static_cache_expiry: Cache-Control max-age (seconds) sent
            with info responses; a false value omits the header.
        :param connections: Optional iterable of objects whose
            ``getWebHandlers(zuul_web, info)`` returns extra route handlers.
        :param info: Object served by the info endpoints; ``copy()`` and
            ``toDict()`` are called on it, so None only works as long as
            those endpoints are not requested.
        """
        self.start_time = time.time()
        self.listen_address = listen_address
        self.listen_port = listen_port
        self.event_loop = None
        self.term = None
        self.server = None
        self.static_cache_expiry = static_cache_expiry
        self.info = info
        # instantiate handlers
        self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
                                            ssl_key, ssl_cert, ssl_ca)
        self.log_streaming_handler = LogStreamingHandler(self.rpc)
        self.gearman_handler = GearmanHandler(self.rpc)
        self._plugin_routes = []  # type: List[zuul.web.handler.BaseWebHandler]
        connections = connections or []
        for connection in connections:
            self._plugin_routes.extend(
                connection.getWebHandlers(self, self.info))

    async def _handleWebsocket(self, request):
        """Hand a console-stream request to the log streaming handler."""
        return await self.log_streaming_handler.processRequest(
            request)

    def _handleRootInfo(self, request):
        """Serve the info object as given to the constructor."""
        return self._handleInfo(self.info)

    def _handleTenantInfo(self, request):
        """Serve a copy of the info object with ``tenant`` set from the URL."""
        info = self.info.copy()
        info.tenant = request.match_info["tenant"]
        return self._handleInfo(info)

    def _handleInfo(self, info):
        """
        Build the JSON response ``{'info': info.toDict()}``.

        The response allows any origin, is marked cacheable for
        ``static_cache_expiry`` seconds (when set) and reports the time
        this object was created as its last modification time.
        """
        resp = web.json_response({'info': info.toDict()}, status=200)
        resp.headers['Access-Control-Allow-Origin'] = '*'
        if self.static_cache_expiry:
            resp.headers['Cache-Control'] = "public, max-age=%d" % \
                self.static_cache_expiry
        resp.last_modified = self.start_time
        return resp

    async def _handleTenantsRequest(self, request):
        """Serve the tenant list via the gearman handler."""
        return await self.gearman_handler.processRequest(request,
                                                         'tenant_list')

    async def _handleStatusRequest(self, request):
        """Serve the full tenant status via the gearman handler."""
        return await self.gearman_handler.processRequest(request, 'status_get')

    async def _handleStatusChangeRequest(self, request):
        """Serve the tenant status filtered down to the change in the URL."""
        change = request.match_info["change"]
        return await self.gearman_handler.processRequest(
            request, 'status_get', ChangeFilter(change))

    async def _handleJobsRequest(self, request):
        """Serve the tenant job list via the gearman handler."""
        return await self.gearman_handler.processRequest(request, 'job_list')

    async def _handleKeyRequest(self, request):
        """Serve a project key via the gearman handler."""
        return await self.gearman_handler.processRequest(request, 'key_get')

    def run(self, loop=None):
        """
        Run the websocket daemon.

        Because this method can be the target of a new thread, we need to
        set the thread event loop here, rather than in __init__().

        The call blocks until stop() is invoked, then shuts the server and
        the RPC client down.

        :param loop: The event loop to use. If not supplied, the default main
            thread event loop is used. This should be supplied if ZuulWeb
            is run within a separate (non-main) thread.
        """
        routes = [
            ('GET', '/info', self._handleRootInfo),
            ('GET', '/{tenant}/info', self._handleTenantInfo),
            ('GET', '/tenants', self._handleTenantsRequest),
            ('GET', '/{tenant}/status', self._handleStatusRequest),
            ('GET', '/{tenant}/jobs', self._handleJobsRequest),
            ('GET', '/{tenant}/status/change/{change}',
             self._handleStatusChangeRequest),
            ('GET', '/{tenant}/console-stream', self._handleWebsocket),
            ('GET', '/{tenant}/{project:.*}.pub', self._handleKeyRequest),
        ]

        static_routes = [
            StaticHandler(self, '/{tenant}/status.html'),
            StaticHandler(self, '/{tenant}/jobs.html'),
            StaticHandler(self, '/{tenant}/stream.html'),
            StaticHandler(self, '/tenants.html', 'index.html'),
            StaticHandler(self, '/', 'index.html'),
        ]

        for route in static_routes + self._plugin_routes:
            routes.append((route.method, route.path, route.handleRequest))

        self.log.debug("ZuulWeb starting")
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        user_supplied_loop = loop is not None
        if not loop:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)

        self.event_loop = loop
        self.log_streaming_handler.setEventLoop(loop)

        app = web.Application()
        for method, path, handler in routes:
            app.router.add_route(method, path, handler)
        app.router.add_static('/static', STATIC_DIR)
        handler = app.make_handler(loop=self.event_loop)

        # create the server
        coro = self.event_loop.create_server(handler,
                                             self.listen_address,
                                             self.listen_port)
        self.server = self.event_loop.run_until_complete(coro)

        # Create the termination future on the loop that will wait for it,
        # rather than on whatever loop asyncio.Future() would pick up.
        self.term = self.event_loop.create_future()

        # start the server
        self.event_loop.run_until_complete(self.term)

        # cleanup
        self.log.debug("ZuulWeb stopping")
        self.server.close()
        self.event_loop.run_until_complete(self.server.wait_closed())
        self.event_loop.run_until_complete(app.shutdown())
        self.event_loop.run_until_complete(handler.shutdown(60.0))
        self.event_loop.run_until_complete(app.cleanup())
        self.log.debug("ZuulWeb stopped")

        # Only run these if we are controlling the loop - they need to be
        # run from the main thread
        if not user_supplied_loop:
            loop.stop()
            loop.close()

        self.rpc.shutdown()

    def _terminate(self):
        """Resolve the termination future unless it is already resolved."""
        if not self.term.done():
            self.term.set_result(True)

    def stop(self):
        """
        Ask a running server to shut down.

        Safe to call from another thread and more than once; does nothing
        if run() has not yet created its termination future.
        """
        if self.event_loop and self.term:
            self.event_loop.call_soon_threadsafe(self._terminate)
| |
| |
if __name__ == "__main__":
    # Stand-alone debugging entry point: serve on localhost:9000 against a
    # gearman server on localhost:4730, with debug logging and asyncio
    # debug mode switched on.
    logging.basicConfig(level=logging.DEBUG)

    debug_loop = asyncio.get_event_loop()
    debug_loop.set_debug(True)

    localhost = "127.0.0.1"
    ZuulWeb(listen_address=localhost, listen_port=9000,
            gear_server=localhost, gear_port=4730).run(debug_loop)