blob: b0ef2c212a0f1d54bd7520cddd5d27af8e7ad30c [file] [log] [blame]
David Shrewsburyeb856472017-04-13 14:23:04 -04001#!/usr/bin/env python
2
3# Copyright 2017 Red Hat, Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
David Shrewsbury21454182017-06-05 14:15:18 -040017import os
18import os.path
David Shrewsburyeb856472017-04-13 14:23:04 -040019import socket
20import tempfile
David Shrewsbury21454182017-06-05 14:15:18 -040021import threading
22import time
David Shrewsburyeb856472017-04-13 14:23:04 -040023
24import zuul.lib.log_streamer
25import tests.base
26
27
28class TestLogStreamer(tests.base.BaseTestCase):
29
David Shrewsburyeb856472017-04-13 14:23:04 -040030 def setUp(self):
31 super(TestLogStreamer, self).setUp()
32 self.host = '0.0.0.0'
33
34 def startStreamer(self, port, root=None):
35 if not root:
36 root = tempfile.gettempdir()
37 return zuul.lib.log_streamer.LogStreamer(None, self.host, port, root)
38
39 def test_start_stop(self):
40 port = 7900
41 streamer = self.startStreamer(port)
42 self.addCleanup(streamer.stop)
43
44 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45 self.addCleanup(s.close)
46 self.assertEqual(0, s.connect_ex((self.host, port)))
47 s.close()
48
49 streamer.stop()
50
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 self.addCleanup(s.close)
53 self.assertNotEqual(0, s.connect_ex((self.host, port)))
54 s.close()
David Shrewsbury21454182017-06-05 14:15:18 -040055
56
57class TestStreaming(tests.base.AnsibleZuulTestCase):
58
59 tenant_config_file = 'config/streamer/main.yaml'
60
61 def setUp(self):
62 super(TestStreaming, self).setUp()
63 self.host = '0.0.0.0'
64 self.streamer = None
65 self.stop_streamer = False
66 self.streaming_data = ''
David Shrewsburyf8c73c32017-06-19 15:41:22 -040067 self.test_streaming_event = threading.Event()
David Shrewsbury21454182017-06-05 14:15:18 -040068
69 def stopStreamer(self):
70 self.stop_streamer = True
71
72 def startStreamer(self, port, build_uuid, root=None):
73 if not root:
74 root = tempfile.gettempdir()
75 self.streamer = zuul.lib.log_streamer.LogStreamer(None, self.host,
76 port, root)
77 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
78 s.connect((self.host, port))
79 self.addCleanup(s.close)
80
81 req = '%s\n' % build_uuid
82 s.sendall(req.encode('utf-8'))
David Shrewsburyf8c73c32017-06-19 15:41:22 -040083 self.test_streaming_event.set()
David Shrewsbury21454182017-06-05 14:15:18 -040084
85 while not self.stop_streamer:
86 data = s.recv(2048)
87 if not data:
88 break
89 self.streaming_data += data.decode('utf-8')
90
91 s.shutdown(socket.SHUT_RDWR)
92 s.close()
93 self.streamer.stop()
94
95 def test_streaming(self):
96 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
97 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
98
99 # We don't have any real synchronization for the ansible jobs, so
100 # just wait until we get our running build.
101 while not len(self.builds):
102 time.sleep(0.1)
103 build = self.builds[0]
104 self.assertEqual(build.name, 'python27')
105
106 build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
107 while not os.path.exists(build_dir):
108 time.sleep(0.1)
109
110 # Need to wait to make sure that jobdir gets set
111 while build.jobdir is None:
112 time.sleep(0.1)
113 build = self.builds[0]
114
115 # Wait for the job to begin running and create the ansible log file.
116 # The job waits to complete until the flag file exists, so we can
117 # safely access the log here. We only open it (to force a file handle
118 # to be kept open for it after the job finishes) but wait to read the
119 # contents until the job is done.
120 ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
121 while not os.path.exists(ansible_log):
122 time.sleep(0.1)
123 logfile = open(ansible_log, 'r')
124 self.addCleanup(logfile.close)
125
126 # Create a thread to stream the log. We need this to be happening
127 # before we create the flag file to tell the job to complete.
128 port = 7901
129 streamer_thread = threading.Thread(
130 target=self.startStreamer,
131 args=(port, build.uuid, self.executor_server.jobdir_root,)
132 )
133 streamer_thread.start()
134 self.addCleanup(self.stopStreamer)
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400135 self.test_streaming_event.wait()
David Shrewsbury21454182017-06-05 14:15:18 -0400136
137 # Allow the job to complete, which should close the streaming
138 # connection (and terminate the thread) as well since the log file
139 # gets closed/deleted.
140 flag_file = os.path.join(build_dir, 'test_wait')
141 open(flag_file, 'w').close()
142 self.waitUntilSettled()
143 streamer_thread.join()
144
145 # Now that the job is finished, the log file has been closed by the
146 # job and deleted. However, we still have a file handle to it, so we
147 # can make sure that we read the entire contents at this point.
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400148 # Compact the returned lines into a single string for easy comparison.
149 file_contents = ''.join(logfile.readlines())
David Shrewsbury21454182017-06-05 14:15:18 -0400150 logfile.close()
151
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400152 self.log.debug("\n\nFile contents: %s\n\n", file_contents)
David Shrewsbury21454182017-06-05 14:15:18 -0400153 self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
David Shrewsburyf8c73c32017-06-19 15:41:22 -0400154 self.assertEqual(file_contents, self.streaming_data)