blob: 03d3563d06eeadd827c83b009633b45aac87ebdd [file] [log] [blame]
#!/usr/bin/env python

# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
16
Monty Taylor51139a02016-05-24 11:28:10 -050017import aiohttp
18import asyncio
19import logging
20import json
David Shrewsbury21454182017-06-05 14:15:18 -040021import os
22import os.path
David Shrewsburyeb856472017-04-13 14:23:04 -040023import socket
24import tempfile
Monty Taylor211883d2017-09-06 08:40:47 -050025import testtools
David Shrewsbury21454182017-06-05 14:15:18 -040026import threading
27import time
David Shrewsburyeb856472017-04-13 14:23:04 -040028
Monty Taylor51139a02016-05-24 11:28:10 -050029import zuul.web
David Shrewsburyeb856472017-04-13 14:23:04 -040030import zuul.lib.log_streamer
31import tests.base
32
33
class TestLogStreamer(tests.base.BaseTestCase):
    """Checks that a LogStreamer accepts connections until it is stopped."""

    def setUp(self):
        super().setUp()
        # Address used both to create the streamer and to connect to it.
        self.host = '0.0.0.0'

    def startStreamer(self, port, root=None):
        """Create a LogStreamer on ``port``.

        :param port: TCP port handed to the LogStreamer.
        :param root: Directory handed to the LogStreamer; the system
            temporary directory is used when this is empty or None.
        :returns: The new LogStreamer instance.
        """
        log_root = root or tempfile.gettempdir()
        return zuul.lib.log_streamer.LogStreamer(
            None, self.host, port, log_root)

    def test_start_stop(self):
        port = 7900
        address = (self.host, port)

        streamer = self.startStreamer(port)
        self.addCleanup(streamer.stop)

        # While the streamer is up, a plain TCP connect must succeed.
        socket.create_connection(address).close()

        streamer.stop()

        # Once stopped, the same connect attempt must be refused.
        with testtools.ExpectedException(ConnectionRefusedError):
            socket.create_connection(address).close()
