Ansible launcher: add zuul_runner module

This runs the commands asynchronously (but waits for their
completion).  This is more robust for long-running commands
because it avoids the built-in ssh timeout.

This adds an ansible module to actually run the remote command
so that we can:
 * process the console log
 * use ansible async (the script module does not support it)
 * control the environment variables of the script being run

It also adds a callback plugin to track the elapsed time so that
we can use the built-in timeout features of async commands.

Note that the module and plugin are GPL licensed.

Change-Id: I19b2b6a5c362bb9d843e7802aefe0eb5df9c5ed7
diff --git a/zuul/ansible/__init__.py b/zuul/ansible/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/__init__.py
diff --git a/zuul/ansible/library/__init__.py b/zuul/ansible/library/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/library/__init__.py
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
new file mode 100644
index 0000000..7554244
--- /dev/null
+++ b/zuul/ansible/library/zuul_runner.py
@@ -0,0 +1,92 @@
+#!/usr/bin/python
+
+# Copyright (c) 2016 IBM Corp.
+#
+# This module is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software.  If not, see <http://www.gnu.org/licenses/>.
+
+import datetime
+import subprocess
+
+
+class Console(object):
+    """Context manager that appends timestamped lines to the console log."""
+
+    def __enter__(self):
+        # Append instead of truncating: each zuul_runner task opens the
+        # log anew, so opening with 'w+' would throw away the output of
+        # all earlier builders in the same job.
+        self.logfile = open('/tmp/console.log', 'a')
+        return self
+
+    def __exit__(self, etype, value, tb):
+        self.logfile.close()
+
+    def addLine(self, ln):
+        """Write ln to the log, prefixed with the current local time.
+
+        ln is written as given, so it must carry its own newline.
+        """
+        ts = datetime.datetime.now()
+        outln = '%s %s' % (str(ts), ln)
+        self.logfile.write(outln)
+
+
+def run(cwd, cmd, args):
+    """Run cmd in cwd and copy its combined output to the console log.
+
+    :param cwd: working directory for the child process.
+    :param cmd: path of the program to execute; it gets no arguments.
+    :param args: mapping passed as the child's complete environment.
+    :returns: the exit status of the child process.
+    """
+    proc = subprocess.Popen(
+        [cmd],
+        cwd=cwd,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        env=args,
+    )
+
+    with Console() as console:
+        while True:
+            line = proc.stdout.readline()
+            if not line:
+                # An empty read means the child closed its output (EOF).
+                break
+            console.addLine(line)
+
+    ret = proc.wait()
+    return ret
+
+
+def main():
+    """Ansible entry point: run the command and report its exit code."""
+    module = AnsibleModule(
+        argument_spec=dict(
+            command=dict(required=True, default=None),
+            cwd=dict(required=True, default=None),
+            parameters=dict(default={}, type='dict')
+        )
+    )
+
+    p = module.params
+    ret = run(p['cwd'], p['command'], p['parameters'])
+    if ret == 0:
+        module.exit_json(changed=True, rc=ret)
+    else:
+        module.fail_json(msg="Exit code %s" % ret, rc=ret)
+
+from ansible.module_utils.basic import *  # noqa
+
+main()
diff --git a/zuul/ansible/plugins/__init__.py b/zuul/ansible/plugins/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/plugins/__init__.py
diff --git a/zuul/ansible/plugins/callback_plugins/__init__.py b/zuul/ansible/plugins/callback_plugins/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/zuul/ansible/plugins/callback_plugins/__init__.py
diff --git a/zuul/ansible/plugins/callback_plugins/timeout.py b/zuul/ansible/plugins/callback_plugins/timeout.py
new file mode 100644
index 0000000..245e988
--- /dev/null
+++ b/zuul/ansible/plugins/callback_plugins/timeout.py
@@ -0,0 +1,71 @@
+# Copyright 2016 IBM Corp.
+#
+# This file is part of Zuul
+#
+# This file is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This file is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this file.  If not, see <http://www.gnu.org/licenses/>.
+
+import time
+
+from ansible.executor.task_result import TaskResult
+from ansible.plugins.callback import CallbackBase
+
+
+class CallbackModule(CallbackBase):
+    """Track time spent running tasks and expose it to the play as facts."""
+
+    def __init__(self, *args, **kw):
+        super(CallbackModule, self).__init__(*args, **kw)
+        # Seconds accumulated over all tasks that have reported a result.
+        self._elapsed_time = 0.0
+        # Start time of the task currently running, or None between tasks.
+        self._task_start_time = None
+        self._play = None
+
+    def v2_playbook_on_play_start(self, play):
+        self._play = play
+
+    def playbook_on_task_start(self, name, is_conditional):
+        self._task_start_time = time.time()
+
+    def v2_on_any(self, *args, **kw):
+        """Update the elapsed_time (and remaining timeout) host facts.
+
+        Only events whose first argument is a TaskResult are handled.
+        """
+        result = None
+        if args and isinstance(args[0], TaskResult):
+            result = args[0]
+        if not result:
+            return
+
+        if self._task_start_time is not None:
+            task_time = time.time() - self._task_start_time
+            self._elapsed_time += task_time
+        if self._play and result._host:
+            manager = self._play.get_variable_manager()
+            facts = dict(elapsed_time=self._elapsed_time)
+
+            overall_timeout = manager.extra_vars.get('timeout')
+            try:
+                timeout = int(overall_timeout) - int(self._elapsed_time)
+            except (TypeError, ValueError):
+                # No usable limit: the variable is unset, or it is not a
+                # number (e.g. the literal string 'None').
+                timeout = None
+            if timeout is not None:
+                facts['timeout'] = timeout
+
+            manager.set_nonpersistent_facts(result._host, facts)
+        # Cleared so further results for the same task add no more time.
+        self._task_start_time = None
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 4e652d2..8eb0374 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -16,7 +16,6 @@
 import logging
 import multiprocessing
 import os
