Merge "Rename ssh_port to connection_port" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index b3c2e44..86b01ef 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -224,6 +224,11 @@
 
 .. attr:: scheduler
 
+   .. attr:: command_socket
+      :default: /var/lib/zuul/scheduler.socket
+
+      Path to command socket file for the scheduler process.
+
    .. attr:: tenant_config
       :required:
 
@@ -282,6 +287,11 @@
 
 .. attr:: merger
 
+   ,, attr:: command_socket
+      :default: /var/lib/zuul/merger.socket
+
+      Path to command socket file for the merger process.
+
    .. attr:: git_dir
 
       Directory in which Zuul should clone git repositories.
@@ -392,6 +402,11 @@
 
 .. attr:: executor
 
+   .. attr:: command_socket
+      :default: /var/lib/zuul/executor.socket
+
+      Path to command socket file for the executor process.
+
    .. attr:: finger_port
       :default: 79
 
diff --git a/tests/base.py b/tests/base.py
index 210f03b..ea01d20 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2066,10 +2066,16 @@
                             FIXTURE_DIR,
                             self.config.get('scheduler', 'tenant_config')))
         self.config.set('scheduler', 'state_dir', self.state_root)
+        self.config.set(
+            'scheduler', 'command_socket',
+            os.path.join(self.test_root, 'scheduler.socket'))
         self.config.set('merger', 'git_dir', self.merger_src_root)
         self.config.set('executor', 'git_dir', self.executor_src_root)
         self.config.set('executor', 'private_key_file', self.private_key_file)
         self.config.set('executor', 'state_dir', self.executor_state_root)
+        self.config.set(
+            'executor', 'command_socket',
+            os.path.join(self.test_root, 'executor.socket'))
 
         self.statsd = FakeStatsd()
         if self.config.has_section('statsd'):
@@ -2256,13 +2262,13 @@
                                      branch='master', tag='init')
             if 'job' in item:
                 if 'run' in item['job']:
-                    files['%s.yaml' % item['job']['run']] = ''
+                    files['%s' % item['job']['run']] = ''
                 for fn in zuul.configloader.as_list(
                         item['job'].get('pre-run', [])):
-                    files['%s.yaml' % fn] = ''
+                    files['%s' % fn] = ''
                 for fn in zuul.configloader.as_list(
                         item['job'].get('post-run', [])):
-                    files['%s.yaml' % fn] = ''
+                    files['%s' % fn] = ''
 
         root = os.path.join(self.test_root, "config")
         if not os.path.exists(root):
diff --git a/tests/fixtures/config/ansible/git/common-config/zuul.yaml b/tests/fixtures/config/ansible/git/common-config/zuul.yaml
index 28bfce1..d0a8f7b 100644
--- a/tests/fixtures/config/ansible/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/ansible/git/common-config/zuul.yaml
@@ -129,10 +129,10 @@
     parent: base-urls
     name: hello
     run: playbooks/hello-post.yaml
-    post-run: playbooks/hello-post
+    post-run: playbooks/hello-post.yaml
 
 - job:
     parent: python27
     name: failpost
     run: playbooks/post-broken.yaml
-    post-run: playbooks/post-broken
+    post-run: playbooks/post-broken.yaml
diff --git a/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
index 161e5a1..48da2d4 100644
--- a/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
+++ b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
@@ -34,10 +34,10 @@
 - job:
     name: base
     parent: null
-    pre-run: playbooks/base/pre
+    pre-run: playbooks/base/pre.yaml
     post-run:
-      - playbooks/base/post-ssh
-      - playbooks/base/post-logs
+      - playbooks/base/post-ssh.yaml
+      - playbooks/base/post-logs.yaml
 
 - project:
     name: project-config
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
index 322927f..7e9cbc3 100644
--- a/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
@@ -1,16 +1,16 @@
 - job:
     name: puppet-base
-    pre-run: playbooks/prepare-node-common
+    pre-run: playbooks/prepare-node-common.yaml
 
 - job:
     name: puppet-module-base
     parent: puppet-base
-    pre-run: playbooks/prepare-node-unit
+    pre-run: playbooks/prepare-node-unit.yaml
 
 - job:
     name: puppet-lint
     parent: puppet-module-base