David Shrewsbury21454182017-06-05 14:15:18 -040058
59
class TestStreaming(tests.base.AnsibleZuulTestCase):
    """Compare a build's job-output.txt with what the streamers deliver.

    One test reads the log straight from a LogStreamer socket; the other
    reads it through the ZuulWeb ``console-stream`` websocket.
    """

    tenant_config_file = 'config/streamer/main.yaml'
    log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")

    def setUp(self):
        super(TestStreaming, self).setUp()
        self.host = '0.0.0.0'
        self.streamer = None
        # Polled by the receive loop in startStreamer().
        self.stop_streamer = False
        # Text received from the LogStreamer socket.
        self.streaming_data = ''
        # Text received by the websocket client.
        self.ws_client_results = ''
        # Set once the streaming request has been sent (or has failed).
        self.test_streaming_event = threading.Event()

    def stopStreamer(self):
        """Ask the receive loop in startStreamer() to exit.

        The flag is only examined between ``recv`` calls.
        """
        self.stop_streamer = True

    def startStreamer(self, port, build_uuid, root=None):
        """Start a LogStreamer and collect the log of ``build_uuid``.

        Intended to run in its own thread: it blocks until the server
        closes the connection or stopStreamer() has been called.

        :param port: TCP port handed to the LogStreamer and connected to.
        :param build_uuid: UUID sent as the one-line streaming request.
        :param root: Directory handed to the LogStreamer; the system
            temporary directory is used when this is empty or None.
        """
        if not root:
            root = tempfile.gettempdir()
        self.streamer = zuul.lib.log_streamer.LogStreamer(None, self.host,
                                                          port, root)
        try:
            try:
                s = socket.create_connection((self.host, port))
                self.addCleanup(s.close)

                req = '%s\n' % build_uuid
                s.sendall(req.encode('utf-8'))
            finally:
                # Always release the waiting test thread; if the connect
                # or send failed the test then fails on its assertion
                # instead of blocking forever.
                self.test_streaming_event.set()

            while not self.stop_streamer:
                data = s.recv(2048)
                if not data:
                    break
                self.streaming_data += data.decode('utf-8')

            s.shutdown(socket.SHUT_RDWR)
            s.close()
        finally:
            # Stop the streamer even when the client side raised.
            self.streamer.stop()

    def _waitForRunningBuild(self):
        """Submit a change and wait until its build has created its log.

        :returns: A ``(build, build_dir, logfile)`` tuple where
            ``logfile`` is job-output.txt opened for reading; it is
            registered for cleanup.
        """
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))

        # We don't have any real synchronization for the ansible jobs, so
        # just wait until we get our running build.
        while not self.builds:
            time.sleep(0.1)
        build = self.builds[0]
        self.assertEqual(build.name, 'python27')

        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
        while not os.path.exists(build_dir):
            time.sleep(0.1)

        # Need to wait to make sure that jobdir gets set
        while build.jobdir is None:
            time.sleep(0.1)
            build = self.builds[0]

        # Wait for the job to begin running and create the ansible log file.
        # The job waits to complete until the flag file exists, so we can
        # safely access the log here. We only open it (to force a file handle
        # to be kept open for it after the job finishes) but wait to read the
        # contents until the job is done.
        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
        while not os.path.exists(ansible_log):
            time.sleep(0.1)
        logfile = open(ansible_log, 'r')
        self.addCleanup(logfile.close)

        return build, build_dir, logfile

    def test_streaming(self):
        build, build_dir, logfile = self._waitForRunningBuild()

        # Create a thread to stream the log. We need this to be happening
        # before we create the flag file to tell the job to complete.
        port = 7901
        streamer_thread = threading.Thread(
            target=self.startStreamer,
            args=(port, build.uuid, self.executor_server.jobdir_root,)
        )
        streamer_thread.start()
        self.addCleanup(self.stopStreamer)
        self.test_streaming_event.wait()

        # Allow the job to complete, which should close the streaming
        # connection (and terminate the thread) as well since the log file
        # gets closed/deleted.
        flag_file = os.path.join(build_dir, 'test_wait')
        open(flag_file, 'w').close()
        self.waitUntilSettled()
        streamer_thread.join()

        # Now that the job is finished, the log file has been closed by the
        # job and deleted. However, we still have a file handle to it, so we
        # can make sure that we read the entire contents at this point.
        file_contents = logfile.read()
        logfile.close()

        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
        self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
        self.assertEqual(file_contents, self.streaming_data)

    def runWSClient(self, build_uuid, event):
        """Run a websocket client that collects the log of ``build_uuid``.

        Intended to run in its own thread. Received text is appended to
        ``self.ws_client_results``.

        :param build_uuid: UUID placed in the streaming request.
        :param event: threading.Event set once the request has been sent,
            or once the client has given up.
        """
        async def client(loop, build_uuid, event):
            uri = 'http://127.0.0.1:9000/console-stream'
            try:
                # The context manager closes the session on every exit
                # path, including when ws_connect() raises.
                async with aiohttp.ClientSession(loop=loop) as session:
                    async with session.ws_connect(uri) as ws:
                        req = {'uuid': build_uuid, 'logfile': None}
                        # NOTE(review): newer aiohttp releases document
                        # send_str() as a coroutine; confirm whether it
                        # must be awaited for the pinned version.
                        ws.send_str(json.dumps(req))
                        event.set()  # notify we are connected and req sent
                        async for msg in ws:
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                self.ws_client_results += msg.data
                            elif msg.type in (aiohttp.WSMsgType.CLOSED,
                                              aiohttp.WSMsgType.ERROR):
                                break
            except Exception:
                self.log.exception("client exception:")
            finally:
                # Setting the event again is harmless; doing it here keeps
                # the test thread from waiting forever if the connection
                # failed before the request was sent.
                event.set()

        loop = asyncio.new_event_loop()
        loop.set_debug(True)
        try:
            loop.run_until_complete(client(loop, build_uuid, event))
        finally:
            loop.close()

    def test_websocket_streaming(self):
        # Need to set the streaming port before submitting the job
        finger_port = 7902
        self.executor_server.log_streaming_port = finger_port

        build, build_dir, logfile = self._waitForRunningBuild()

        # Start the finger streamer daemon
        streamer = zuul.lib.log_streamer.LogStreamer(
            None, self.host, finger_port, self.executor_server.jobdir_root)
        self.addCleanup(streamer.stop)

        # Start the web server
        web_host = '127.0.0.1'
        web_port = 9000
        web_server = zuul.web.ZuulWeb(
            listen_address=web_host, listen_port=web_port,
            gear_server='127.0.0.1', gear_port=self.gearman_server.port)
        loop = asyncio.new_event_loop()
        loop.set_debug(True)
        ws_thread = threading.Thread(target=web_server.run, args=(loop,))
        ws_thread.start()
        # Cleanups run in reverse order: stop the server, join its thread,
        # then close the loop.
        self.addCleanup(loop.close)
        self.addCleanup(ws_thread.join)
        self.addCleanup(web_server.stop)

        # Wait until web server is started. Probe the address it listens
        # on rather than the wildcard bind address in self.host.
        while True:
            try:
                with socket.create_connection((web_host, web_port)):
                    break
            except ConnectionRefusedError:
                time.sleep(0.1)

        # Start a thread with the websocket client
        ws_client_event = threading.Event()
        self.ws_client_results = ''
        ws_client_thread = threading.Thread(
            target=self.runWSClient, args=(build.uuid, ws_client_event)
        )
        ws_client_thread.start()
        ws_client_event.wait()

        # Allow the job to complete
        flag_file = os.path.join(build_dir, 'test_wait')
        open(flag_file, 'w').close()

        # Wait for the websocket client to complete, which it should when
        # it's received the full log.
        ws_client_thread.join()

        self.waitUntilSettled()

        file_contents = logfile.read()
        logfile.close()
        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
        self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
        self.assertEqual(file_contents, self.ws_client_results)