-import Queue
 import re
 import shutil
 import signal
@@ -24,7 +23,6 @@
 import subprocess
 import tempfile
 import threading
-import time
 import traceback
 import uuid
 
@@ -33,6 +31,9 @@
 import jenkins_jobs.builder
 import zmq
 
+import zuul.ansible.library
+import zuul.ansible.plugins.callback_plugins
+
 
 class JobDir(object):
     def __init__(self):
@@ -41,6 +42,8 @@
         os.makedirs(self.git_root)
         self.ansible_root = os.path.join(self.root, 'ansible')
         os.makedirs(self.ansible_root)
+        self.plugins_root = os.path.join(self.ansible_root, 'plugins')
+        os.makedirs(self.plugins_root)
         self.inventory = os.path.join(self.ansible_root, 'inventory')
         self.playbook = os.path.join(self.ansible_root, 'playbook')
         self.post_playbook = os.path.join(self.ansible_root, 'post_playbook')
@@ -293,8 +296,7 @@
         self._job_complete_event = threading.Event()
         self._running_job = False
         self._sent_complete_event = False
-        self._job_timeout = None
-        self._job_start_time = None
+        self.workspace_root = config.get('launcher', 'workspace_root')
 
     def isAlive(self):
         # Meant to be called from the manager
@@ -336,23 +338,7 @@
         self.queue.join()
 
     def _runQueue(self):
-        # This also runs the timeout function if needed
-        try:
-            item = self.queue.get(True, 10)  # 10 second resolution on timeout
-        except Queue.Empty:
-            # We don't need these in a critical section, but we do
-            # need them not to change while we evaluate them, so make
-            # local copies.
-            running = self._running_job
-            start = self._job_start_time
-            timeout = self._job_timeout
-            now = time.time()
-            if (running and timeout and start
-                and now - start >= timeout):
-                self.log.info("Job timed out after %s seconds" %
-                              (now - start,))
-                self.abortRunningJob()
-            return
+        item = self.queue.get()
         try:
             if item['action'] == 'stop':
                 self.log.debug("Received stop request")
@@ -466,13 +452,11 @@
             self.log.exception("Exception while sending job start event")
 
         try:
-            result = self.runJob(job)
+            result = self.runJob(job, args)
         except Exception:
             self.log.exception("Exception while launching job thread")
 
         self._running_job = False
-        self._job_timeout = None
-        self._job_start_time = None
         if not result:
             result = b''
 
@@ -527,7 +511,7 @@
         self.sendCompleteEvent('zuul:launcher-shutdown',
                                'SUCCESS', {})
 
-    def runJob(self, job):
+    def runJob(self, job, args):
         self.ansible_proc = None
         result = None
         with self.running_job_lock:
@@ -541,10 +525,7 @@
         with JobDir() as jobdir:
             self.log.debug("Job %s: job root at %s" %
                            (job.unique, jobdir.root))
