blob: ae62204a87c65e7b18ba9fd8a5288f9e5009ce82 [file] [log] [blame]
James E. Blairc4b20412016-05-27 16:45:26 -07001# Copyright 2014 OpenStack Foundation
2# Copyright 2014 Hewlett-Packard Development Company, L.P.
3# Copyright 2016 Red Hat
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
17import logging
18import os
19import socket
20import threading
Clint Byrumff97edb2017-05-10 21:30:14 -070021from six.moves import queue
James E. Blairc4b20412016-05-27 16:45:26 -070022
23
24class CommandSocket(object):
25 log = logging.getLogger("zuul.CommandSocket")
26
27 def __init__(self, path):
28 self.running = False
29 self.path = path
Clint Byrumff97edb2017-05-10 21:30:14 -070030 self.queue = queue.Queue()
James E. Blairc4b20412016-05-27 16:45:26 -070031
32 def start(self):
33 self.running = True
34 if os.path.exists(self.path):
35 os.unlink(self.path)
36 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
37 self.socket.bind(self.path)
38 self.socket.listen(1)
39 self.socket_thread = threading.Thread(target=self._socketListener)
40 self.socket_thread.daemon = True
41 self.socket_thread.start()
42
43 def stop(self):
44 # First, wake up our listener thread with a connection and
45 # tell it to stop running.
46 self.running = False
47 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
48 s.connect(self.path)
Clint Byrumf322fe22017-05-10 20:53:12 -070049 s.sendall(b'_stop\n')
James E. Blairc4b20412016-05-27 16:45:26 -070050 # The command '_stop' will be ignored by our listener, so
51 # directly inject it into the queue so that consumers of this
52 # class which are waiting in .get() are awakened. They can
53 # either handle '_stop' or just ignore the unknown command and
54 # then check to see if they should continue to run before
55 # re-entering their loop.
Clint Byrumf322fe22017-05-10 20:53:12 -070056 self.queue.put(b'_stop')
James E. Blairc4b20412016-05-27 16:45:26 -070057 self.socket_thread.join()
58
59 def _socketListener(self):
60 while self.running:
61 try:
62 s, addr = self.socket.accept()
63 self.log.debug("Accepted socket connection %s" % (s,))
Clint Byrumf322fe22017-05-10 20:53:12 -070064 buf = b''
James E. Blairc4b20412016-05-27 16:45:26 -070065 while True:
66 buf += s.recv(1)
Clint Byrumf322fe22017-05-10 20:53:12 -070067 if buf[-1:] == b'\n':
James E. Blairc4b20412016-05-27 16:45:26 -070068 break
69 buf = buf.strip()
70 self.log.debug("Received %s from socket" % (buf,))
71 s.close()
72 # Because we use '_stop' internally to wake up a
73 # waiting thread, don't allow it to actually be
74 # injected externally.
Clint Byrumf322fe22017-05-10 20:53:12 -070075 if buf != b'_stop':
James E. Blairc4b20412016-05-27 16:45:26 -070076 self.queue.put(buf)
77 except Exception:
78 self.log.exception("Exception in socket handler")
79
80 def get(self):
81 if not self.running:
82 raise Exception("CommandSocket.get called while stopped")
83 return self.queue.get()