blob: 59dd8b01694349be7afb6a8a8a32ebab8f3ce443 [file] [log] [blame]
David Shrewsburyeb856472017-04-13 14:23:04 -04001#!/usr/bin/env python
2
3# Copyright 2017 Red Hat, Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Monty Taylor51139a02016-05-24 11:28:10 -050017import aiohttp
18import asyncio
19import logging
20import json
David Shrewsbury21454182017-06-05 14:15:18 -040021import os
22import os.path
David Shrewsburyeb856472017-04-13 14:23:04 -040023import socket
24import tempfile
Monty Taylor211883d2017-09-06 08:40:47 -050025import testtools
David Shrewsbury21454182017-06-05 14:15:18 -040026import threading
27import time
David Shrewsburyeb856472017-04-13 14:23:04 -040028
Monty Taylor51139a02016-05-24 11:28:10 -050029import zuul.web
David Shrewsburyeb856472017-04-13 14:23:04 -040030import zuul.lib.log_streamer
David Shrewsburyfe1f1942017-12-04 13:57:46 -050031import zuul.lib.fingergw
David Shrewsburyeb856472017-04-13 14:23:04 -040032import tests.base
33
34
35class TestLogStreamer(tests.base.BaseTestCase):
36
David Shrewsburyeb856472017-04-13 14:23:04 -040037 def setUp(self):
38 super(TestLogStreamer, self).setUp()
David Shrewsburyd664da42017-09-08 14:45:20 -040039 self.host = '::'
David Shrewsburyeb856472017-04-13 14:23:04 -040040
41 def startStreamer(self, port, root=None):
42 if not root:
43 root = tempfile.gettempdir()
44 return zuul.lib.log_streamer.LogStreamer(None, self.host, port, root)
45
46 def test_start_stop(self):
47 port = 7900
48 streamer = self.startStreamer(port)
49 self.addCleanup(streamer.stop)
50
Monty Taylor211883d2017-09-06 08:40:47 -050051 s = socket.create_connection((self.host, port))
David Shrewsburyeb856472017-04-13 14:23:04 -040052 s.close()
53
54 streamer.stop()
55
Monty Taylor211883d2017-09-06 08:40:47 -050056 with testtools.ExpectedException(ConnectionRefusedError):
57 s = socket.create_connection((self.host, port))
David Shrewsburyeb856472017-04-13 14:23:04 -040058 s.close()
David Shrewsbury21454182017-06-05 14:15:18 -040059
60
61class TestStreaming(tests.base.AnsibleZuulTestCase):
62
63 tenant_config_file = 'config/streamer/main.yaml'
David Shrewsburyfe1f1942017-12-04 13:57:46 -050064 log = logging.getLogger("zuul.test_streaming")
David Shrewsbury21454182017-06-05 14:15:18 -040065
66 def setUp(self):
67 super(TestStreaming, self).setUp()
David Shrewsburyd664da42017-09-08 14:45:20 -040068 self.host = '::'
David Shrewsbury21454182017-06-05 14:15:18 -040069 self.streamer = None
70 self.stop_streamer = False
71 self.streaming_data = ''
David Shrewsburyf8c73c32017-06-19 15:41:22 -040072 self.test_streaming_event = threading.Event()
David Shrewsbury21454182017-06-05 14:15:18 -040073
74 def stopStreamer(self):
75 self.stop_streamer = True
76
77 def startStreamer(self, port, build_uuid, root=None):
78 if not root:
79 root = tempfile.gettempdir()
80 self.streamer = zuul.lib.log_streamer.LogStreamer(None, self.host,
81 port, root)
Monty Taylor211883d2017-09-06 08:40:47 -050082 s = socket.create_connection((self.host, port))
David Shrewsbury21454182017-06-05 14:15:18 -040083 self.addCleanup(s.close)
84
David Shrewsbury79a66dd2018-01-03 12:08:30 -050085 req = '%s\r\n' % build_uuid
David Shrewsbury21454182017-06-05 14:15:18 -040086 s.sendall(req.encode('utf-8'))
David Shrewsburyf8c73c32017-06-19 15:41:22 -040087 self.test_streaming_event.set()
David Shrewsbury21454182017-06-05 14:15:18 -040088
89 while not self.stop_streamer:
90 data = s.recv(2048)
91 if not data:
92 break
93 self.streaming_data += data.decode('utf-8')
94
95 s.shutdown(socket.SHUT_RDWR)
96 s.close()
97 self.streamer.stop()
98
99 def test_streaming(self):
100 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
101 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
102
103 # We don't have any real synchronization for the ansible jobs, so
104 # just wait until we get our running build.
105 while not len(self.builds):
106 time.sleep(0.1)
107 build = self.builds[0]
108 self.assertEqual(build.name, 'python27')
109
110 build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
111 while not os.path.exists(build_dir):
112 time.sleep(0.1)
113
114 # Need to wait to make sure that jobdir gets set
115 while build.jobdir is None:
116 time.sleep(0.1)
117 build = self.builds[0]
118
119 # Wait for the job to begin running and create the ansible log file.
120 # The job waits to complete until the flag file exists, so we can
121 # safely access the log here. We only open it (to force a file handle
122 # to be kept open for it after the job finishes) but wait to read the
123 # contents until the job is done.
124 ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
125 while not os.path.exists(ansible_log):
126 time.sleep(0.1)
127 logfile = open(ansible_log, 'r')
128 self.addCleanup(logfile.close)
129
130 # Create a thread to stream the log. We need this to be happening
131 # before we create the flag file to tell the job to complete.
132 port = 7901
133 streamer_thread = threading.Thread(
134 target=self.startStreamer,
135 args=(port, build.uuid, self.executor_server.jobdir_root,)
136 )
137 streamer_thread.start()
138 self.addCleanup(self.stopStreamer)
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400139 self.test_streaming_event.wait()
David Shrewsbury21454182017-06-05 14:15:18 -0400140
141 # Allow the job to complete, which should close the streaming
142 # connection (and terminate the thread) as well since the log file
143 # gets closed/deleted.
144 flag_file = os.path.join(build_dir, 'test_wait')
145 open(flag_file, 'w').close()
146 self.waitUntilSettled()
147 streamer_thread.join()
148
149 # Now that the job is finished, the log file has been closed by the
150 # job and deleted. However, we still have a file handle to it, so we
151 # can make sure that we read the entire contents at this point.
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400152 # Compact the returned lines into a single string for easy comparison.
Monty Taylor51139a02016-05-24 11:28:10 -0500153 file_contents = logfile.read()
David Shrewsbury21454182017-06-05 14:15:18 -0400154 logfile.close()
155
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400156 self.log.debug("\n\nFile contents: %s\n\n", file_contents)
David Shrewsbury21454182017-06-05 14:15:18 -0400157 self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400158 self.assertEqual(file_contents, self.streaming_data)
Monty Taylor51139a02016-05-24 11:28:10 -0500159
160 def runWSClient(self, build_uuid, event):
161 async def client(loop, build_uuid, event):
Tristan Cacqueraya4996f12017-09-20 01:00:28 +0000162 uri = 'http://[::1]:9000/tenant-one/console-stream'
Monty Taylor51139a02016-05-24 11:28:10 -0500163 try:
164 session = aiohttp.ClientSession(loop=loop)
165 async with session.ws_connect(uri) as ws:
166 req = {'uuid': build_uuid, 'logfile': None}
167 ws.send_str(json.dumps(req))
168 event.set() # notify we are connected and req sent
169 async for msg in ws:
170 if msg.type == aiohttp.WSMsgType.TEXT:
171 self.ws_client_results += msg.data
172 elif msg.type == aiohttp.WSMsgType.CLOSED:
173 break
174 elif msg.type == aiohttp.WSMsgType.ERROR:
175 break
176 session.close()
177 except Exception as e:
178 self.log.exception("client exception:")
179
180 loop = asyncio.new_event_loop()
181 loop.set_debug(True)
182 loop.run_until_complete(client(loop, build_uuid, event))
183 loop.close()
184
David Shrewsburyfe1f1942017-12-04 13:57:46 -0500185 def runFingerClient(self, build_uuid, gateway_address, event):
186 # Wait until the gateway is started
187 while True:
188 try:
189 # NOTE(Shrews): This causes the gateway to begin to handle
190 # a request for which it never receives data, and thus
191 # causes the getCommand() method to timeout (seen in the
192 # test results, but is harmless).
193 with socket.create_connection(gateway_address) as s:
194 break
195 except ConnectionRefusedError:
196 time.sleep(0.1)
197
198 with socket.create_connection(gateway_address) as s:
David Shrewsbury79a66dd2018-01-03 12:08:30 -0500199 msg = "%s\r\n" % build_uuid
David Shrewsburyfe1f1942017-12-04 13:57:46 -0500200 s.sendall(msg.encode('utf-8'))
201 event.set() # notify we are connected and req sent
202 while True:
203 data = s.recv(1024)
204 if not data:
205 break
206 self.streaming_data += data.decode('utf-8')
207 s.shutdown(socket.SHUT_RDWR)
208
Monty Taylor51139a02016-05-24 11:28:10 -0500209 def test_websocket_streaming(self):
David Shrewsburyfe1f1942017-12-04 13:57:46 -0500210 # Start the finger streamer daemon
211 streamer = zuul.lib.log_streamer.LogStreamer(
212 None, self.host, 0, self.executor_server.jobdir_root)
213 self.addCleanup(streamer.stop)
214
Monty Taylor51139a02016-05-24 11:28:10 -0500215 # Need to set the streaming port before submitting the job
David Shrewsburyfe1f1942017-12-04 13:57:46 -0500216 finger_port = streamer.server.socket.getsockname()[1]
Monty Taylor51139a02016-05-24 11:28:10 -0500217 self.executor_server.log_streaming_port = finger_port
218
219 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
220 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
221
222 # We don't have any real synchronization for the ansible jobs, so
223 # just wait until we get our running build.
224 while not len(self.builds):
225 time.sleep(0.1)
226 build = self.builds[0]
227 self.assertEqual(build.name, 'python27')
228
229 build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
230 while not os.path.exists(build_dir):
231 time.sleep(0.1)
232
233 # Need to wait to make sure that jobdir gets set
234 while build.jobdir is None:
235 time.sleep(0.1)
236 build = self.builds[0]
237
238 # Wait for the job to begin running and create the ansible log file.
239 # The job waits to complete until the flag file exists, so we can
240 # safely access the log here. We only open it (to force a file handle
241 # to be kept open for it after the job finishes) but wait to read the
242 # contents until the job is done.
243 ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
244 while not os.path.exists(ansible_log):
245 time.sleep(0.1)
246 logfile = open(ansible_log, 'r')
247 self.addCleanup(logfile.close)
248
Monty Taylor51139a02016-05-24 11:28:10 -0500249 # Start the web server
250 web_server = zuul.web.ZuulWeb(
David Shrewsburyd664da42017-09-08 14:45:20 -0400251 listen_address='::', listen_port=9000,
Monty Taylor51139a02016-05-24 11:28:10 -0500252 gear_server='127.0.0.1', gear_port=self.gearman_server.port)
253 loop = asyncio.new_event_loop()
254 loop.set_debug(True)
255 ws_thread = threading.Thread(target=web_server.run, args=(loop,))
256 ws_thread.start()
257 self.addCleanup(loop.close)
258 self.addCleanup(ws_thread.join)
259 self.addCleanup(web_server.stop)
260
261 # Wait until web server is started
Monty Taylor211883d2017-09-06 08:40:47 -0500262 while True:
263 try:
264 with socket.create_connection((self.host, 9000)):
265 break
266 except ConnectionRefusedError:
Monty Taylor51139a02016-05-24 11:28:10 -0500267 time.sleep(0.1)
268
269 # Start a thread with the websocket client
270 ws_client_event = threading.Event()
271 self.ws_client_results = ''
272 ws_client_thread = threading.Thread(
273 target=self.runWSClient, args=(build.uuid, ws_client_event)
274 )
275 ws_client_thread.start()
276 ws_client_event.wait()
277
278 # Allow the job to complete
279 flag_file = os.path.join(build_dir, 'test_wait')
280 open(flag_file, 'w').close()
281
282 # Wait for the websocket client to complete, which it should when
283 # it's received the full log.
284 ws_client_thread.join()
285
286 self.waitUntilSettled()
287
288 file_contents = logfile.read()
289 logfile.close()
290 self.log.debug("\n\nFile contents: %s\n\n", file_contents)
291 self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
292 self.assertEqual(file_contents, self.ws_client_results)
David Shrewsburyfe1f1942017-12-04 13:57:46 -0500293
294 def test_finger_gateway(self):
295 # Start the finger streamer daemon
296 streamer = zuul.lib.log_streamer.LogStreamer(
297 None, self.host, 0, self.executor_server.jobdir_root)
298 self.addCleanup(streamer.stop)
299 finger_port = streamer.server.socket.getsockname()[1]
300
301 # Need to set the streaming port before submitting the job
302 self.executor_server.log_streaming_port = finger_port
303
304 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
305 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
306
307 # We don't have any real synchronization for the ansible jobs, so
308 # just wait until we get our running build.
309 while not len(self.builds):
310 time.sleep(0.1)
311 build = self.builds[0]
312 self.assertEqual(build.name, 'python27')
313
314 build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
315 while not os.path.exists(build_dir):
316 time.sleep(0.1)
317
318 # Need to wait to make sure that jobdir gets set
319 while build.jobdir is None:
320 time.sleep(0.1)
321 build = self.builds[0]
322
323 # Wait for the job to begin running and create the ansible log file.
324 # The job waits to complete until the flag file exists, so we can
325 # safely access the log here. We only open it (to force a file handle
326 # to be kept open for it after the job finishes) but wait to read the
327 # contents until the job is done.
328 ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
329 while not os.path.exists(ansible_log):
330 time.sleep(0.1)
331 logfile = open(ansible_log, 'r')
332 self.addCleanup(logfile.close)
333
334 # Start the finger gateway daemon
335 gateway = zuul.lib.fingergw.FingerGateway(
336 ('127.0.0.1', self.gearman_server.port, None, None, None),
337 (self.host, 0),
338 user=None,
339 command_socket=None,
340 pid_file=None
341 )
342 gateway.start()
343 self.addCleanup(gateway.stop)
344
345 gateway_port = gateway.server.socket.getsockname()[1]
346 gateway_address = (self.host, gateway_port)
347
348 # Start a thread with the finger client
349 finger_client_event = threading.Event()
350 self.finger_client_results = ''
351 finger_client_thread = threading.Thread(
352 target=self.runFingerClient,
353 args=(build.uuid, gateway_address, finger_client_event)
354 )
355 finger_client_thread.start()
356 finger_client_event.wait()
357
358 # Allow the job to complete
359 flag_file = os.path.join(build_dir, 'test_wait')
360 open(flag_file, 'w').close()
361
362 # Wait for the finger client to complete, which it should when
363 # it's received the full log.
364 finger_client_thread.join()
365
366 self.waitUntilSettled()
367
368 file_contents = logfile.read()
369 logfile.close()
370 self.log.debug("\n\nFile contents: %s\n\n", file_contents)
371 self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
372 self.assertEqual(file_contents, self.streaming_data)