blob: f47a8c87f8e94176ad280c88a57f9541d1b0d377 [file] [log] [blame]
David Shrewsburyeb856472017-04-13 14:23:04 -04001#!/usr/bin/env python
2
3# Copyright 2017 Red Hat, Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Monty Taylor51139a02016-05-24 11:28:10 -050017import aiohttp
18import asyncio
19import logging
20import json
David Shrewsbury21454182017-06-05 14:15:18 -040021import os
22import os.path
David Shrewsburyeb856472017-04-13 14:23:04 -040023import socket
24import tempfile
David Shrewsbury21454182017-06-05 14:15:18 -040025import threading
26import time
David Shrewsburyeb856472017-04-13 14:23:04 -040027
Monty Taylor51139a02016-05-24 11:28:10 -050028import zuul.web
David Shrewsburyeb856472017-04-13 14:23:04 -040029import zuul.lib.log_streamer
30import tests.base
31
32
class TestLogStreamer(tests.base.BaseTestCase):
    """Start/stop smoke test for ``zuul.lib.log_streamer.LogStreamer``."""

    def setUp(self):
        """Run the base fixture setup and pick the address used by tests."""
        super().setUp()
        self.host = '0.0.0.0'

    def startStreamer(self, port, root=None):
        """Construct and return a LogStreamer for ``self.host``:``port``.

        :param port: TCP port handed to the streamer.
        :param root: Job directory root handed to the streamer; when
            falsy, the system temporary directory is used instead.
        """
        jobdir_root = root or tempfile.gettempdir()
        return zuul.lib.log_streamer.LogStreamer(
            None, self.host, port, jobdir_root)

    def test_start_stop(self):
        """A TCP connect succeeds after start and fails after stop()."""
        port = 7900
        address = (self.host, port)
        streamer = self.startStreamer(port)
        self.addCleanup(streamer.stop)

        # With the streamer constructed, connect_ex() must report success.
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(probe.close)
        status = probe.connect_ex(address)
        self.assertEqual(0, status)
        probe.close()

        streamer.stop()

        # After stop(), the same address must refuse the connection.
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(probe.close)
        status = probe.connect_ex(address)
        self.assertNotEqual(0, status)
        probe.close()
