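Add command socket support to zuul-scheduler

Bring command socket support online for the scheduler. The scheduler now
starts a command socket (default path /var/lib/zuul/scheduler.socket,
configurable via the 'command_socket' option in the 'scheduler' section)
and handles the 'stop' command received on it.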
Change-Id: Ia1719650623e79d40f239776eb770550bb73169b
Signed-off-by: Paul Belanger <pabelanger@redhat.com>
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7dee00d..b978979 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,10 +30,13 @@
from zuul import exceptions
from zuul import version as zuul_version
from zuul import rpclistener
+from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd
import zuul.lib.queue
+COMMANDS = ['stop']  # same name as the key in command_map; NOTE(review): consumer not visible here - confirm
+
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
@@ -215,6 +218,9 @@
self.wake_event = threading.Event()
self.layout_lock = threading.Lock()
self.run_handler_lock = threading.Lock()
+ self.command_map = dict(
+ stop=self.stop,
+ )
self._pause = False
self._exit = False
self._stopped = False
@@ -243,6 +249,11 @@
time_dir = self._get_time_database_dir()
self.time_database = model.TimeDataBase(time_dir)
+ command_socket = get_default(
+ self.config, 'scheduler', 'command_socket',
+ '/var/lib/zuul/scheduler.socket')
+ self.command_socket = commandsocket.CommandSocket(command_socket)
+
self.zuul_version = zuul_version.version_info.release_string()
self.last_reconfigured = None
self.tenant_last_reconfigured = {}
@@ -250,6 +261,14 @@
def start(self):
super(Scheduler, self).start()
+ self._command_running = True
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand,
+ name='command')
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
self.rpc.start()
self.stats_thread.start()
@@ -261,6 +280,17 @@
self.stats_thread.join()
self.rpc.stop()
self.rpc.join()
+ self._command_running = False
+ self.command_socket.stop()
+
+ def runCommand(self):  # target of the 'command' thread: dispatch command-socket input
+ while self._command_running:  # loop exits once _command_running is set to False
+ try:
+ command = self.command_socket.get().decode('utf8')  # payload is decoded as UTF-8
+ if command != '_stop':  # '_stop' is never dispatched (presumably a shutdown sentinel - confirm)
+ self.command_map[command]()  # a name missing from command_map raises KeyError, handled below
+ except Exception:  # log and keep looping so one failing command does not end the thread
+ self.log.exception("Exception while processing command")
def registerConnections(self, connections, webapp, load=True):
# load: whether or not to trigger the onLoad for the connection. This