Ansible launcher: add zuul_runner module
The new zuul_runner module runs its commands asynchronously (but
still waits for them to complete). This is more robust for
long-running commands because it avoids the built-in SSH timeout.
This adds an Ansible module that actually runs the remote command,
so that we can:
* process the console log
* use Ansible async (the script module does not support it)
* control the environment variables of the script being run
It also adds a callback plugin to track the elapsed time so that
we can use the built-in timeout features of async commands.
Note that the module and plugin are GPL licensed.
Change-Id: I19b2b6a5c362bb9d843e7802aefe0eb5df9c5ed7
diff --git a/zuul/ansible/__init__.py b/zuul/ansible/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/__init__.py
diff --git a/zuul/ansible/library/__init__.py b/zuul/ansible/library/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/library/__init__.py
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
new file mode 100644
index 0000000..7554244
--- /dev/null
+++ b/zuul/ansible/library/zuul_runner.py
@@ -0,0 +1,74 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software. If not, see <http://www.gnu.org/licenses/>.
+
+import datetime
+import subprocess
+
+
+class Console(object):
+    """Context manager that appends timestamped lines to a console log.
+
+    The log is opened in append mode so that several runner invocations
+    writing to the same file accumulate their output; opening it in
+    write mode would make every invocation truncate what the previous
+    one wrote.
+
+    :param path: file to append to; defaults to ``/tmp/console.log``.
+    """
+
+    def __init__(self, path='/tmp/console.log'):
+        self.path = path
+        self.logfile = None
+
+    def __enter__(self):
+        self.logfile = open(self.path, 'a+')
+        return self
+
+    def __exit__(self, etype, value, tb):
+        self.logfile.close()
+
+    def addLine(self, ln):
+        """Write ``ln`` prefixed with the current local timestamp.
+
+        ``ln`` is written as given, so it is expected to carry its own
+        trailing newline.
+        """
+        ts = datetime.datetime.now()
+        outln = '%s %s' % (str(ts), ln)
+        self.logfile.write(outln)
+        # Flush per line so the output is on disk even if the process
+        # is terminated before the file gets closed.
+        self.logfile.flush()
+
+
+def run(cwd, cmd, args):
+    """Execute ``cmd`` in ``cwd`` and copy its output to the console log.
+
+    stderr is merged into stdout and ``args`` is handed to ``Popen`` as
+    the child's environment.  Every line the child prints is passed to
+    a ``Console``.  Returns the child's exit status.
+    """
+    child = subprocess.Popen([cmd], cwd=cwd, env=args,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.STDOUT)
+
+    with Console() as console:
+        # readline() only returns an empty value at end of stream.
+        output = child.stdout.readline()
+        while output:
+            console.addLine(output)
+            output = child.stdout.readline()
+
+        return child.wait()
+
+
+def main():
+    """Ansible module entry point: run the command and report its status.
+
+    Reads the ``command``, ``cwd`` and ``parameters`` module arguments,
+    runs the command through ``run()`` and exits successfully (marked
+    as changed) on a zero return code, or fails with that code otherwise.
+    """
+    spec = dict(
+        command=dict(required=True, default=None),
+        cwd=dict(required=True, default=None),
+        parameters=dict(default={}, type='dict'),
+    )
+    module = AnsibleModule(argument_spec=spec)
+
+    params = module.params
+    rc = run(params['cwd'], params['command'], params['parameters'])
+    if rc != 0:
+        module.fail_json(msg="Exit code %s" % rc, rc=rc)
+    else:
+        module.exit_json(changed=True, rc=rc)
+
+from ansible.module_utils.basic import * # noqa
+
+main()
diff --git a/zuul/ansible/plugins/__init__.py b/zuul/ansible/plugins/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/plugins/__init__.py
diff --git a/zuul/ansible/plugins/callback_plugins/__init__.py b/zuul/ansible/plugins/callback_plugins/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/plugins/callback_plugins/__init__.py
diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py
new file mode 100644
index 0000000..245e988
--- /dev/null
+++ b/zuul/ansible/plugins/callback_plugins/timeout.py
@@ -0,0 +1,57 @@
+# Copyright 2016 IBM Corp.
+#
+# This file is part of Zuul
+#
+# This file is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This file is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this file. If not, see <http://www.gnu.org/licenses/>.
+
+import time
+
+from ansible.executor.task_result import TaskResult
+from ansible.plugins.callback import CallbackBase
+
+
+class CallbackModule(CallbackBase):
+    """Callback plugin that tracks how long the tasks of a play have run.
+
+    After every task result it publishes the accumulated ``elapsed_time``
+    (seconds) and, when a numeric overall ``timeout`` extra var is
+    present, the remaining ``timeout`` as non-persistent facts on the
+    host the result belongs to.
+    """
+
+    def __init__(self, *args, **kw):
+        super(CallbackModule, self).__init__(*args, **kw)
+        self._elapsed_time = 0.0
+        self._task_start_time = None
+        self._play = None
+
+    def v2_playbook_on_play_start(self, play):
+        # Keep the play so its variable manager can be reached later.
+        self._play = play
+
+    def playbook_on_task_start(self, name, is_conditional):
+        self._task_start_time = time.time()
+
+    @staticmethod
+    def _parseTimeout(value):
+        """Return ``value`` as an int, or None if it is not an integer.
+
+        The extra var may hold a non-numeric string (for example
+        ``'None'`` when the caller formats an unset timeout into it), so
+        a failed conversion means "no overall timeout" rather than an
+        error.
+        """
+        if value is None:
+            return None
+        try:
+            return int(value)
+        except (TypeError, ValueError):
+            return None
+
+    def v2_on_any(self, *args, **kw):
+        """Update the elapsed time and timeout facts after a task result.
+
+        Events whose first positional argument is not a ``TaskResult``
+        are ignored.
+        """
+        result = None
+        if args and isinstance(args[0], TaskResult):
+            result = args[0]
+        if not result:
+            return
+
+        if self._task_start_time is not None:
+            task_time = time.time() - self._task_start_time
+            self._elapsed_time += task_time
+        if self._play and result._host:
+            manager = self._play.get_variable_manager()
+            facts = dict(elapsed_time=self._elapsed_time)
+
+            overall_timeout = self._parseTimeout(
+                manager.extra_vars.get('timeout'))
+            if overall_timeout is not None:
+                # Remaining budget; may go negative once it is used up.
+                facts['timeout'] = overall_timeout - int(self._elapsed_time)
+
+            manager.set_nonpersistent_facts(result._host, facts)
+        # Only count each task's duration once.
+        self._task_start_time = None
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 4e652d2..8eb0374 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -16,7 +16,6 @@
import logging
import multiprocessing
import os
-import Queue
import re
import shutil
import signal
@@ -24,7 +23,6 @@
import subprocess
import tempfile
import threading
-import time
import traceback
import uuid
@@ -33,6 +31,9 @@
import jenkins_jobs.builder
import zmq
+import zuul.ansible.library
+import zuul.ansible.plugins.callback_plugins
+
class JobDir(object):
def __init__(self):
@@ -41,6 +42,8 @@
os.makedirs(self.git_root)
self.ansible_root = os.path.join(self.root, 'ansible')
os.makedirs(self.ansible_root)
+ self.plugins_root = os.path.join(self.ansible_root, 'plugins')
+ os.makedirs(self.plugins_root)
self.inventory = os.path.join(self.ansible_root, 'inventory')
self.playbook = os.path.join(self.ansible_root, 'playbook')
self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
@@ -293,8 +296,7 @@
self._job_complete_event = threading.Event()
self._running_job = False
self._sent_complete_event = False
- self._job_timeout = None
- self._job_start_time = None
+ self.workspace_root = config.get('launcher', 'workspace_root')
def isAlive(self):
# Meant to be called from the manager
@@ -336,23 +338,7 @@
self.queue.join()
def _runQueue(self):
- # This also runs the timeout function if needed
- try:
- item = self.queue.get(True, 10) # 10 second resolution on timeout
- except Queue.Empty:
- # We don't need these in a critical section, but we do
- # need them not to change while we evaluate them, so make
- # local copies.
- running = self._running_job
- start = self._job_start_time
- timeout = self._job_timeout
- now = time.time()
- if (running and timeout and start
- and now - start >= timeout):
- self.log.info("Job timed out after %s seconds" %
- (now - start,))
- self.abortRunningJob()
- return
+ item = self.queue.get()
try:
if item['action'] == 'stop':
self.log.debug("Received stop request")
@@ -466,13 +452,11 @@
self.log.exception("Exception while sending job start event")
try:
- result = self.runJob(job)
+ result = self.runJob(job, args)
except Exception:
self.log.exception("Exception while launching job thread")
self._running_job = False
- self._job_timeout = None
- self._job_start_time = None
if not result:
result = b''
@@ -527,7 +511,7 @@
self.sendCompleteEvent('zuul:launcher-shutdown',
'SUCCESS', {})
- def runJob(self, job):
+ def runJob(self, job, args):
self.ansible_proc = None
result = None
with self.running_job_lock:
@@ -541,10 +525,7 @@
with JobDir() as jobdir:
self.log.debug("Job %s: job root at %s" %
(job.unique, jobdir.root))
-
- self.prepareAnsibleFiles(jobdir, job)
-
- self._job_start_time = time.time()
+ timeout = self.prepareAnsibleFiles(jobdir, job, args)
data = {
'manager': self.manager_name,
@@ -554,7 +535,7 @@
job.sendWorkData(json.dumps(data))
job.sendWorkStatus(0, 100)
- job_status = self.runAnsiblePlaybook(jobdir)
+ job_status = self.runAnsiblePlaybook(jobdir, timeout)
post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
if job_status and post_status:
status = 'SUCCESS'
@@ -621,7 +602,46 @@
tasks.append(task)
return tasks
- def prepareAnsibleFiles(self, jobdir, gearman_job):
+    def _makeBuilderTask(self, jobdir, builder, parameters, timeout):
+        """Build the Ansible tasks that run one shell builder.
+
+        The builder's script is written to ``jobdir.script_root`` and
+        three tasks are returned: copy the script to ``/tmp``, run it
+        with the ``zuul_runner`` module, and remove the copied script.
+
+        :param jobdir: object providing the ``script_root`` directory.
+        :param builder: dict whose ``shell`` entry holds the script text.
+        :param parameters: dict passed to the runner; must contain
+            ``WORKSPACE``, which is used as the working directory.
+        :param timeout: if truthy, the run is conditional on and bounded
+            by the ``timeout`` playbook variable; otherwise a two hour
+            limit is used.
+        :returns: list of task dicts.
+        """
+        tasks = []
+        script_fn = '%s.sh' % str(uuid.uuid4().hex)
+        script_path = os.path.join(jobdir.script_root, script_fn)
+        with open(script_path, 'w') as script:
+            script.write(builder['shell'])
+
+        remote_path = os.path.join('/tmp', script_fn)
+        # 0o555 is the same value as the Python-2-only literal 0555 but
+        # is also valid syntax on Python 3.
+        copy = dict(src=script_path,
+                    dest=remote_path,
+                    mode=0o555)
+        task = dict(copy=copy)
+        tasks.append(task)
+
+        runner = dict(command=remote_path,
+                      cwd=parameters['WORKSPACE'],
+                      parameters=parameters)
+        task = dict(zuul_runner=runner)
+        if timeout:
+            # Skip the task once the remaining time has run out.
+            task['when'] = '{{ timeout | int > 0 }}'
+            task['async'] = '{{ timeout }}'
+        else:
+            task['async'] = 2 * 60 * 60  # 2 hour default timeout
+        task['poll'] = 5
+        tasks.append(task)
+
+        filetask = dict(path=remote_path,
+                        state='absent')
+        task = dict(file=filetask)
+        tasks.append(task)
+
+        return tasks
+
+ def prepareAnsibleFiles(self, jobdir, gearman_job, args):
+ job_name = gearman_job.name.split(':')[1]
+ jjb_job = self.jobs[job_name]
+
+ parameters = args.copy()
+ parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
+
with open(jobdir.inventory, 'w') as inventory:
for host_name, host_vars in self.getHostList():
inventory.write(host_name)
@@ -629,27 +649,30 @@
for k, v in host_vars.items():
inventory.write('%s=%s' % (k, v))
inventory.write('\n')
- job_name = gearman_job.name.split(':')[1]
- jjb_job = self.jobs[job_name]
+ timeout = None
for wrapper in jjb_job.get('wrappers', []):
if isinstance(wrapper, dict):
timeout = wrapper.get('build-timeout', {})
if isinstance(timeout, dict):
timeout = timeout.get('timeout')
if timeout:
- self._job_timeout = timeout * 60
+ timeout = timeout * 60
with open(jobdir.playbook, 'w') as playbook:
tasks = []
+
+ task = dict(file=dict(path='/tmp/console.log', state='absent'))
+ tasks.append(task)
+
+ task = dict(file=dict(path=parameters['WORKSPACE'],
+ state='directory'))
+ tasks.append(task)
+
for builder in jjb_job.get('builders', []):
if 'shell' in builder:
- script_fn = '%s.sh' % str(uuid.uuid4().hex)
- script_fn = os.path.join(jobdir.script_root, script_fn)
- with open(script_fn, 'w') as script:
- script.write(builder['shell'])
- tasks.append(dict(script='%s >> /tmp/console.log 2>&1' %
- script_fn))
+ tasks.extend(self._makeBuilderTask(jobdir, builder,
+ parameters, timeout))
play = dict(hosts='node', name='Job body',
tasks=tasks)
playbook.write(yaml.dump([play]))
@@ -670,15 +693,30 @@
config.write('hostfile = %s\n' % jobdir.inventory)
config.write('host_key_checking = False\n')
- def runAnsiblePlaybook(self, jobdir):
+ callback_path = zuul.ansible.plugins.callback_plugins.__file__
+ callback_path = os.path.abspath(callback_path)
+ callback_path = os.path.dirname(callback_path)
+ config.write('callback_plugins = %s\n' % callback_path)
+
+ library_path = zuul.ansible.library.__file__
+ library_path = os.path.abspath(library_path)
+ library_path = os.path.dirname(library_path)
+ config.write('library = %s\n' % library_path)
+
+ return timeout
+
+ def runAnsiblePlaybook(self, jobdir, timeout):
self.ansible_proc = subprocess.Popen(
- ['ansible-playbook', jobdir.playbook],
+ ['ansible-playbook', jobdir.playbook,
+ '-e', 'timeout=%s' % timeout, '-v'],
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
)
(out, err) = self.ansible_proc.communicate()
+ self.log.debug("Ansible stdout:\n%s" % out)
+ self.log.debug("Ansible stderr:\n%s" % err)
ret = self.ansible_proc.wait()
self.ansible_proc = None
return ret == 0