Merge "zuul-web: refactor LogStreamingHandler to keep a single rpcclient" into feature/zuulv3
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
old mode 100644
new mode 100755
index faf22b5..308c0c9
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -32,14 +32,11 @@
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
- def __init__(self, loop, gear_server, gear_port,
- ssl_key=None, ssl_cert=None, ssl_ca=None):
- self.event_loop = loop
- self.gear_server = gear_server
- self.gear_port = gear_port
- self.ssl_key = ssl_key
- self.ssl_cert = ssl_cert
- self.ssl_ca = ssl_ca
+ def __init__(self, rpc):
+ self.rpc = rpc
+
+ def setEventLoop(self, event_loop):
+ self.event_loop = event_loop
def _getPortLocation(self, job_uuid):
"""
@@ -49,12 +46,7 @@
"""
# TODO: Fetch the entire list of uuid/file/server/ports once and
# share that, and fetch a new list on cache misses perhaps?
- # TODO: Avoid recreating a client for each request.
- rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
- self.ssl_key, self.ssl_cert,
- self.ssl_ca)
- ret = rpc.get_job_log_stream_address(job_uuid)
- rpc.shutdown()
+ ret = self.rpc.get_job_log_stream_address(job_uuid)
return ret
async def _fingerClient(self, ws, server, port, job_uuid):
@@ -159,19 +151,16 @@
ssl_key=None, ssl_cert=None, ssl_ca=None):
self.listen_address = listen_address
self.listen_port = listen_port
- self.gear_server = gear_server
- self.gear_port = gear_port
- self.ssl_key = ssl_key
- self.ssl_cert = ssl_cert
- self.ssl_ca = ssl_ca
self.event_loop = None
self.term = None
+ # instanciate handlers
+ self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
+ ssl_key, ssl_cert, ssl_ca)
+ self.log_streaming_handler = LogStreamingHandler(self.rpc)
async def _handleWebsocket(self, request):
- handler = LogStreamingHandler(self.event_loop,
- self.gear_server, self.gear_port,
- self.ssl_key, self.ssl_cert, self.ssl_ca)
- return await handler.processRequest(request)
+ return await self.log_streaming_handler.processRequest(
+ request)
def run(self, loop=None):
"""
@@ -196,6 +185,7 @@
asyncio.set_event_loop(loop)
self.event_loop = loop
+ self.log_streaming_handler.setEventLoop(loop)
app = web.Application()
for method, path, handler in routes:
@@ -229,6 +219,8 @@
loop.stop()
loop.close()
+ self.rpc.shutdown()
+
def stop(self):
if self.event_loop and self.term:
self.event_loop.call_soon_threadsafe(self.term.set_result, True)