#!/usr/bin/env python
# Copyright (c) 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio
import codecs
import json
import logging
import os

import aiohttp
from aiohttp import web
import uvloop

import zuul.rpcclient

# Directory of static assets that ZuulWeb.run() serves under '/static';
# it is the 'static' directory located next to this module.
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')


class LogStreamingHandler(object):
    """
    Relay the console log of a running job to a websocket client.

    For each websocket request the handler asks the RPC client where the
    job's log is being streamed, connects to that finger streamer and
    forwards everything it reads to the websocket.
    """

    log = logging.getLogger("zuul.web.LogStreamingHandler")

    def __init__(self, rpc):
        """
        :param rpc: RPC client used for lookups; it must provide
            ``get_job_log_stream_address(job_uuid)``.
        """
        self.rpc = rpc

    def setEventLoop(self, event_loop):
        """
        Remember the event loop used to schedule blocking work.

        This must be called before any request is processed.

        :param event_loop: The asyncio event loop the web server runs on.
        """
        self.event_loop = event_loop

    def _getPortLocation(self, job_uuid):
        """
        Query Gearman for the executor running the given job.

        This call blocks, so it is run in an executor by _streamLog().

        :param str job_uuid: The job UUID we want to stream.
        :returns: The RPC client's answer. A falsy value is treated as an
            error by the caller; otherwise it is indexed with ``'server'``
            and ``'port'``.
        """
        # TODO: Fetch the entire list of uuid/file/server/ports once and
        # share that, and fetch a new list on cache misses perhaps?
        return self.rpc.get_job_log_stream_address(job_uuid)

    async def _fingerClient(self, ws, server, port, job_uuid):
        """
        Create a client to connect to the finger streamer and pull results.

        Data is forwarded to the websocket until the finger server closes
        the connection. The connection to the finger server is always
        closed, even when forwarding fails.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
        :param str server: The executor server running the job.
        :param str port: The executor server port.
        :param str job_uuid: The job UUID to stream.
        :raises UnicodeDecodeError: If the stream is not valid UTF-8.
        """
        self.log.debug("Connecting to finger server %s:%s", server, port)
        # No explicit ``loop`` argument: this coroutine already runs on the
        # server's event loop, and the parameter was removed from
        # asyncio.open_connection() in Python 3.10.
        reader, writer = await asyncio.open_connection(host=server, port=port)

        # Chunks are cut at arbitrary byte offsets, so a multi-byte UTF-8
        # character can straddle two reads. An incremental decoder buffers
        # the incomplete tail instead of raising on it.
        decoder = codecs.getincrementaldecoder('utf8')()

        try:
            self.log.debug("Sending finger request for %s", job_uuid)
            msg = "%s\n" % job_uuid    # Must have a trailing newline!

            writer.write(msg.encode('utf8'))
            await writer.drain()

            while True:
                data = await reader.read(1024)
                if not data:
                    # EOF: the finger server has nothing more to send.
                    return
                text = decoder.decode(data)
                if text:
                    # Empty when the chunk only held part of a character.
                    await ws.send_str(text)
        finally:
            # Release the socket on every exit path, not only on clean EOF.
            writer.close()

    async def _streamLog(self, ws, request):
        """
        Stream the log for the requested job back to the client.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
        :param dict request: The client request parameters; must contain
            the keys ``'uuid'`` and ``'logfile'``.
        :returns: A ``(code, message)`` tuple used to close the websocket:
            4000 for a missing request key, 4010 for a Gearman timeout,
            4011 for an empty Gearman answer, 1000 once streaming is over.
        """
        for key in ('uuid', 'logfile'):
            if key not in request:
                return (4000, "'{key}' missing from request payload".format(
                    key=key))

        # Schedule the blocking gearman work in an Executor
        gear_task = self.event_loop.run_in_executor(
            None, self._getPortLocation, request['uuid'])

        try:
            port_location = await asyncio.wait_for(gear_task, 10)
        except asyncio.TimeoutError:
            return (4010, "Gearman timeout")

        if not port_location:
            return (4011, "Error with Gearman")

        try:
            await self._fingerClient(
                ws, port_location['server'], port_location['port'],
                request['uuid']
            )
        except Exception as e:
            self.log.exception("Finger client exception:")
            msg = "Failure from finger client: %s" % e
            # msg is already a str; send_str() takes text, so there is
            # nothing to decode here.
            await ws.send_str(msg)

        return (1000, "No more data")

    async def processRequest(self, request):
        """
        Handle a client websocket request for log streaming.

        Only the first text message is processed; the websocket is closed
        once the stream for it has finished.

        :param aiohttp.web.Request request: The client request.
        :returns: The websocket response object.
        """
        # Created outside the try block so the error handler below can
        # always refer to it.
        ws = web.WebSocketResponse()
        try:
            await ws.prepare(request)
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    req = json.loads(msg.data)
                    self.log.debug("Websocket request: %s", req)
                    code, reason = await self._streamLog(ws, req)

                    # We expect to process only a single message. I.e., we
                    # can stream only a single file at a time.
                    await ws.close(code=code, message=reason)
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    self.log.error(
                        "Websocket connection closed with exception %s",
                        ws.exception()
                    )
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break
        except asyncio.CancelledError:
            self.log.debug("Websocket request handling cancelled")
        except Exception as e:
            self.log.exception("Websocket exception:")
            await ws.close(code=4009, message=str(e).encode('utf-8'))
        return ws