-
-            self.prepareAnsibleFiles(jobdir, job)
-
-            self._job_start_time = time.time()
+            timeout = self.prepareAnsibleFiles(jobdir, job, args)
 
             data = {
                 'manager': self.manager_name,
@@ -554,7 +535,7 @@
             job.sendWorkData(json.dumps(data))
             job.sendWorkStatus(0, 100)
 
-            job_status = self.runAnsiblePlaybook(jobdir)
+            job_status = self.runAnsiblePlaybook(jobdir, timeout)
             post_status = self.runAnsiblePostPlaybook(jobdir, job_status)
             if job_status and post_status:
                 status = 'SUCCESS'
@@ -621,7 +602,61 @@
         tasks.append(task)
         return tasks
 
-    def prepareAnsibleFiles(self, jobdir, gearman_job):
+    def _makeBuilderTask(self, jobdir, builder, parameters, timeout):
+        """Return the playbook tasks that run one shell builder remotely.
+
+        The builder's script is written into the job's script directory,
+        then three tasks are produced: copy the script to the node, run
+        it with the zuul_runner module, and remove the remote copy.
+
+        :param jobdir: job directory object providing script_root.
+        :param builder: builder definition containing a 'shell' entry.
+        :param parameters: job parameters; must contain 'WORKSPACE'.
+        :param timeout: job timeout, or a false value for the default.
+        :returns: list of task dictionaries.
+        """
+        tasks = []
+        script_fn = '%s.sh' % str(uuid.uuid4().hex)
+        script_path = os.path.join(jobdir.script_root, script_fn)
+        with open(script_path, 'w') as script:
+            script.write(builder['shell'])
+
+        remote_path = os.path.join('/tmp', script_fn)
+        # 0o555 is the same value as the old 0555 literal, but is also
+        # valid syntax on Python 3.
+        copy = dict(src=script_path,
+                    dest=remote_path,
+                    mode=0o555)
+        task = dict(copy=copy)
+        tasks.append(task)
+
+        runner = dict(command=remote_path,
+                      cwd=parameters['WORKSPACE'],
+                      parameters=parameters)
+        task = dict(zuul_runner=runner)
+        if timeout:
+            # Jinja expressions; ansible evaluates the 'timeout' variable.
+            task['when'] = '{{ timeout | int > 0 }}'
+            task['async'] = '{{ timeout }}'
+        else:
+            task['async'] = 2 * 60 * 60  # 2 hour default timeout
+        task['poll'] = 5
+        tasks.append(task)
+
+        filetask = dict(path=remote_path,
+                        state='absent')
+        task = dict(file=filetask)
+        tasks.append(task)
+
+        return tasks
+
+    def prepareAnsibleFiles(self, jobdir, gearman_job, args):
+        job_name = gearman_job.name.split(':')[1]
+        jjb_job = self.jobs[job_name]
+
+        parameters = args.copy()
+        parameters['WORKSPACE'] = os.path.join(self.workspace_root, job_name)
+
         with open(jobdir.inventory, 'w') as inventory:
             for host_name, host_vars in self.getHostList():
                 inventory.write(host_name)
@@ -629,27 +649,30 @@
                 for k, v in host_vars.items():
                     inventory.write('%s=%s' % (k, v))
                 inventory.write('\n')
-        job_name = gearman_job.name.split(':')[1]
-        jjb_job = self.jobs[job_name]
 
+        timeout = None
         for wrapper in jjb_job.get('wrappers', []):
             if isinstance(wrapper, dict):
                 timeout = wrapper.get('build-timeout', {})
                 if isinstance(timeout, dict):
                     timeout = timeout.get('timeout')
                     if timeout:
-                        self._job_timeout = timeout * 60
+                        timeout = timeout * 60
 
         with open(jobdir.playbook, 'w') as playbook:
             tasks = []
+
+            task = dict(file=dict(path='/tmp/console.log', state='absent'))
+            tasks.append(task)
+
+            task = dict(file=dict(path=parameters['WORKSPACE'],
+                                  state='directory'))
+            tasks.append(task)
+
             for builder in jjb_job.get('builders', []):
                 if 'shell' in builder:
-                    script_fn = '%s.sh' % str(uuid.uuid4().hex)
-                    script_fn = os.path.join(jobdir.script_root, script_fn)
-                    with open(script_fn, 'w') as script:
-                        script.write(builder['shell'])
-                    tasks.append(dict(script='%s >> /tmp/console.log 2>&1' %
-                                      script_fn))
+                    tasks.extend(self._makeBuilderTask(jobdir, builder,
+                                                       parameters, timeout))
             play = dict(hosts='node', name='Job body',
                         tasks=tasks)
             playbook.write(yaml.dump([play]))
@@ -670,15 +693,40 @@
             config.write('hostfile = %s\n' % jobdir.inventory)
             config.write('host_key_checking = False\n')
 
-    def runAnsiblePlaybook(self, jobdir):
+            callback_path = zuul.ansible.plugins.callback_plugins.__file__
+            callback_path = os.path.abspath(callback_path)
+            callback_path = os.path.dirname(callback_path)
+            config.write('callback_plugins = %s\n' % callback_path)
+
+            library_path = zuul.ansible.library.__file__
+            library_path = os.path.abspath(library_path)
+            library_path = os.path.dirname(library_path)
+            config.write('library = %s\n' % library_path)
+
+        return timeout
+
+    def runAnsiblePlaybook(self, jobdir, timeout):
+        """Run the job playbook and return True if ansible exited with 0.
+
+        :param jobdir: job directory providing playbook and ansible_root.
+        :param timeout: overall job timeout, or None when the job has none.
+        """
+        cmd = ['ansible-playbook', jobdir.playbook]
+        if timeout is not None:
+            # Passing 'timeout=None' would hand the playbook the string
+            # 'None', which cannot be used as a number.
+            cmd.extend(['-e', 'timeout=%s' % timeout])
+        cmd.append('-v')
         self.ansible_proc = subprocess.Popen(
-            ['ansible-playbook', jobdir.playbook],
+            cmd,
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             preexec_fn=os.setsid,
         )
         (out, err) = self.ansible_proc.communicate()
+        self.log.debug("Ansible stdout:\n%s" % out)
+        self.log.debug("Ansible stderr:\n%s" % err)
         ret = self.ansible_proc.wait()
         self.ansible_proc = None
         return ret == 0