Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # Copyright (c) 2017 Red Hat |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 13 | # implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
| 17 | |
| 18 | import asyncio |
| 19 | import json |
| 20 | import logging |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 21 | import os |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 22 | import uvloop |
| 23 | |
| 24 | import aiohttp |
| 25 | from aiohttp import web |
| 26 | |
| 27 | import zuul.rpcclient |
| 28 | |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 29 | STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') |
| 30 | |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 31 | |
| 32 | class LogStreamingHandler(object): |
| 33 | log = logging.getLogger("zuul.web.LogStreamingHandler") |
| 34 | |
Tristan Cacqueray | 41fa9ea | 2017-09-22 12:10:48 +0000 | [diff] [blame] | 35 | def __init__(self, rpc): |
| 36 | self.rpc = rpc |
| 37 | |
| 38 | def setEventLoop(self, event_loop): |
| 39 | self.event_loop = event_loop |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 40 | |
| 41 | def _getPortLocation(self, job_uuid): |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 42 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 43 | Query Gearman for the executor running the given job. |
| 44 | |
| 45 | :param str job_uuid: The job UUID we want to stream. |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 46 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 47 | # TODO: Fetch the entire list of uuid/file/server/ports once and |
| 48 | # share that, and fetch a new list on cache misses perhaps? |
Tristan Cacqueray | 41fa9ea | 2017-09-22 12:10:48 +0000 | [diff] [blame] | 49 | ret = self.rpc.get_job_log_stream_address(job_uuid) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 50 | return ret |
| 51 | |
| 52 | async def _fingerClient(self, ws, server, port, job_uuid): |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 53 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 54 | Create a client to connect to the finger streamer and pull results. |
| 55 | |
| 56 | :param aiohttp.web.WebSocketResponse ws: The websocket response object. |
| 57 | :param str server: The executor server running the job. |
| 58 | :param str port: The executor server port. |
| 59 | :param str job_uuid: The job UUID to stream. |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 60 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 61 | self.log.debug("Connecting to finger server %s:%s", server, port) |
| 62 | reader, writer = await asyncio.open_connection(host=server, port=port, |
| 63 | loop=self.event_loop) |
| 64 | |
| 65 | self.log.debug("Sending finger request for %s", job_uuid) |
| 66 | msg = "%s\n" % job_uuid # Must have a trailing newline! |
| 67 | |
| 68 | writer.write(msg.encode('utf8')) |
| 69 | await writer.drain() |
| 70 | |
| 71 | while True: |
| 72 | data = await reader.read(1024) |
| 73 | if data: |
| 74 | await ws.send_str(data.decode('utf8')) |
| 75 | else: |
| 76 | writer.close() |
| 77 | return |
| 78 | |
| 79 | async def _streamLog(self, ws, request): |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 80 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 81 | Stream the log for the requested job back to the client. |
| 82 | |
| 83 | :param aiohttp.web.WebSocketResponse ws: The websocket response object. |
| 84 | :param dict request: The client request parameters. |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 85 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 86 | for key in ('uuid', 'logfile'): |
| 87 | if key not in request: |
| 88 | return (4000, "'{key}' missing from request payload".format( |
| 89 | key=key)) |
| 90 | |
| 91 | # Schedule the blocking gearman work in an Executor |
| 92 | gear_task = self.event_loop.run_in_executor( |
| 93 | None, self._getPortLocation, request['uuid']) |
| 94 | |
| 95 | try: |
| 96 | port_location = await asyncio.wait_for(gear_task, 10) |
| 97 | except asyncio.TimeoutError: |
| 98 | return (4010, "Gearman timeout") |
| 99 | |
| 100 | if not port_location: |
| 101 | return (4011, "Error with Gearman") |
| 102 | |
David Shrewsbury | 4e22930 | 2017-09-26 15:47:48 -0400 | [diff] [blame] | 103 | try: |
| 104 | await self._fingerClient( |
| 105 | ws, port_location['server'], port_location['port'], |
| 106 | request['uuid'] |
| 107 | ) |
| 108 | except Exception as e: |
| 109 | self.log.exception("Finger client exception:") |
| 110 | msg = "Failure from finger client: %s" % e |
David Shrewsbury | 95bb2e8 | 2017-09-26 16:31:55 -0400 | [diff] [blame] | 111 | await ws.send_str(msg.decode('utf8')) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 112 | |
| 113 | return (1000, "No more data") |
| 114 | |
| 115 | async def processRequest(self, request): |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 116 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 117 | Handle a client websocket request for log streaming. |
| 118 | |
| 119 | :param aiohttp.web.Request request: The client request. |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 120 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 121 | try: |
| 122 | ws = web.WebSocketResponse() |
| 123 | await ws.prepare(request) |
| 124 | async for msg in ws: |
| 125 | if msg.type == aiohttp.WSMsgType.TEXT: |
| 126 | req = json.loads(msg.data) |
| 127 | self.log.debug("Websocket request: %s", req) |
| 128 | code, msg = await self._streamLog(ws, req) |
| 129 | |
| 130 | # We expect to process only a single message. I.e., we |
| 131 | # can stream only a single file at a time. |
| 132 | await ws.close(code=code, message=msg) |
| 133 | break |
| 134 | elif msg.type == aiohttp.WSMsgType.ERROR: |
| 135 | self.log.error( |
| 136 | "Websocket connection closed with exception %s", |
| 137 | ws.exception() |
| 138 | ) |
| 139 | break |
| 140 | elif msg.type == aiohttp.WSMsgType.CLOSED: |
| 141 | break |
David Shrewsbury | 0fa1cdd | 2017-07-11 09:18:32 -0400 | [diff] [blame] | 142 | except asyncio.CancelledError: |
| 143 | self.log.debug("Websocket request handling cancelled") |
| 144 | pass |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 145 | except Exception as e: |
| 146 | self.log.exception("Websocket exception:") |
| 147 | await ws.close(code=4009, message=str(e).encode('utf-8')) |
| 148 | return ws |
| 149 | |
| 150 | |
| 151 | class ZuulWeb(object): |
| 152 | |
| 153 | log = logging.getLogger("zuul.web.ZuulWeb") |
| 154 | |
| 155 | def __init__(self, listen_address, listen_port, |
| 156 | gear_server, gear_port, |
| 157 | ssl_key=None, ssl_cert=None, ssl_ca=None): |
| 158 | self.listen_address = listen_address |
| 159 | self.listen_port = listen_port |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 160 | self.event_loop = None |
| 161 | self.term = None |
Tristan Cacqueray | 41fa9ea | 2017-09-22 12:10:48 +0000 | [diff] [blame] | 162 | # instanciate handlers |
| 163 | self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port, |
| 164 | ssl_key, ssl_cert, ssl_ca) |
| 165 | self.log_streaming_handler = LogStreamingHandler(self.rpc) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 166 | |
| 167 | async def _handleWebsocket(self, request): |
Tristan Cacqueray | 41fa9ea | 2017-09-22 12:10:48 +0000 | [diff] [blame] | 168 | return await self.log_streaming_handler.processRequest( |
| 169 | request) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 170 | |
| 171 | def run(self, loop=None): |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 172 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 173 | Run the websocket daemon. |
| 174 | |
| 175 | Because this method can be the target of a new thread, we need to |
| 176 | set the thread event loop here, rather than in __init__(). |
| 177 | |
| 178 | :param loop: The event loop to use. If not supplied, the default main |
| 179 | thread event loop is used. This should be supplied if ZuulWeb |
| 180 | is run within a separate (non-main) thread. |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 181 | """ |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 182 | routes = [ |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 183 | ('GET', '/console-stream', self._handleWebsocket), |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 184 | ] |
| 185 | |
| 186 | self.log.debug("ZuulWeb starting") |
| 187 | asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
| 188 | user_supplied_loop = loop is not None |
| 189 | if not loop: |
| 190 | loop = asyncio.get_event_loop() |
| 191 | asyncio.set_event_loop(loop) |
| 192 | |
| 193 | self.event_loop = loop |
Tristan Cacqueray | 41fa9ea | 2017-09-22 12:10:48 +0000 | [diff] [blame] | 194 | self.log_streaming_handler.setEventLoop(loop) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 195 | |
| 196 | app = web.Application() |
| 197 | for method, path, handler in routes: |
| 198 | app.router.add_route(method, path, handler) |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 199 | app.router.add_static('/static', STATIC_DIR) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 200 | handler = app.make_handler(loop=self.event_loop) |
| 201 | |
| 202 | # create the server |
| 203 | coro = self.event_loop.create_server(handler, |
| 204 | self.listen_address, |
| 205 | self.listen_port) |
| 206 | self.server = self.event_loop.run_until_complete(coro) |
| 207 | |
| 208 | self.term = asyncio.Future() |
| 209 | |
| 210 | # start the server |
| 211 | self.event_loop.run_until_complete(self.term) |
| 212 | |
| 213 | # cleanup |
| 214 | self.log.debug("ZuulWeb stopping") |
| 215 | self.server.close() |
| 216 | self.event_loop.run_until_complete(self.server.wait_closed()) |
| 217 | self.event_loop.run_until_complete(app.shutdown()) |
| 218 | self.event_loop.run_until_complete(handler.shutdown(60.0)) |
| 219 | self.event_loop.run_until_complete(app.cleanup()) |
| 220 | self.log.debug("ZuulWeb stopped") |
| 221 | |
| 222 | # Only run these if we are controlling the loop - they need to be |
| 223 | # run from the main thread |
| 224 | if not user_supplied_loop: |
| 225 | loop.stop() |
| 226 | loop.close() |
| 227 | |
Tristan Cacqueray | 41fa9ea | 2017-09-22 12:10:48 +0000 | [diff] [blame] | 228 | self.rpc.shutdown() |
| 229 | |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 230 | def stop(self): |
Tobias Henkel | b4407fc | 2017-07-07 13:52:56 +0200 | [diff] [blame] | 231 | if self.event_loop and self.term: |
| 232 | self.event_loop.call_soon_threadsafe(self.term.set_result, True) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 233 | |
| 234 | |
| 235 | if __name__ == "__main__": |
| 236 | logging.basicConfig(level=logging.DEBUG) |
| 237 | loop = asyncio.get_event_loop() |
| 238 | loop.set_debug(True) |
David Shrewsbury | 0ee98fc | 2017-07-10 17:20:27 -0400 | [diff] [blame] | 239 | z = ZuulWeb(listen_address="127.0.0.1", listen_port=9000, |
| 240 | gear_server="127.0.0.1", gear_port=4730) |
Monty Taylor | 51139a0 | 2016-05-24 11:28:10 -0500 | [diff] [blame] | 241 | z.run(loop) |