-    run: playbooks/run-lint
+    run: playbooks/run-lint.yaml
     tags:
       - master
 
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml
index 4701b80..74704a0 100644
--- a/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml
@@ -1,16 +1,16 @@
 - job:
     name: puppet-base
-    pre-run: playbooks/prepare-node-common
+    pre-run: playbooks/prepare-node-common.yaml
 
 - job:
     name: puppet-module-base
     parent: puppet-base
-    pre-run: playbooks/prepare-node-unit
+    pre-run: playbooks/prepare-node-unit.yaml
 
 - job:
     name: puppet-lint
     parent: puppet-module-base
-    run: playbooks/run-lint
+    run: playbooks/run-lint.yaml
     tags:
       - stable
 
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
index 4df0020..9373038 100644
--- a/tests/fixtures/config/job-output/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -23,8 +23,8 @@
 
 - job:
     name: job-output-failure
-    run: playbooks/job-output
-    post-run: playbooks/job-output-failure-post
+    run: playbooks/job-output.yaml
+    post-run: playbooks/job-output-failure-post.yaml
 
 - project:
     name: org/project
diff --git a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
index 16d7dee..b00d4c2 100644
--- a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
@@ -18,8 +18,8 @@
 
 - job:
     name: python27
-    pre-run: playbooks/pre
-    post-run: playbooks/post
+    pre-run: playbooks/pre.yaml
+    post-run: playbooks/post.yaml
     vars:
       waitpath: '{{zuul._test.test_root}}/{{zuul.build}}/test_wait'
     run: playbooks/python27.yaml
diff --git a/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml
index 7817745..16f48b1 100644
--- a/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml
@@ -18,6 +18,6 @@
 
 - job:
     name: python27
-    pre-run: playbooks/pre
-    post-run: playbooks/post
+    pre-run: playbooks/pre.yaml
+    post-run: playbooks/post.yaml
     run: playbooks/python27.yaml
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 54cf111..b9c9b32 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1935,8 +1935,8 @@
                 name: parent
                 roles:
                   - zuul: bare-role
-                pre-run: playbooks/parent-pre
-                post-run: playbooks/parent-post
+                pre-run: playbooks/parent-pre.yaml
+                post-run: playbooks/parent-post.yaml
 
             - job:
                 name: project-test
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 8845e9b..df28a57 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -150,7 +150,7 @@
                         buff += more
             if buff:
                 self._log_streamline(
-                    host, line.decode("utf-8", "backslashreplace"))
+                    host, buff.decode("utf-8", "backslashreplace"))
 
     def _log_streamline(self, host, line):
         if "[Zuul] Task exit code" in line:
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index e150f9c..236fd9f 100755
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -23,6 +23,7 @@
 import logging.config
 import os
 import signal
+import socket
 import sys
 import traceback
 import threading
@@ -184,3 +185,12 @@
                 pass
             with daemon.DaemonContext(pidfile=pid):
                 self.run()
+
+    def send_command(self, cmd):
+        command_socket = get_default(
+            self.config, self.app_name, 'command_socket',
+            '/var/lib/zuul/%s.socket' % self.app_name)
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(command_socket)
+        cmd = '%s\n' % cmd
+        s.sendall(cmd.encode('utf8'))
diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py
index aef8c95..ade9715 100755
--- a/zuul/cmd/executor.py
+++ b/zuul/cmd/executor.py
@@ -18,7 +18,6 @@
 import logging
 import os
 import pwd
-import socket
 import sys
 import signal
 import tempfile
@@ -52,15 +51,6 @@
         if self.args.command:
             self.args.nodaemon = True
 
-    def send_command(self, cmd):
-        state_dir = get_default(self.config, 'executor', 'state_dir',
-                                '/var/lib/zuul', expand_user=True)
-        path = os.path.join(state_dir, 'executor.socket')
-        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        s.connect(path)
-        cmd = '%s\n' % cmd
-        s.sendall(cmd.encode('utf8'))
-
     def exit_handler(self):
         self.executor.stop()
         self.executor.join()
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
index 56b6b44..7db1bee 100755
--- a/zuul/cmd/merger.py
+++ b/zuul/cmd/merger.py
@@ -15,8 +15,10 @@
 # under the License.
 
 import signal
+import sys
 
 import zuul.cmd
+import zuul.merger.server
 
 # No zuul imports here because they pull in paramiko which must not be
 # imported until after the daemonization.