class ZuulWeb(object):
    """
    Web daemon exposing the log streaming websocket and static files.

    run() serves until stop() is called; stop() may be called from
    another thread.
    """

    log = logging.getLogger("zuul.web.ZuulWeb")

    def __init__(self, listen_address, listen_port,
                 gear_server, gear_port,
                 ssl_key=None, ssl_cert=None, ssl_ca=None):
        """
        :param listen_address: Address the web server listens on.
        :param listen_port: Port the web server listens on.
        :param gear_server: Passed to the RPC client.
        :param gear_port: Passed to the RPC client.
        :param ssl_key: Passed to the RPC client.
        :param ssl_cert: Passed to the RPC client.
        :param ssl_ca: Passed to the RPC client.
        """
        self.listen_address = listen_address
        self.listen_port = listen_port
        self.event_loop = None
        # Future whose completion tells run() to shut down; set in run().
        self.term = None
        # instantiate handlers
        self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
                                            ssl_key, ssl_cert, ssl_ca)
        self.log_streaming_handler = LogStreamingHandler(self.rpc)

    async def _handleWebsocket(self, request):
        """Delegate a '/console-stream' request to the streaming handler."""
        return await self.log_streaming_handler.processRequest(
            request)

    def run(self, loop=None):
        """
        Run the websocket daemon.

        Because this method can be the target of a new thread, we need to
        set the thread event loop here, rather than in __init__().

        :param loop: The event loop to use. If not supplied, the default main
            thread event loop is used. This should be supplied if ZuulWeb
            is run within a separate (non-main) thread.
        """
        routes = [
            ('GET', '/console-stream', self._handleWebsocket),
        ]

        self.log.debug("ZuulWeb starting")
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        user_supplied_loop = loop is not None
        if loop is None:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)

        self.event_loop = loop
        self.log_streaming_handler.setEventLoop(loop)

        app = web.Application()
        for method, path, handler in routes:
            app.router.add_route(method, path, handler)
        app.router.add_static('/static', STATIC_DIR)
        handler = app.make_handler(loop=self.event_loop)

        # create the server
        coro = self.event_loop.create_server(handler,
                                             self.listen_address,
                                             self.listen_port)
        self.server = self.event_loop.run_until_complete(coro)

        # Bind the termination future to the loop we actually serve on,
        # rather than to whatever loop happens to be current.
        self.term = self.event_loop.create_future()

        # start the server; this blocks until stop() completes the future
        self.event_loop.run_until_complete(self.term)

        # cleanup
        self.log.debug("ZuulWeb stopping")
        self.server.close()
        self.event_loop.run_until_complete(self.server.wait_closed())
        self.event_loop.run_until_complete(app.shutdown())
        self.event_loop.run_until_complete(handler.shutdown(60.0))
        self.event_loop.run_until_complete(app.cleanup())
        self.log.debug("ZuulWeb stopped")

        # Only run these if we are controlling the loop - they need to be
        # run from the main thread
        if not user_supplied_loop:
            loop.stop()
            loop.close()

        self.rpc.shutdown()

    def _setTerm(self):
        """Complete the termination future unless it already is done."""
        if not self.term.done():
            self.term.set_result(True)

    def stop(self):
        """
        Ask run() to shut down. Safe to call from another thread.

        Calling it before run() has started, more than once, or after the
        loop has been closed is a no-op.
        """
        if not (self.event_loop and self.term):
            return
        if self.event_loop.is_closed():
            # run() already finished and closed its own loop.
            return
        # The done() check happens on the loop thread (in _setTerm) so a
        # repeated stop() cannot raise InvalidStateError inside the loop.
        self.event_loop.call_soon_threadsafe(self._setTerm)


if __name__ == "__main__":
    # Manual entry point: run the daemon on localhost with debug logging
    # and asyncio debug mode enabled.
    logging.basicConfig(level=logging.DEBUG)

    loop = asyncio.get_event_loop()
    loop.set_debug(True)

    z = ZuulWeb(
        listen_address="127.0.0.1",
        listen_port=9000,
        gear_server="127.0.0.1",
        gear_port=4730,
    )
    z.run(loop)