blob: f422e97b2674c86c0b897ab470dcd824339fd234 [file] [log] [blame]
David Shrewsburyeb856472017-04-13 14:23:04 -04001#!/usr/bin/env python
2
3# Copyright 2017 Red Hat, Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
David Shrewsbury21454182017-06-05 14:15:18 -040017import os
18import os.path
David Shrewsburyeb856472017-04-13 14:23:04 -040019import socket
20import tempfile
David Shrewsbury21454182017-06-05 14:15:18 -040021import threading
22import time
David Shrewsburyeb856472017-04-13 14:23:04 -040023
24import zuul.lib.log_streamer
25import tests.base
26
27
28class TestLogStreamer(tests.base.BaseTestCase):
29
David Shrewsburyeb856472017-04-13 14:23:04 -040030 def setUp(self):
31 super(TestLogStreamer, self).setUp()
32 self.host = '0.0.0.0'
33
34 def startStreamer(self, port, root=None):
35 if not root:
36 root = tempfile.gettempdir()
37 return zuul.lib.log_streamer.LogStreamer(None, self.host, port, root)
38
39 def test_start_stop(self):
40 port = 7900
41 streamer = self.startStreamer(port)
42 self.addCleanup(streamer.stop)
43
44 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45 self.addCleanup(s.close)
46 self.assertEqual(0, s.connect_ex((self.host, port)))
47 s.close()
48
49 streamer.stop()
50
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 self.addCleanup(s.close)
53 self.assertNotEqual(0, s.connect_ex((self.host, port)))
54 s.close()
David Shrewsbury21454182017-06-05 14:15:18 -040055
56
57class TestStreaming(tests.base.AnsibleZuulTestCase):
58
59 tenant_config_file = 'config/streamer/main.yaml'
60
61 def setUp(self):
62 super(TestStreaming, self).setUp()
63 self.host = '0.0.0.0'
64 self.streamer = None
65 self.stop_streamer = False
66 self.streaming_data = ''
67
68 def stopStreamer(self):
69 self.stop_streamer = True
70
71 def startStreamer(self, port, build_uuid, root=None):
72 if not root:
73 root = tempfile.gettempdir()
74 self.streamer = zuul.lib.log_streamer.LogStreamer(None, self.host,
75 port, root)
76 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
77 s.connect((self.host, port))
78 self.addCleanup(s.close)
79
80 req = '%s\n' % build_uuid
81 s.sendall(req.encode('utf-8'))
82
83 while not self.stop_streamer:
84 data = s.recv(2048)
85 if not data:
86 break
87 self.streaming_data += data.decode('utf-8')
88
89 s.shutdown(socket.SHUT_RDWR)
90 s.close()
91 self.streamer.stop()
92
93 def test_streaming(self):
94 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
95 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
96
97 # We don't have any real synchronization for the ansible jobs, so
98 # just wait until we get our running build.
99 while not len(self.builds):
100 time.sleep(0.1)
101 build = self.builds[0]
102 self.assertEqual(build.name, 'python27')
103
104 build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
105 while not os.path.exists(build_dir):
106 time.sleep(0.1)
107
108 # Need to wait to make sure that jobdir gets set
109 while build.jobdir is None:
110 time.sleep(0.1)
111 build = self.builds[0]
112
113 # Wait for the job to begin running and create the ansible log file.
114 # The job waits to complete until the flag file exists, so we can
115 # safely access the log here. We only open it (to force a file handle
116 # to be kept open for it after the job finishes) but wait to read the
117 # contents until the job is done.
118 ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
119 while not os.path.exists(ansible_log):
120 time.sleep(0.1)
121 logfile = open(ansible_log, 'r')
122 self.addCleanup(logfile.close)
123
124 # Create a thread to stream the log. We need this to be happening
125 # before we create the flag file to tell the job to complete.
126 port = 7901
127 streamer_thread = threading.Thread(
128 target=self.startStreamer,
129 args=(port, build.uuid, self.executor_server.jobdir_root,)
130 )
131 streamer_thread.start()
132 self.addCleanup(self.stopStreamer)
133
134 # Allow the job to complete, which should close the streaming
135 # connection (and terminate the thread) as well since the log file
136 # gets closed/deleted.
137 flag_file = os.path.join(build_dir, 'test_wait')
138 open(flag_file, 'w').close()
139 self.waitUntilSettled()
140 streamer_thread.join()
141
142 # Now that the job is finished, the log file has been closed by the
143 # job and deleted. However, we still have a file handle to it, so we
144 # can make sure that we read the entire contents at this point.
145 file_contents = logfile.readlines()
146 logfile.close()
147
148 # Compact the returned lines into a single string for easy comparison.
149 orig = ''.join(file_contents)
150 self.log.debug("\n\nFile contents: %s\n\n", orig)
151 self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
152 self.assertEqual(orig, self.streaming_data)