blob: cce7d1b3d21271b867776396c7654af1592e2597 [file] [log] [blame]
David Shrewsburyeb856472017-04-13 14:23:04 -04001#!/usr/bin/env python
2
3# Copyright 2017 Red Hat, Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Monty Taylor51139a02016-05-24 11:28:10 -050017import aiohttp
18import asyncio
19import logging
20import json
David Shrewsbury21454182017-06-05 14:15:18 -040021import os
22import os.path
David Shrewsburyeb856472017-04-13 14:23:04 -040023import socket
24import tempfile
Monty Taylor211883d2017-09-06 08:40:47 -050025import testtools
David Shrewsbury21454182017-06-05 14:15:18 -040026import threading
27import time
David Shrewsburyeb856472017-04-13 14:23:04 -040028
Monty Taylor51139a02016-05-24 11:28:10 -050029import zuul.web
David Shrewsburyeb856472017-04-13 14:23:04 -040030import zuul.lib.log_streamer
David Shrewsburyfe1f1942017-12-04 13:57:46 -050031import zuul.lib.fingergw
David Shrewsburyeb856472017-04-13 14:23:04 -040032import tests.base
33
34
class TestLogStreamer(tests.base.BaseTestCase):
    # Basic start/stop checks for zuul.lib.log_streamer.LogStreamer.

    def setUp(self):
        super().setUp()
        # Passed to LogStreamer as its host and reused as the address the
        # test connects to.
        self.host = '::'

    def startStreamer(self, port, root=None):
        """Create and return a LogStreamer for ``self.host``.

        :param port: Port number handed to LogStreamer.
        :param root: Directory handed to LogStreamer; when falsy, the
            system temporary directory is used instead.
        """
        log_root = root or tempfile.gettempdir()
        return zuul.lib.log_streamer.LogStreamer(self.host, port, log_root)

    def test_start_stop(self):
        streamer = self.startStreamer(0)
        self.addCleanup(streamer.stop)

        # Port 0 was requested, so read back the port the server socket
        # is actually bound to.
        bound_port = streamer.server.socket.getsockname()[1]
        address = (self.host, bound_port)

        # While the streamer is up, connecting must succeed.
        conn = socket.create_connection(address)
        conn.close()

        streamer.stop()

        # Once stopped, the same address must refuse connections.
        with testtools.ExpectedException(ConnectionRefusedError):
            conn = socket.create_connection(address)
            conn.close()
