Merge "Rename ssh_port to connection_port" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index b3c2e44..86b01ef 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -224,6 +224,11 @@
.. attr:: scheduler
+ .. attr:: command_socket
+ :default: /var/lib/zuul/scheduler.socket
+
+ Path to command socket file for the scheduler process.
+
.. attr:: tenant_config
:required:
@@ -282,6 +287,11 @@
.. attr:: merger
+ .. attr:: command_socket
+ :default: /var/lib/zuul/merger.socket
+
+ Path to command socket file for the merger process.
+
.. attr:: git_dir
Directory in which Zuul should clone git repositories.
@@ -392,6 +402,11 @@
.. attr:: executor
+ .. attr:: command_socket
+ :default: /var/lib/zuul/executor.socket
+
+ Path to command socket file for the executor process.
+
.. attr:: finger_port
:default: 79
diff --git a/tests/base.py b/tests/base.py
index 210f03b..ea01d20 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2066,10 +2066,16 @@
FIXTURE_DIR,
self.config.get('scheduler', 'tenant_config')))
self.config.set('scheduler', 'state_dir', self.state_root)
+ self.config.set(
+ 'scheduler', 'command_socket',
+ os.path.join(self.test_root, 'scheduler.socket'))
self.config.set('merger', 'git_dir', self.merger_src_root)
self.config.set('executor', 'git_dir', self.executor_src_root)
self.config.set('executor', 'private_key_file', self.private_key_file)
self.config.set('executor', 'state_dir', self.executor_state_root)
+ self.config.set(
+ 'executor', 'command_socket',
+ os.path.join(self.test_root, 'executor.socket'))
self.statsd = FakeStatsd()
if self.config.has_section('statsd'):
@@ -2256,13 +2262,13 @@
branch='master', tag='init')
if 'job' in item:
if 'run' in item['job']:
- files['%s.yaml' % item['job']['run']] = ''
+ files['%s' % item['job']['run']] = ''
for fn in zuul.configloader.as_list(
item['job'].get('pre-run', [])):
- files['%s.yaml' % fn] = ''
+ files['%s' % fn] = ''
for fn in zuul.configloader.as_list(
item['job'].get('post-run', [])):
- files['%s.yaml' % fn] = ''
+ files['%s' % fn] = ''
root = os.path.join(self.test_root, "config")
if not os.path.exists(root):
diff --git a/tests/fixtures/config/ansible/git/common-config/zuul.yaml b/tests/fixtures/config/ansible/git/common-config/zuul.yaml
index 28bfce1..d0a8f7b 100644
--- a/tests/fixtures/config/ansible/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/ansible/git/common-config/zuul.yaml
@@ -129,10 +129,10 @@
parent: base-urls
name: hello
run: playbooks/hello-post.yaml
- post-run: playbooks/hello-post
+ post-run: playbooks/hello-post.yaml
- job:
parent: python27
name: failpost
run: playbooks/post-broken.yaml
- post-run: playbooks/post-broken
+ post-run: playbooks/post-broken.yaml
diff --git a/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
index 161e5a1..48da2d4 100644
--- a/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
+++ b/tests/fixtures/config/branch-variants/git/project-config/zuul.yaml
@@ -34,10 +34,10 @@
- job:
name: base
parent: null
- pre-run: playbooks/base/pre
+ pre-run: playbooks/base/pre.yaml
post-run:
- - playbooks/base/post-ssh
- - playbooks/base/post-logs
+ - playbooks/base/post-ssh.yaml
+ - playbooks/base/post-logs.yaml
- project:
name: project-config
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
index 322927f..7e9cbc3 100644
--- a/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/.zuul.yaml
@@ -1,16 +1,16 @@
- job:
name: puppet-base
- pre-run: playbooks/prepare-node-common
+ pre-run: playbooks/prepare-node-common.yaml
- job:
name: puppet-module-base
parent: puppet-base
- pre-run: playbooks/prepare-node-unit
+ pre-run: playbooks/prepare-node-unit.yaml
- job:
name: puppet-lint
parent: puppet-module-base
- run: playbooks/run-lint
+ run: playbooks/run-lint.yaml
tags:
- master
diff --git a/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml b/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml
index 4701b80..74704a0 100644
--- a/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml
+++ b/tests/fixtures/config/branch-variants/git/puppet-integration/stable.zuul.yaml
@@ -1,16 +1,16 @@
- job:
name: puppet-base
- pre-run: playbooks/prepare-node-common
+ pre-run: playbooks/prepare-node-common.yaml
- job:
name: puppet-module-base
parent: puppet-base
- pre-run: playbooks/prepare-node-unit
+ pre-run: playbooks/prepare-node-unit.yaml
- job:
name: puppet-lint
parent: puppet-module-base
- run: playbooks/run-lint
+ run: playbooks/run-lint.yaml
tags:
- stable
diff --git a/tests/fixtures/config/job-output/git/common-config/zuul.yaml b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
index 4df0020..9373038 100644
--- a/tests/fixtures/config/job-output/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/job-output/git/common-config/zuul.yaml
@@ -23,8 +23,8 @@
- job:
name: job-output-failure
- run: playbooks/job-output
- post-run: playbooks/job-output-failure-post
+ run: playbooks/job-output.yaml
+ post-run: playbooks/job-output-failure-post.yaml
- project:
name: org/project
diff --git a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
index 16d7dee..b00d4c2 100644
--- a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
@@ -18,8 +18,8 @@
- job:
name: python27
- pre-run: playbooks/pre
- post-run: playbooks/post
+ pre-run: playbooks/pre.yaml
+ post-run: playbooks/post.yaml
vars:
waitpath: '{{zuul._test.test_root}}/{{zuul.build}}/test_wait'
run: playbooks/python27.yaml
diff --git a/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml
index 7817745..16f48b1 100644
--- a/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/pre-playbook/git/common-config/zuul.yaml
@@ -18,6 +18,6 @@
- job:
name: python27
- pre-run: playbooks/pre
- post-run: playbooks/post
+ pre-run: playbooks/pre.yaml
+ post-run: playbooks/post.yaml
run: playbooks/python27.yaml
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 54cf111..b9c9b32 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1935,8 +1935,8 @@
name: parent
roles:
- zuul: bare-role
- pre-run: playbooks/parent-pre
- post-run: playbooks/parent-post
+ pre-run: playbooks/parent-pre.yaml
+ post-run: playbooks/parent-post.yaml
- job:
name: project-test
diff --git a/zuul/ansible/callback/zuul_stream.py b/zuul/ansible/callback/zuul_stream.py
index 8845e9b..df28a57 100644
--- a/zuul/ansible/callback/zuul_stream.py
+++ b/zuul/ansible/callback/zuul_stream.py
@@ -150,7 +150,7 @@
buff += more
if buff:
self._log_streamline(
- host, line.decode("utf-8", "backslashreplace"))
+ host, buff.decode("utf-8", "backslashreplace"))
def _log_streamline(self, host, line):
if "[Zuul] Task exit code" in line:
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index e150f9c..236fd9f 100755
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -23,6 +23,7 @@
import logging.config
import os
import signal
+import socket
import sys
import traceback
import threading
@@ -184,3 +185,12 @@
pass
with daemon.DaemonContext(pidfile=pid):
self.run()
+
+ def send_command(self, cmd):
+ command_socket = get_default(
+ self.config, self.app_name, 'command_socket',
+ '/var/lib/zuul/%s.socket' % self.app_name)
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(command_socket)
+ cmd = '%s\n' % cmd
+ s.sendall(cmd.encode('utf8'))
diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py
index aef8c95..ade9715 100755
--- a/zuul/cmd/executor.py
+++ b/zuul/cmd/executor.py
@@ -18,7 +18,6 @@
import logging
import os
import pwd
-import socket
import sys
import signal
import tempfile
@@ -52,15 +51,6 @@
if self.args.command:
self.args.nodaemon = True
- def send_command(self, cmd):
- state_dir = get_default(self.config, 'executor', 'state_dir',
- '/var/lib/zuul', expand_user=True)
- path = os.path.join(state_dir, 'executor.socket')
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.connect(path)
- cmd = '%s\n' % cmd
- s.sendall(cmd.encode('utf8'))
-
def exit_handler(self):
self.executor.stop()
self.executor.join()
diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py
index 56b6b44..7db1bee 100755
--- a/zuul/cmd/merger.py
+++ b/zuul/cmd/merger.py
@@ -15,8 +15,10 @@
# under the License.
import signal
+import sys
import zuul.cmd
+import zuul.merger.server
# No zuul imports here because they pull in paramiko which must not be
# imported until after the daemonization.
@@ -28,14 +30,28 @@
app_name = 'merger'
app_description = 'A standalone Zuul merger.'
- def exit_handler(self, signum, frame):
- signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+ def createParser(self):
+ parser = super(Merger, self).createParser()
+ parser.add_argument('command',
+ choices=zuul.merger.server.COMMANDS,
+ nargs='?')
+ return parser
+
+ def parseArguments(self, args=None):
+ super(Merger, self).parseArguments()
+ if self.args.command:
+ self.args.nodaemon = True
+
+ def exit_handler(self):
self.merger.stop()
self.merger.join()
def run(self):
# See comment at top of file about zuul imports
import zuul.merger.server
+ if self.args.command in zuul.merger.server.COMMANDS:
+ self.send_command(self.args.command)
+ sys.exit(0)
self.configure_connections(source_only=True)
@@ -45,14 +61,18 @@
self.connections)
self.merger.start()
- signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
- while True:
- try:
- signal.pause()
- except KeyboardInterrupt:
- print("Ctrl + C: asking merger to exit nicely...\n")
- self.exit_handler(signal.SIGINT, None)
+
+ if self.args.nodaemon:
+ while True:
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print("Ctrl + C: asking merger to exit nicely...\n")
+ self.exit_handler()
+ sys.exit(0)
+ else:
+ self.merger.join()
def main():
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 539d55b..7722d6e 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -22,6 +22,7 @@
import zuul.cmd
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd_config
+import zuul.scheduler
# No zuul imports here because they pull in paramiko which must not be
# imported until after the daemonization.
@@ -37,6 +38,18 @@
super(Scheduler, self).__init__()
self.gear_server_pid = None
+ def createParser(self):
+ parser = super(Scheduler, self).createParser()
+ parser.add_argument('command',
+ choices=zuul.scheduler.COMMANDS,
+ nargs='?')
+ return parser
+
+ def parseArguments(self, args=None):
+ super(Scheduler, self).parseArguments()
+ if self.args.command:
+ self.args.nodaemon = True
+
def reconfigure_handler(self, signum, frame):
signal.signal(signal.SIGHUP, signal.SIG_IGN)
self.log.debug("Reconfiguration triggered")
@@ -48,8 +61,7 @@
self.log.exception("Reconfiguration failed:")
signal.signal(signal.SIGHUP, self.reconfigure_handler)
- def exit_handler(self, signum, frame):
- signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+ def exit_handler(self):
self.sched.exit()
self.sched.join()
self.stop_gear_server()
@@ -104,6 +116,10 @@
def run(self):
# See comment at top of file about zuul imports
import zuul.scheduler
+ if self.args.command in zuul.scheduler.COMMANDS:
+ self.send_command(self.args.command)
+ sys.exit(0)
+ # See comment at top of file about zuul imports
import zuul.executor.client
import zuul.merger.client
import zuul.nodepool
@@ -162,14 +178,17 @@
webapp.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
- signal.signal(signal.SIGUSR1, self.exit_handler)
- signal.signal(signal.SIGTERM, self.term_handler)
- while True:
- try:
- signal.pause()
- except KeyboardInterrupt:
- print("Ctrl + C: asking scheduler to exit nicely...\n")
- self.exit_handler(signal.SIGINT, None)
+
+ if self.args.nodaemon:
+ while True:
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print("Ctrl + C: asking scheduler to exit nicely...\n")
+ self.exit_handler()
+ sys.exit(0)
+ else:
+ self.sched.join()
def main():
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 1b4165b..3a91995 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -1608,10 +1608,13 @@
self.merger = self._getMerger(self.merge_root)
self.update_queue = DeduplicateQueue()
+ command_socket = get_default(
+ self.config, 'executor', 'command_socket',
+ '/var/lib/zuul/executor.socket')
+ self.command_socket = commandsocket.CommandSocket(command_socket)
+
state_dir = get_default(self.config, 'executor', 'state_dir',
'/var/lib/zuul', expand_user=True)
- path = os.path.join(state_dir, 'executor.socket')
- self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
self.ansible_dir = ansible_dir
if os.path.exists(ansible_dir):
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 765d9e0..576d41e 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -19,10 +19,14 @@
import gear
+from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.merger import merger
+COMMANDS = ['stop']
+
+
class MergeServer(object):
log = logging.getLogger("zuul.MergeServer")
@@ -40,9 +44,16 @@
self.merger = merger.Merger(
merge_root, connections, merge_email, merge_name, speed_limit,
speed_time)
+ self.command_map = dict(
+ stop=self.stop)
+ command_socket = get_default(
+ self.config, 'merger', 'command_socket',
+ '/var/lib/zuul/merger.socket')
+ self.command_socket = commandsocket.CommandSocket(command_socket)
def start(self):
self._running = True
+ self._command_running = True
server = self.config.get('gearman', 'server')
port = get_default(self.config, 'gearman', 'port', 4730)
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
@@ -54,6 +65,13 @@
self.worker.waitForServer()
self.log.debug("Registering")
self.register()
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(
+ target=self.runCommand, name='command')
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
self.log.debug("Starting worker")
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
@@ -67,12 +85,23 @@
def stop(self):
self.log.debug("Stopping")
self._running = False
+ self._command_running = False
+ self.command_socket.stop()
self.worker.shutdown()
self.log.debug("Stopped")
def join(self):
self.thread.join()
+ def runCommand(self):
+ while self._command_running:
+ try:
+ command = self.command_socket.get().decode('utf8')
+ if command != '_stop':
+ self.command_map[command]()
+ except Exception:
+ self.log.exception("Exception while processing command")
+
def run(self):
self.log.debug("Starting merge listener")
while self._running:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7dee00d..b978979 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,10 +30,13 @@
from zuul import exceptions
from zuul import version as zuul_version
from zuul import rpclistener
+from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd
import zuul.lib.queue
+COMMANDS = ['stop']
+
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
@@ -215,6 +218,9 @@
self.wake_event = threading.Event()
self.layout_lock = threading.Lock()
self.run_handler_lock = threading.Lock()
+ self.command_map = dict(
+ stop=self.stop,
+ )
self._pause = False
self._exit = False
self._stopped = False
@@ -243,6 +249,11 @@
time_dir = self._get_time_database_dir()
self.time_database = model.TimeDataBase(time_dir)
+ command_socket = get_default(
+ self.config, 'scheduler', 'command_socket',
+ '/var/lib/zuul/scheduler.socket')
+ self.command_socket = commandsocket.CommandSocket(command_socket)
+
self.zuul_version = zuul_version.version_info.release_string()
self.last_reconfigured = None
self.tenant_last_reconfigured = {}
@@ -250,6 +261,14 @@
def start(self):
super(Scheduler, self).start()
+ self._command_running = True
+ self.log.debug("Starting command processor")
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand,
+ name='command')
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
self.rpc.start()
self.stats_thread.start()
@@ -261,6 +280,17 @@
self.stats_thread.join()
self.rpc.stop()
self.rpc.join()
+ self._command_running = False
+ self.command_socket.stop()
+
+ def runCommand(self):
+ while self._command_running:
+ try:
+ command = self.command_socket.get().decode('utf8')
+ if command != '_stop':
+ self.command_map[command]()
+ except Exception:
+ self.log.exception("Exception while processing command")
def registerConnections(self, connections, webapp, load=True):
# load: whether or not to trigger the onLoad for the connection. This