David Shrewsbury21454182017-06-05 14:15:18 -040060
61
class TestStreaming(tests.base.AnsibleZuulTestCase):
    """Compare a job's on-disk log with what the streamers deliver.

    Each test launches a job, keeps a file handle on its
    ``job-output.txt``, streams the log through either the finger-style
    LogStreamer or the zuul-web websocket endpoint, and finally asserts
    that the streamed text equals the file contents.
    """

    tenant_config_file = 'config/streamer/main.yaml'
    log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")

    def setUp(self):
        """Initialise the per-test streaming state."""
        super(TestStreaming, self).setUp()
        self.host = '0.0.0.0'
        self.streamer = None
        self.stop_streamer = False
        self.streaming_data = ''
        self.ws_client_results = ''
        # Set by startStreamer() once its request has been sent (or once
        # it has failed), so the test thread knows it may proceed.
        self.test_streaming_event = threading.Event()

    def stopStreamer(self):
        """Ask the startStreamer() receive loop to exit."""
        self.stop_streamer = True

    def startStreamer(self, port, build_uuid, root=None):
        """Start a LogStreamer and collect the log of ``build_uuid``.

        Intended to run in its own thread.  Received text is appended to
        ``self.streaming_data`` until the peer closes the connection or
        ``self.stop_streamer`` becomes true.

        :param port: TCP port for the LogStreamer.
        :param build_uuid: UUID sent as the stream request line.
        :param root: Job directory root for the streamer; when falsy,
            the system temporary directory is used.
        """
        import codecs

        if not root:
            root = tempfile.gettempdir()

        # recv() may split a multi-byte UTF-8 sequence across two chunks;
        # an incremental decoder buffers the partial bytes instead of
        # raising UnicodeDecodeError.
        decoder = codecs.getincrementaldecoder('utf-8')()

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(s.close)
        try:
            try:
                self.streamer = zuul.lib.log_streamer.LogStreamer(
                    None, self.host, port, root)
                s.connect((self.host, port))
                req = '%s\n' % build_uuid
                s.sendall(req.encode('utf-8'))
            finally:
                # Always release the waiting test thread, even when the
                # setup above failed; otherwise the test would block
                # forever on test_streaming_event.wait().
                self.test_streaming_event.set()

            while not self.stop_streamer:
                data = s.recv(2048)
                if not data:
                    break
                self.streaming_data += decoder.decode(data)
            # Flush the decoder; raises if the stream ended mid-character.
            self.streaming_data += decoder.decode(b'', final=True)

            s.shutdown(socket.SHUT_RDWR)
        finally:
            s.close()
            if self.streamer is not None:
                self.streamer.stop()

    def _waitForBuildLog(self):
        """Wait for the first running build and open its ansible log.

        Polls until a build exists, its build directory exists, its
        ``jobdir`` is set and ``job-output.txt`` has been created.  The
        log is only opened here (to keep a file handle alive after the
        job finishes); callers read it once the job is done.

        :returns: ``(build, build_dir, logfile)``; ``logfile`` is
            registered for cleanup.
        """
        # We don't have any real synchronization for the ansible jobs, so
        # just wait until we get our running build.
        while not len(self.builds):
            time.sleep(0.1)
        build = self.builds[0]
        self.assertEqual(build.name, 'python27')

        build_dir = os.path.join(self.executor_server.jobdir_root,
                                 build.uuid)
        while not os.path.exists(build_dir):
            time.sleep(0.1)

        # Need to wait to make sure that jobdir gets set
        while build.jobdir is None:
            time.sleep(0.1)
            build = self.builds[0]

        # The job waits to complete until the flag file exists, so we can
        # safely access the log here.
        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
        while not os.path.exists(ansible_log):
            time.sleep(0.1)
        logfile = open(ansible_log, 'r')
        self.addCleanup(logfile.close)
        return build, build_dir, logfile

    def test_streaming(self):
        """Stream a job log over the finger port and compare with disk."""
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))

        build, build_dir, logfile = self._waitForBuildLog()

        # Create a thread to stream the log. We need this to be happening
        # before we create the flag file to tell the job to complete.
        port = 7901
        streamer_thread = threading.Thread(
            target=self.startStreamer,
            args=(port, build.uuid, self.executor_server.jobdir_root,)
        )
        streamer_thread.start()
        self.addCleanup(self.stopStreamer)
        self.test_streaming_event.wait()

        # Allow the job to complete, which should close the streaming
        # connection (and terminate the thread) as well since the log file
        # gets closed/deleted.
        flag_file = os.path.join(build_dir, 'test_wait')
        open(flag_file, 'w').close()
        self.waitUntilSettled()
        streamer_thread.join()

        # Now that the job is finished, the log file has been closed by the
        # job and deleted. However, we still have a file handle to it, so we
        # can make sure that we read the entire contents at this point.
        file_contents = logfile.read()
        logfile.close()

        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
        self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
        self.assertEqual(file_contents, self.streaming_data)

    def runWSClient(self, build_uuid, event):
        """Run a websocket client that collects the log of ``build_uuid``.

        Intended to run in its own thread with a private event loop.
        Text frames are appended to ``self.ws_client_results``.

        :param build_uuid: UUID placed in the JSON stream request.
        :param event: ``threading.Event`` set once the request has been
            sent, and also set if the client fails, so that a waiting
            thread is never left blocked.
        """
        import inspect

        async def finish(result):
            # NOTE(review): send_str()/close() are plain calls in older
            # aiohttp releases and coroutines in newer ones; await only
            # when an awaitable was actually returned.
            if inspect.isawaitable(result):
                await result

        async def client(loop, build_uuid, event):
            uri = 'http://127.0.0.1:9000/console-stream'
            try:
                session = aiohttp.ClientSession(loop=loop)
                try:
                    async with session.ws_connect(uri) as ws:
                        req = {'uuid': build_uuid, 'logfile': None}
                        await finish(ws.send_str(json.dumps(req)))
                        event.set()  # notify we are connected and req sent
                        async for msg in ws:
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                self.ws_client_results += msg.data
                            elif msg.type in (aiohttp.WSMsgType.CLOSED,
                                              aiohttp.WSMsgType.ERROR):
                                break
                finally:
                    # Close the session on the error path as well.
                    await finish(session.close())
            except Exception:
                self.log.exception("client exception:")
            finally:
                # Idempotent: guarantees the waiter is released even when
                # the connection or the request failed.
                event.set()

        loop = asyncio.new_event_loop()
        loop.set_debug(True)
        try:
            loop.run_until_complete(client(loop, build_uuid, event))
        finally:
            loop.close()

    def test_websocket_streaming(self):
        """Stream a job log through zuul-web and compare with disk."""
        # Need to set the streaming port before submitting the job
        finger_port = 7902
        self.executor_server.log_streaming_port = finger_port

        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))

        build, build_dir, logfile = self._waitForBuildLog()

        # Start the finger streamer daemon
        streamer = zuul.lib.log_streamer.LogStreamer(
            None, self.host, finger_port, self.executor_server.jobdir_root)
        self.addCleanup(streamer.stop)

        # Start the web server
        web_host = '127.0.0.1'
        web_port = 9000
        web_server = zuul.web.ZuulWeb(
            listen_address=web_host, listen_port=web_port,
            gear_server='127.0.0.1', gear_port=self.gearman_server.port)
        loop = asyncio.new_event_loop()
        loop.set_debug(True)
        ws_thread = threading.Thread(target=web_server.run, args=(loop,))
        ws_thread.start()
        # Cleanups run in reverse order: stop the server, join its
        # thread, then close the loop.
        self.addCleanup(loop.close)
        self.addCleanup(ws_thread.join)
        self.addCleanup(web_server.stop)

        # Wait until web server is started.  Probe the address it listens
        # on, with a fresh socket per attempt: reusing a socket after a
        # failed connect is not portable.
        while True:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                if s.connect_ex((web_host, web_port)) == 0:
                    break
            time.sleep(0.1)

        # Start a thread with the websocket client
        ws_client_event = threading.Event()
        self.ws_client_results = ''
        ws_client_thread = threading.Thread(
            target=self.runWSClient, args=(build.uuid, ws_client_event)
        )
        ws_client_thread.start()
        ws_client_event.wait()

        # Allow the job to complete
        flag_file = os.path.join(build_dir, 'test_wait')
        open(flag_file, 'w').close()

        # Wait for the websocket client to complete, which it should when
        # it's received the full log.
        ws_client_thread.join()

        self.waitUntilSettled()

        file_contents = logfile.read()
        logfile.close()
        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
        self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
        self.assertEqual(file_contents, self.ws_client_results)