David Shrewsbury21454182017-06-05 14:15:18 -040059
60
class TestStreaming(tests.base.AnsibleZuulTestCase):
    # Tests that stream a running build's job-output.txt through the finger
    # log streamer, the websocket endpoint and the finger gateway, and then
    # compare the streamed text with the contents of the log file.

    tenant_config_file = 'config/streamer/main.yaml'
    log = logging.getLogger("zuul.test_streaming")

    def setUp(self):
        super(TestStreaming, self).setUp()
        self.host = '::'
        self.streamer = None
        # Set by stopStreamer() to make startStreamer() leave its read loop.
        self.stop_streamer = False
        # Text collected by startStreamer() and runFingerClient().
        self.streaming_data = ''
        # Set once startStreamer() has sent its request (or has failed).
        self.test_streaming_event = threading.Event()

    def stopStreamer(self):
        """Ask the read loop in startStreamer() to exit."""
        self.stop_streamer = True

    def startStreamer(self, port, build_uuid, root=None):
        """Run a LogStreamer and stream one build's log from it.

        Intended to be used as a thread target. The streamed text is
        appended to ``self.streaming_data`` and the streamer is stopped
        before returning.

        :param port: Port number handed to LogStreamer.
        :param build_uuid: Build UUID sent as the streaming request.
        :param root: Directory handed to LogStreamer; when falsy, the
            system temporary directory is used instead.
        """
        if not root:
            root = tempfile.gettempdir()

        # Raw bytes are collected and decoded once at the end: decoding
        # every recv() chunk on its own fails when a multi-byte UTF-8
        # character is split across two chunks.
        chunks = []
        try:
            self.streamer = zuul.lib.log_streamer.LogStreamer(self.host,
                                                              port, root)
            port = self.streamer.server.socket.getsockname()[1]
            with socket.create_connection((self.host, port)) as s:
                req = '%s\r\n' % build_uuid
                s.sendall(req.encode('utf-8'))
                self.test_streaming_event.set()

                while not self.stop_streamer:
                    data = s.recv(2048)
                    if not data:
                        break
                    chunks.append(data)

                s.shutdown(socket.SHUT_RDWR)
        finally:
            # Always release the waiting test, even when connecting or
            # sending failed; otherwise it would block on the event forever.
            self.test_streaming_event.set()
            try:
                if self.streamer is not None:
                    self.streamer.stop()
            finally:
                self.streaming_data += b''.join(chunks).decode('utf-8')

    def _startBuildAndOpenLog(self):
        """Submit a change and open the running build's ansible log.

        :returns: A ``(build, build_dir, logfile)`` tuple, where
            ``logfile`` is ``job-output.txt`` opened for reading (its
            close is registered as a cleanup).
        """
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))

        # We don't have any real synchronization for the ansible jobs, so
        # just wait until we get our running build.
        while not self.builds:
            time.sleep(0.1)
        build = self.builds[0]
        self.assertEqual(build.name, 'python27')

        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
        while not os.path.exists(build_dir):
            time.sleep(0.1)

        # Need to wait to make sure that jobdir gets set
        while build.jobdir is None:
            time.sleep(0.1)
            build = self.builds[0]

        # Wait for the job to begin running and create the ansible log file.
        # The job waits to complete until the flag file exists, so we can
        # safely access the log here. We only open it (to force a file handle
        # to be kept open for it after the job finishes) but wait to read the
        # contents until the job is done.
        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
        while not os.path.exists(ansible_log):
            time.sleep(0.1)
        logfile = open(ansible_log, 'r')
        self.addCleanup(logfile.close)

        return build, build_dir, logfile

    def _releaseBuild(self, build_dir):
        """Create the 'test_wait' flag file that lets the job complete."""
        flag_file = os.path.join(build_dir, 'test_wait')
        open(flag_file, 'w').close()

    def _assertStreamedLog(self, logfile, streamed):
        """Assert that ``streamed`` equals the full contents of ``logfile``.

        The file is read through the handle that was opened while the job
        was still running, then closed.
        """
        file_contents = logfile.read()
        logfile.close()

        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
        self.log.debug("\n\nStreamed: %s\n\n", streamed)
        self.assertEqual(file_contents, streamed)

    def test_streaming(self):
        build, build_dir, logfile = self._startBuildAndOpenLog()

        # Create a thread to stream the log. We need this to be happening
        # before we create the flag file to tell the job to complete.
        streamer_thread = threading.Thread(
            target=self.startStreamer,
            args=(0, build.uuid, self.executor_server.jobdir_root,)
        )
        streamer_thread.start()
        self.addCleanup(self.stopStreamer)
        self.test_streaming_event.wait()

        # Allow the job to complete, which should close the streaming
        # connection (and terminate the thread) as well since the log file
        # gets closed/deleted.
        self._releaseBuild(build_dir)
        self.waitUntilSettled()
        streamer_thread.join()

        # Now that the job is finished, the log file has been closed by the
        # job and deleted. However, we still have a file handle to it, so we
        # can make sure that we read the entire contents at this point.
        self._assertStreamedLog(logfile, self.streaming_data)

    def runWSClient(self, build_uuid, event):
        """Stream a build's log over the websocket endpoint.

        Intended to be used as a thread target. Text messages are appended
        to ``self.ws_client_results``. Exceptions raised by the client are
        logged, not propagated.

        :param build_uuid: Build UUID placed in the streaming request.
        :param event: threading.Event set once the request has been sent,
            or when the client gives up because of an error.
        """
        async def client(loop, build_uuid, event):
            # NOTE(review): the port is hard-coded and must match the port
            # the test starts its web server on.
            uri = 'http://[::1]:9000/tenant-one/console-stream'
            try:
                session = aiohttp.ClientSession(loop=loop)
                try:
                    async with session.ws_connect(uri) as ws:
                        req = {'uuid': build_uuid, 'logfile': None}
                        # NOTE(review): newer aiohttp releases appear to
                        # make send_str() and session.close() coroutines
                        # that must be awaited -- confirm against the
                        # aiohttp version this project pins.
                        ws.send_str(json.dumps(req))
                        event.set()  # notify we are connected and req sent
                        async for msg in ws:
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                self.ws_client_results += msg.data
                            elif msg.type in (aiohttp.WSMsgType.CLOSED,
                                              aiohttp.WSMsgType.ERROR):
                                break
                finally:
                    # Close the session on the error path as well.
                    session.close()
            except Exception:
                self.log.exception("client exception:")
            finally:
                # Never leave the test blocked on the event if the
                # connection or the request failed.
                event.set()

        loop = asyncio.new_event_loop()
        loop.set_debug(True)
        try:
            loop.run_until_complete(client(loop, build_uuid, event))
        finally:
            loop.close()

    def runFingerClient(self, build_uuid, gateway_address, event):
        """Stream a build's log through the finger gateway.

        Intended to be used as a thread target. The streamed text is
        appended to ``self.streaming_data``.

        :param build_uuid: Build UUID sent as the finger request.
        :param gateway_address: ``(host, port)`` of the finger gateway.
        :param event: threading.Event set once the request has been sent,
            or when the client fails.
        """
        # See startStreamer(): decode once so that multi-byte characters
        # split across recv() chunks do not break decoding.
        chunks = []
        try:
            # Wait until the gateway is started
            while True:
                try:
                    # NOTE(Shrews): This causes the gateway to begin to handle
                    # a request for which it never receives data, and thus
                    # causes the getCommand() method to timeout (seen in the
                    # test results, but is harmless).
                    with socket.create_connection(gateway_address):
                        break
                except ConnectionRefusedError:
                    time.sleep(0.1)

            with socket.create_connection(gateway_address) as s:
                msg = "%s\r\n" % build_uuid
                s.sendall(msg.encode('utf-8'))
                event.set()  # notify we are connected and req sent
                while True:
                    data = s.recv(1024)
                    if not data:
                        break
                    chunks.append(data)
                s.shutdown(socket.SHUT_RDWR)
        finally:
            # Release the waiting test even if the client failed early.
            event.set()
            self.streaming_data += b''.join(chunks).decode('utf-8')

    def test_websocket_streaming(self):
        # Start the finger streamer daemon
        streamer = zuul.lib.log_streamer.LogStreamer(
            self.host, 0, self.executor_server.jobdir_root)
        self.addCleanup(streamer.stop)

        # Need to set the streaming port before submitting the job
        finger_port = streamer.server.socket.getsockname()[1]
        self.executor_server.log_streaming_port = finger_port

        build, build_dir, logfile = self._startBuildAndOpenLog()

        # Start the web server
        web_server = zuul.web.ZuulWeb(
            listen_address='::', listen_port=9000,
            gear_server='127.0.0.1', gear_port=self.gearman_server.port,
            static_path=tempfile.gettempdir())
        loop = asyncio.new_event_loop()
        loop.set_debug(True)
        ws_thread = threading.Thread(target=web_server.run, args=(loop,))
        ws_thread.start()
        # Cleanups run in reverse order: stop the server, join its thread,
        # then close the loop.
        self.addCleanup(loop.close)
        self.addCleanup(ws_thread.join)
        self.addCleanup(web_server.stop)

        # Wait until web server is started
        while True:
            try:
                with socket.create_connection((self.host, 9000)):
                    break
            except ConnectionRefusedError:
                time.sleep(0.1)

        # Start a thread with the websocket client
        ws_client_event = threading.Event()
        self.ws_client_results = ''
        ws_client_thread = threading.Thread(
            target=self.runWSClient, args=(build.uuid, ws_client_event)
        )
        ws_client_thread.start()
        ws_client_event.wait()

        # Allow the job to complete
        self._releaseBuild(build_dir)

        # Wait for the websocket client to complete, which it should when
        # it's received the full log.
        ws_client_thread.join()

        self.waitUntilSettled()

        self._assertStreamedLog(logfile, self.ws_client_results)

    def test_finger_gateway(self):
        # Start the finger streamer daemon
        streamer = zuul.lib.log_streamer.LogStreamer(
            self.host, 0, self.executor_server.jobdir_root)
        self.addCleanup(streamer.stop)
        finger_port = streamer.server.socket.getsockname()[1]

        # Need to set the streaming port before submitting the job
        self.executor_server.log_streaming_port = finger_port

        build, build_dir, logfile = self._startBuildAndOpenLog()

        # Start the finger gateway daemon
        gateway = zuul.lib.fingergw.FingerGateway(
            ('127.0.0.1', self.gearman_server.port, None, None, None),
            (self.host, 0),
            user=None,
            command_socket=None,
            pid_file=None
        )
        gateway.start()
        self.addCleanup(gateway.stop)

        gateway_port = gateway.server.socket.getsockname()[1]
        gateway_address = (self.host, gateway_port)

        # Start a thread with the finger client; it collects its output
        # in self.streaming_data.
        finger_client_event = threading.Event()
        finger_client_thread = threading.Thread(
            target=self.runFingerClient,
            args=(build.uuid, gateway_address, finger_client_event)
        )
        finger_client_thread.start()
        finger_client_event.wait()

        # Allow the job to complete
        self._releaseBuild(build_dir)

        # Wait for the finger client to complete, which it should when
        # it's received the full log.
        finger_client_thread.join()

        self.waitUntilSettled()

        self._assertStreamedLog(logfile, self.streaming_data)