Add command socket support to zuul-scheduler

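Bring command socket support online for the scheduler. The socket path
is read from the 'command_socket' option in the 'scheduler' config
section (default: /var/lib/zuul/scheduler.socket), and the only command
currently supported is 'stop'.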

Change-Id: Ia1719650623e79d40f239776eb770550bb73169b
Signed-off-by: Paul Belanger <pabelanger@redhat.com>
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7dee00d..b978979 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,10 +30,13 @@
 from zuul import exceptions
 from zuul import version as zuul_version
 from zuul import rpclistener
+from zuul.lib import commandsocket
 from zuul.lib.config import get_default
 from zuul.lib.statsd import get_statsd
 import zuul.lib.queue
 
+COMMANDS = ['stop']
+
 
 class ManagementEvent(object):
     """An event that should be processed within the main queue run loop"""
@@ -215,6 +218,9 @@
         self.wake_event = threading.Event()
         self.layout_lock = threading.Lock()
         self.run_handler_lock = threading.Lock()
+        self.command_map = dict(
+            stop=self.stop,
+        )
         self._pause = False
         self._exit = False
         self._stopped = False
@@ -243,6 +249,11 @@
             time_dir = self._get_time_database_dir()
             self.time_database = model.TimeDataBase(time_dir)
 
+        command_socket = get_default(
+            self.config, 'scheduler', 'command_socket',
+            '/var/lib/zuul/scheduler.socket')
+        self.command_socket = commandsocket.CommandSocket(command_socket)
+
         self.zuul_version = zuul_version.version_info.release_string()
         self.last_reconfigured = None
         self.tenant_last_reconfigured = {}
@@ -250,6 +261,14 @@
 
     def start(self):
         super(Scheduler, self).start()
+        self._command_running = True
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand,
+                                               name='command')
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
         self.rpc.start()
         self.stats_thread.start()
 
@@ -261,6 +280,17 @@
         self.stats_thread.join()
         self.rpc.stop()
         self.rpc.join()
+        self._command_running = False
+        self.command_socket.stop()
+
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':
+                    self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
 
     def registerConnections(self, connections, webapp, load=True):
         # load: whether or not to trigger the onLoad for the connection. This