Merge "Add command socket support to zuul-scheduler" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 126e4e2..86b01ef 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -224,6 +224,11 @@
 
 .. attr:: scheduler
 
+   .. attr:: command_socket
+      :default: /var/lib/zuul/scheduler.socket
+
+      Path to command socket file for the scheduler process.
+
    .. attr:: tenant_config
       :required:
 
diff --git a/tests/base.py b/tests/base.py
index 931771f..ea01d20 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2066,6 +2066,9 @@
                             FIXTURE_DIR,
                             self.config.get('scheduler', 'tenant_config')))
         self.config.set('scheduler', 'state_dir', self.state_root)
+        self.config.set(
+            'scheduler', 'command_socket',
+            os.path.join(self.test_root, 'scheduler.socket'))
         self.config.set('merger', 'git_dir', self.merger_src_root)
         self.config.set('executor', 'git_dir', self.executor_src_root)
         self.config.set('executor', 'private_key_file', self.private_key_file)
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 539d55b..7722d6e 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -22,6 +22,7 @@
 import zuul.cmd
 from zuul.lib.config import get_default
 from zuul.lib.statsd import get_statsd_config
+import zuul.scheduler
 
 # No zuul imports here because they pull in paramiko which must not be
 # imported until after the daemonization.
@@ -37,6 +38,18 @@
         super(Scheduler, self).__init__()
         self.gear_server_pid = None
 
+    def createParser(self):
+        """Extend the inherited parser with an optional command argument."""
+        cli = super(Scheduler, self).createParser()
+        cli.add_argument(
+            'command', choices=zuul.scheduler.COMMANDS, nargs='?')
+        return cli
+
+    def parseArguments(self, args=None):
+        super(Scheduler, self).parseArguments(args)  # honor explicit args
+        if self.args.command:
+            self.args.nodaemon = True  # a command forces the nodaemon flag
+
     def reconfigure_handler(self, signum, frame):
         signal.signal(signal.SIGHUP, signal.SIG_IGN)
         self.log.debug("Reconfiguration triggered")
@@ -48,8 +61,7 @@
             self.log.exception("Reconfiguration failed:")
         signal.signal(signal.SIGHUP, self.reconfigure_handler)
 
-    def exit_handler(self, signum, frame):
-        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+    def exit_handler(self):
         self.sched.exit()
         self.sched.join()
         self.stop_gear_server()
@@ -104,6 +116,10 @@
     def run(self):
         # See comment at top of file about zuul imports
         import zuul.scheduler
+        if self.args.command in zuul.scheduler.COMMANDS:
+            self.send_command(self.args.command)
+            sys.exit(0)
+        # See comment at top of file about zuul imports
         import zuul.executor.client
         import zuul.merger.client
         import zuul.nodepool
@@ -162,14 +178,17 @@
         webapp.start()
 
         signal.signal(signal.SIGHUP, self.reconfigure_handler)
-        signal.signal(signal.SIGUSR1, self.exit_handler)
-        signal.signal(signal.SIGTERM, self.term_handler)
-        while True:
-            try:
-                signal.pause()
-            except KeyboardInterrupt:
-                print("Ctrl + C: asking scheduler to exit nicely...\n")
-                self.exit_handler(signal.SIGINT, None)
+
+        if self.args.nodaemon:
+            while True:
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    print("Ctrl + C: asking scheduler to exit nicely...\n")
+                    self.exit_handler()
+                    sys.exit(0)
+        else:
+            self.sched.join()
 
 
 def main():
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7dee00d..b978979 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,10 +30,13 @@
 from zuul import exceptions
 from zuul import version as zuul_version
 from zuul import rpclistener
+from zuul.lib import commandsocket
 from zuul.lib.config import get_default
 from zuul.lib.statsd import get_statsd
 import zuul.lib.queue
 
+COMMANDS = ['stop']
+
 
 class ManagementEvent(object):
     """An event that should be processed within the main queue run loop"""
@@ -215,6 +218,9 @@
         self.wake_event = threading.Event()
         self.layout_lock = threading.Lock()
         self.run_handler_lock = threading.Lock()
+        self.command_map = dict(
+            stop=self.stop,
+        )
         self._pause = False
         self._exit = False
         self._stopped = False
@@ -243,6 +249,11 @@
             time_dir = self._get_time_database_dir()
             self.time_database = model.TimeDataBase(time_dir)
 
+        command_socket = get_default(
+            self.config, 'scheduler', 'command_socket',
+            '/var/lib/zuul/scheduler.socket')
+        self.command_socket = commandsocket.CommandSocket(command_socket)
+
         self.zuul_version = zuul_version.version_info.release_string()
         self.last_reconfigured = None
         self.tenant_last_reconfigured = {}
@@ -250,6 +261,14 @@
 
     def start(self):
         super(Scheduler, self).start()
+        self._command_running = True
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand,
+                                               name='command')
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
         self.rpc.start()
         self.stats_thread.start()
 
@@ -261,6 +280,17 @@
         self.stats_thread.join()
         self.rpc.stop()
         self.rpc.join()
+        self._command_running = False
+        self.command_socket.stop()
+
+    def runCommand(self):  # target of the 'command' thread
+        while self._command_running:  # loop until the flag is cleared
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':  # '_stop' is never dispatched
+                    self.command_map[command]()  # KeyError if unknown
+            except Exception:  # log and keep serving commands
+                self.log.exception("Exception while processing command")
 
     def registerConnections(self, connections, webapp, load=True):
         # load: whether or not to trigger the onLoad for the connection. This