@@ -28,14 +30,28 @@
     app_name = 'merger'
     app_description = 'A standalone Zuul merger.'
 
-    def exit_handler(self, signum, frame):
-        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+    def createParser(self):
+        parser = super(Merger, self).createParser()
+        parser.add_argument('command',
+                            choices=zuul.merger.server.COMMANDS,
+                            nargs='?')
+        return parser
+
+    def parseArguments(self, args=None):
+        super(Merger, self).parseArguments()
+        if self.args.command:
+            self.args.nodaemon = True
+
+    def exit_handler(self):
         self.merger.stop()
         self.merger.join()
 
     def run(self):
         # See comment at top of file about zuul imports
         import zuul.merger.server
+        if self.args.command in zuul.merger.server.COMMANDS:
+            self.send_command(self.args.command)
+            sys.exit(0)
 
         self.configure_connections(source_only=True)
 
@@ -45,14 +61,18 @@
                                                      self.connections)
         self.merger.start()
 
-        signal.signal(signal.SIGUSR1, self.exit_handler)
         signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
-        while True:
-            try:
-                signal.pause()
-            except KeyboardInterrupt:
-                print("Ctrl + C: asking merger to exit nicely...\n")
-                self.exit_handler(signal.SIGINT, None)
+
+        if self.args.nodaemon:
+            while True:
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    print("Ctrl + C: asking merger to exit nicely...\n")
+                    self.exit_handler()
+                    sys.exit(0)
+        else:
+            self.merger.join()
 
 
 def main():
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 539d55b..7722d6e 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -22,6 +22,7 @@
 import zuul.cmd
 from zuul.lib.config import get_default
 from zuul.lib.statsd import get_statsd_config
+import zuul.scheduler
 
 # No zuul imports here because they pull in paramiko which must not be
 # imported until after the daemonization.
@@ -37,6 +38,18 @@
         super(Scheduler, self).__init__()
         self.gear_server_pid = None
 
+    def createParser(self):
+        parser = super(Scheduler, self).createParser()
+        parser.add_argument('command',
+                            choices=zuul.scheduler.COMMANDS,
+                            nargs='?')
+        return parser
+
+    def parseArguments(self, args=None):
+        super(Scheduler, self).parseArguments()
+        if self.args.command:
+            self.args.nodaemon = True
+
     def reconfigure_handler(self, signum, frame):
         signal.signal(signal.SIGHUP, signal.SIG_IGN)
         self.log.debug("Reconfiguration triggered")
@@ -48,8 +61,7 @@
             self.log.exception("Reconfiguration failed:")
         signal.signal(signal.SIGHUP, self.reconfigure_handler)
 
-    def exit_handler(self, signum, frame):
-        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+    def exit_handler(self):
         self.sched.exit()
         self.sched.join()
         self.stop_gear_server()
@@ -104,6 +116,10 @@
     def run(self):
         # See comment at top of file about zuul imports
         import zuul.scheduler
+        if self.args.command in zuul.scheduler.COMMANDS:
+            self.send_command(self.args.command)
+            sys.exit(0)
+        # See comment at top of file about zuul imports
         import zuul.executor.client
         import zuul.merger.client
         import zuul.nodepool
@@ -162,14 +178,17 @@
         webapp.start()
 
         signal.signal(signal.SIGHUP, self.reconfigure_handler)
-        signal.signal(signal.SIGUSR1, self.exit_handler)
-        signal.signal(signal.SIGTERM, self.term_handler)
-        while True:
-            try:
-                signal.pause()
-            except KeyboardInterrupt:
-                print("Ctrl + C: asking scheduler to exit nicely...\n")
-                self.exit_handler(signal.SIGINT, None)
+
+        if self.args.nodaemon:
+            while True:
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    print("Ctrl + C: asking scheduler to exit nicely...\n")
+                    self.exit_handler()
+                    sys.exit(0)
+        else:
+            self.sched.join()
 
 
 def main():
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 1b4165b..3a91995 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -1608,10 +1608,13 @@
         self.merger = self._getMerger(self.merge_root)
         self.update_queue = DeduplicateQueue()
 
+        command_socket = get_default(
+            self.config, 'executor', 'command_socket',
+            '/var/lib/zuul/executor.socket')
+        self.command_socket = commandsocket.CommandSocket(command_socket)
+
         state_dir = get_default(self.config, 'executor', 'state_dir',
                                 '/var/lib/zuul', expand_user=True)
