Merge "Python 3 Fix: use proper octal notation"
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c61cea8..f368cb9 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -6,7 +6,7 @@
https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
.. _`Turbo-Hipster`:
- http://git.openstack.org/cgit/stackforge/turbo-hipster/
+ https://git.openstack.org/cgit/openstack/turbo-hipster/
.. _`Turbo-Hipster Documentation`:
http://turbo-hipster.rtfd.org/
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index bc38376..f490162 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -19,6 +19,7 @@
import getpass
import os
import subprocess
+import threading
class Console(object):
@@ -50,6 +51,21 @@
return env
+def follow(fd):
+ newline_warning = False
+ with Console() as console:
+ while True:
+ line = fd.readline()
+ if not line:
+ break
+ if not line.endswith('\n'):
+ line += '\n'
+ newline_warning = True
+ console.addLine(line)
+ if newline_warning:
+ console.addLine('[Zuul] No trailing newline\n')
+
+
def run(cwd, cmd, args):
env = get_env()
env.update(args)
@@ -61,14 +77,20 @@
env=env,
)
- with Console() as console:
- while True:
- line = proc.stdout.readline()
- if not line:
- break
- console.addLine(line)
+ t = threading.Thread(target=follow, args=(proc.stdout,))
+ t.daemon = True
+ t.start()
- ret = proc.wait()
+ ret = proc.wait()
+ # Give the thread that is writing the console log up to 10 seconds
+ # to catch up and exit. If it hasn't done so by then, it is very
+ # likely stuck in readline() because the command spawned a child that
+ # is still holding stdout or stderr open.
+ t.join(10)
+ with Console() as console:
+ if t.is_alive():
+ console.addLine("[Zuul] standard output/error still open "
+ "after child exited\n")
console.addLine("[Zuul] Task exit code: %s\n" % ret)
return ret
diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py
index bbda87d..49643ae 100644
--- a/zuul/cmd/launcher.py
+++ b/zuul/cmd/launcher.py
@@ -29,8 +29,9 @@
import signal
import zuul.cmd
+import zuul.launcher.ansiblelaunchserver
-# No zuul imports here because they pull in paramiko which must not be
+# No zuul imports that pull in paramiko here; it must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
# Similar situation with gear and statsd.
@@ -50,7 +51,8 @@
parser.add_argument('--keep-jobdir', dest='keep_jobdir',
action='store_true',
help='keep local jobdirs after run completes')
- parser.add_argument('command', choices=['reconfigure', 'stop'],
+ parser.add_argument('command',
+ choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
nargs='?')
self.args = parser.parse_args()
@@ -66,21 +68,12 @@
s.connect(path)
s.sendall('%s\n' % cmd)
- def send_reconfigure(self):
- self.send_command('reconfigure')
- sys.exit(0)
-
- def send_stop(self):
- self.send_command('stop')
- sys.exit(0)
-
def exit_handler(self):
self.launcher.stop()
self.launcher.join()
def main(self, daemon=True):
# See comment at top of file about zuul imports
- import zuul.launcher.ansiblelaunchserver
self.setup_logging('launcher', 'log_config')
@@ -109,11 +102,8 @@
server.parse_arguments()
server.read_config()
- if server.args.command == 'reconfigure':
- server.send_reconfigure()
- sys.exit(0)
- elif server.args.command == 'stop':
- server.send_stop()
+ if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
+ server.send_command(server.args.command)
sys.exit(0)
server.configure_connections()
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 6fa78b9..ffbdbf0 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -22,6 +22,7 @@
import subprocess
import tempfile
import threading
+import time
import traceback
import Queue
import uuid
@@ -36,6 +37,13 @@
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
+ANSIBLE_WATCHDOG_GRACE = 5 * 60
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
+ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
+
+
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release']
+
def boolify(x):
if isinstance(x, str):
@@ -43,12 +51,33 @@
return bool(x)
+class Watchdog(object):
+ def __init__(self, timeout, function, args):
+ self.timeout = timeout
+ self.function = function
+ self.args = args
+ self.thread = threading.Thread(target=self._run)
+ self.thread.daemon = True
+
+ def _run(self):
+ while self._running and time.time() < self.end:
+ time.sleep(10)
+ if self._running:
+ self.function(*self.args)
+
+ def start(self):
+ self._running = True
+ self.end = time.time() + self.timeout
+ self.thread.start()
+
+ def stop(self):
+ self._running = False
+
+
class JobDir(object):
def __init__(self, keep=False):
self.keep = keep
self.root = tempfile.mkdtemp()
- self.git_root = os.path.join(self.root, 'git')
- os.makedirs(self.git_root)
self.ansible_root = os.path.join(self.root, 'ansible')
os.makedirs(self.ansible_root)
self.plugins_root = os.path.join(self.ansible_root, 'plugins')
@@ -77,6 +106,7 @@
self.config = config
self.keep_jobdir = keep_jobdir
self.hostname = socket.gethostname()
+ self.registered_functions = set()
self.node_workers = {}
self.jobs = {}
self.builds = {}
@@ -203,9 +233,17 @@
del self.jobs[name]
def register(self):
+ new_functions = set()
if self.accept_nodes:
- self.worker.registerFunction("node-assign:zuul")
- self.worker.registerFunction("stop:%s" % self.hostname)
+ new_functions.add("node-assign:zuul")
+ new_functions.add("stop:%s" % self.hostname)
+ new_functions.add("set_description:%s" % self.hostname)
+
+ for function in new_functions - self.registered_functions:
+ self.worker.registerFunction(function)
+ for function in self.registered_functions - new_functions:
+ self.worker.unRegisterFunction(function)
+ self.registered_functions = new_functions
def reconfigure(self):
self.log.debug("Reconfiguring")
@@ -219,6 +257,45 @@
"to worker:")
self.log.debug("Reconfiguration complete")
+ def pause(self):
+ self.log.debug("Pausing")
+ self.accept_nodes = False
+ self.register()
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='pause'))
+ except Exception:
+ self.log.exception("Exception sending pause command "
+ "to worker:")
+ self.log.debug("Paused")
+
+ def unpause(self):
+ self.log.debug("Unpausing")
+ self.accept_nodes = True
+ self.register()
+ for node in self.node_workers.values():
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='unpause'))
+ except Exception:
+ self.log.exception("Exception sending unpause command "
+ "to worker:")
+ self.log.debug("Unpaused")
+
+ def release(self):
+ self.log.debug("Releasing idle nodes")
+ for node in self.node_workers.values():
+ if node.name in self.static_nodes:
+ continue
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='release'))
+ except Exception:
+ self.log.exception("Exception sending release command "
+ "to worker:")
+ self.log.debug("Finished releasing idle nodes")
+
def stop(self):
self.log.debug("Stopping")
# First, stop accepting new jobs
@@ -254,8 +331,14 @@
command = self.command_socket.get()
if command == 'reconfigure':
self.reconfigure()
- if command == 'stop':
+ elif command == 'stop':
self.stop()
+ elif command == 'pause':
+ self.pause()
+ elif command == 'unpause':
+ self.unpause()
+ elif command == 'release':
+ self.release()
except Exception:
self.log.exception("Exception while processing command")
@@ -283,6 +366,10 @@
elif job.name.startswith('stop:'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
+ elif job.name.startswith('set_description:'):
+ self.log.debug("Got set_description job: %s" %
+ job.unique)
+ job.sendWorkComplete()
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@@ -376,6 +463,10 @@
self.labels = labels
self.thread = None
self.registered_functions = set()
+ # If the unpaused Event is set, that means we should run jobs.
+ # If it is clear, then we are paused and should not run jobs.
+ self.unpaused = threading.Event()
+ self.unpaused.set()
self._running = True
self.queue = Queue.Queue()
self.manager_name = manager_name
@@ -383,6 +474,8 @@
self.termination_queue = termination_queue
self.keep_jobdir = keep_jobdir
self.running_job_lock = threading.Lock()
+ self._get_job_lock = threading.Lock()
+ self._got_job = False
self._job_complete_event = threading.Event()
self._running_job = False
self._sent_complete_event = False
@@ -434,9 +527,31 @@
# will be set by the queue thread.
self.log.debug("Submitting stop request")
self._running = False
+ self.unpaused.set()
self.queue.put(dict(action='stop'))
self.queue.join()
+ def pause(self):
+ self.unpaused.clear()
+ self.worker.stopWaitingForJobs()
+
+ def unpause(self):
+ self.unpaused.set()
+
+ def release(self):
+ # If this node is idle, stop it.
+ old_unpaused = self.unpaused.is_set()
+ if old_unpaused:
+ self.pause()
+ with self._get_job_lock:
+ if self._got_job:
+ self.log.debug("This worker is not idle")
+ if old_unpaused:
+ self.unpause()
+ return
+ self.log.debug("Stopping due to release command")
+ self.queue.put(dict(action='stop'))
+
def _runQueue(self):
item = self.queue.get()
try:
@@ -449,6 +564,15 @@
else:
self._job_complete_event.wait()
self.worker.shutdown()
+ elif item['action'] == 'pause':
+ self.log.debug("Received pause request")
+ self.pause()
+ elif item['action'] == 'unpause':
+ self.log.debug("Received unpause request")
+ self.unpause()
+ elif item['action'] == 'release':
+ self.log.debug("Received release request")
+ self.release()
elif item['action'] == 'reconfigure':
self.log.debug("Received reconfigure request")
self.register()
@@ -461,15 +585,21 @@
def runGearman(self):
while self._running:
try:
- self._runGearman()
+ self.unpaused.wait()
+ if self._running:
+ self._runGearman()
except Exception:
self.log.exception("Exception in gearman manager:")
+ with self._get_job_lock:
+ self._got_job = False
def _runGearman(self):
- try:
- job = self.worker.getJob()
- except gear.InterruptedError:
- return
+ with self._get_job_lock:
+ try:
+ job = self.worker.getJob()
+ self._got_job = True
+ except gear.InterruptedError:
+ return
self.log.debug("Node worker %s got job %s" % (self.name, job.name))
try:
if job.name not in self.registered_functions:
@@ -509,12 +639,14 @@
self.registered_functions = new_functions
def abortRunningJob(self):
+ return self.abortRunningProc(self.ansible_job_proc)
+
+ def abortRunningProc(self, proc):
aborted = False
self.log.debug("Abort: acquiring job lock")
with self.running_job_lock:
if self._running_job:
self.log.debug("Abort: a job is running")
- proc = self.ansible_proc
if proc:
self.log.debug("Abort: sending kill signal to job "
"process group")
@@ -610,7 +742,8 @@
'SUCCESS', {})
def runJob(self, job, args):
- self.ansible_proc = None
+ self.ansible_job_proc = None
+ self.ansible_post_proc = None
result = None
with self.running_job_lock:
if not self._running:
@@ -715,16 +848,20 @@
(dest,))
local_args = [
- 'command', '/usr/bin/rsync', '--delay-updates', '-F',
+ 'shell', '/usr/bin/rsync', '--delay-updates', '-F',
'--compress', '-rt', '--safe-links',
'--rsync-path="mkdir -p {dest} && rsync"',
'--rsh="/usr/bin/ssh -i {private_key_file} -S none '
'-o StrictHostKeyChecking=no -q"',
'--out-format="<<CHANGED>>%i %n%L"',
- '"{source}/"', '"{user}@{host}:{dest}"'
+ '{source}', '"{user}@{host}:{dest}"'
]
+ if scpfile.get('keep-hierarchy'):
+ source = '"%s/"' % scproot
+ else:
+ source = '`/usr/bin/find "%s" -type f`' % scproot
local_action = ' '.join(local_args).format(
- source=scproot,
+ source=source,
dest=dest,
private_key_file=self.private_key_file,
host=site['host'],
@@ -805,11 +942,8 @@
cwd=parameters['WORKSPACE'],
parameters=parameters)
task = dict(zuul_runner=runner)
- if timeout:
- task['when'] = '{{ timeout | int > 0 }}'
- task['async'] = '{{ timeout }}'
- else:
- task['async'] = 2 * 60 * 60 # 2 hour default timeout
+ task['when'] = '{{ timeout | int > 0 }}'
+ task['async'] = '{{ timeout }}'
task['poll'] = 5
tasks.append(task)
@@ -842,6 +976,8 @@
timeout = timeout.get('timeout')
if timeout:
timeout = timeout * 60
+ if not timeout:
+ timeout = ANSIBLE_DEFAULT_TIMEOUT
with open(jobdir.playbook, 'w') as playbook:
tasks = []
@@ -860,12 +996,6 @@
state='directory'))
main_block.append(task)
- # TODO: remove once zuul-worker DIB element has landed
- main_block.append(dict(shell="[ -f /usr/bin/yum ] && "
- "sudo /usr/bin/yum install "
- "libselinux-python || "
- "/bin/true"))
-
for builder in jjb_job.get('builders', []):
if 'shell' in builder:
main_block.extend(
@@ -876,6 +1006,7 @@
task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
error_block.append(task)
+ error_block.append(dict(fail=dict(msg='FAILURE')))
play = dict(hosts='node', name='Job body',
tasks=tasks)
@@ -912,43 +1043,61 @@
return timeout
+ def _ansibleTimeout(self, proc, msg):
+ self.log.warning(msg)
+ self.abortRunningProc(proc)
+
def runAnsiblePlaybook(self, jobdir, timeout):
- self.ansible_proc = subprocess.Popen(
+ self.ansible_job_proc = subprocess.Popen(
['ansible-playbook', jobdir.playbook,
'-e', 'timeout=%s' % timeout, '-v'],
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
)
- (out, err) = self.ansible_proc.communicate()
- for line in out.split('\n'):
- line = line[:1024]
- self.log.debug("Ansible stdout: %s" % line)
- for line in err.split('\n'):
- line = line[:1024]
- self.log.debug("Ansible stderr: %s" % line)
- ret = self.ansible_proc.wait()
+ ret = None
+ watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+ self._ansibleTimeout,
+ (self.ansible_job_proc,
+ "Ansible timeout exceeded"))
+ watchdog.start()
+ try:
+ for line in iter(self.ansible_job_proc.stdout.readline, b''):
+ line = line[:1024].rstrip()
+ self.log.debug("Ansible output: %s" % (line,))
+ ret = self.ansible_job_proc.wait()
+ finally:
+ watchdog.stop()
+ self.log.debug("Ansible exit code: %s" % (ret,))
- self.ansible_proc = None
+ self.ansible_job_proc = None
return ret == 0
def runAnsiblePostPlaybook(self, jobdir, success):
- proc = subprocess.Popen(
+ self.ansible_post_proc = subprocess.Popen(
['ansible-playbook', jobdir.post_playbook,
'-e', 'success=%s' % success, '-v'],
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
)
- (out, err) = proc.communicate()
- for line in out.split('\n'):
- line = line[:1024]
- self.log.debug("Ansible post stdout: %s" % line)
- for line in err.split('\n'):
- line = line[:1024]
- self.log.debug("Ansible post stderr: %s" % line)
- return proc.wait() == 0
+ ret = None
+ watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+ self._ansibleTimeout,
+ (self.ansible_post_proc,
+ "Ansible post timeout exceeded"))
+ watchdog.start()
+ try:
+ for line in iter(self.ansible_post_proc.stdout.readline, b''):
+ line = line[:1024].rstrip()
+ self.log.debug("Ansible post output: %s" % (line,))
+ ret = self.ansible_post_proc.wait()
+ finally:
+ watchdog.stop()
+ self.log.debug("Ansible post exit code: %s" % (ret,))
+ self.ansible_post_proc = None
+ return ret == 0
class JJB(jenkins_jobs.builder.Builder):