-        path = os.path.join(state_dir, 'executor.socket')
-        self.command_socket = commandsocket.CommandSocket(path)
         ansible_dir = os.path.join(state_dir, 'ansible')
         self.ansible_dir = ansible_dir
         if os.path.exists(ansible_dir):
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 765d9e0..576d41e 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,10 +19,14 @@
 
 import gear
 
+from zuul.lib import commandsocket
 from zuul.lib.config import get_default
 from zuul.merger import merger
 
 
+COMMANDS = ['stop']
+
+
 class MergeServer(object):
     log = logging.getLogger("zuul.MergeServer")
 
@@ -40,9 +44,16 @@
         self.merger = merger.Merger(
             merge_root, connections, merge_email, merge_name, speed_limit,
             speed_time)
+        self.command_map = dict(
+            stop=self.stop)
+        command_socket = get_default(
+            self.config, 'merger', 'command_socket',
+            '/var/lib/zuul/merger.socket')
+        self.command_socket = commandsocket.CommandSocket(command_socket)
 
     def start(self):
         self._running = True
+        self._command_running = True
         server = self.config.get('gearman', 'server')
         port = get_default(self.config, 'gearman', 'port', 4730)
         ssl_key = get_default(self.config, 'gearman', 'ssl_key')
@@ -54,6 +65,13 @@
         self.worker.waitForServer()
         self.log.debug("Registering")
         self.register()
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(
+            target=self.runCommand, name='command')
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
         self.log.debug("Starting worker")
         self.thread = threading.Thread(target=self.run)
         self.thread.daemon = True
@@ -67,12 +85,23 @@
     def stop(self):
         self.log.debug("Stopping")
         self._running = False
+        self._command_running = False
+        self.command_socket.stop()
         self.worker.shutdown()
         self.log.debug("Stopped")
 
     def join(self):
         self.thread.join()
 
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':
+                    self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
+
     def run(self):
         self.log.debug("Starting merge listener")
         while self._running:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7dee00d..b978979 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,10 +30,13 @@
 from zuul import exceptions
 from zuul import version as zuul_version
 from zuul import rpclistener
+from zuul.lib import commandsocket
 from zuul.lib.config import get_default
 from zuul.lib.statsd import get_statsd
 import zuul.lib.queue
 
+COMMANDS = ['stop']
+
 
 class ManagementEvent(object):
     """An event that should be processed within the main queue run loop"""
@@ -215,6 +218,9 @@
         self.wake_event = threading.Event()
         self.layout_lock = threading.Lock()
         self.run_handler_lock = threading.Lock()
+        self.command_map = dict(
+            stop=self.stop,
+        )
         self._pause = False
         self._exit = False
         self._stopped = False
@@ -243,6 +249,11 @@
             time_dir = self._get_time_database_dir()
             self.time_database = model.TimeDataBase(time_dir)
 
+        command_socket = get_default(
+            self.config, 'scheduler', 'command_socket',
+            '/var/lib/zuul/scheduler.socket')
+        self.command_socket = commandsocket.CommandSocket(command_socket)
+
         self.zuul_version = zuul_version.version_info.release_string()
         self.last_reconfigured = None
         self.tenant_last_reconfigured = {}
@@ -250,6 +261,14 @@
 
     def start(self):
         super(Scheduler, self).start()
+        self._command_running = True
+        self.log.debug("Starting command processor")
+        self.command_socket.start()
+        self.command_thread = threading.Thread(target=self.runCommand,
+                                               name='command')
+        self.command_thread.daemon = True
+        self.command_thread.start()
+
         self.rpc.start()
         self.stats_thread.start()
 
@@ -261,6 +280,17 @@
         self.stats_thread.join()
         self.rpc.stop()
         self.rpc.join()
+        self._command_running = False
+        self.command_socket.stop()
+
+    def runCommand(self):
+        while self._command_running:
+            try:
+                command = self.command_socket.get().decode('utf8')
+                if command != '_stop':
+                    self.command_map[command]()
+            except Exception:
+                self.log.exception("Exception while processing command")
 
     def registerConnections(self, connections, webapp, load=True):
         # load: whether or not to trigger the onLoad for the connection. This