Ansible launcher: add watchdog for ansible

Kill the ansible-playbook if it exceeds its timeout.

Change-Id: I9f0a4277dc184fab6ce77ec508b77acbd59ec7ba
Co-Authored-By: James E. Blair <jeblair@redhat.com>
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index bd6f408..77e9e6c 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -22,6 +22,7 @@
 import subprocess
 import tempfile
 import threading
+import time
 import traceback
 import Queue
 import uuid
@@ -36,6 +37,10 @@
 import zuul.ansible.plugins.callback_plugins
 from zuul.lib import commandsocket
 
+ANSIBLE_WATCHDOG_GRACE = 5 * 60
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
+ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
+
 
 def boolify(x):
     if isinstance(x, str):
@@ -43,6 +48,29 @@
     return bool(x)
 
 
+class Watchdog(object):
+    def __init__(self, timeout, function, args):
+        self.timeout = timeout
+        self.function = function
+        self.args = args
+        self.thread = threading.Thread(target=self._run)
+        self.thread.daemon = True
+
+    def _run(self):
+        while self._running and time.time() < self.end:
+            time.sleep(10)
+        if self._running:
+            self.function(*self.args)
+
+    def start(self):
+        self._running = True
+        self.end = time.time() + self.timeout
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
+
+
 class JobDir(object):
     def __init__(self, keep=False):
         self.keep = keep
@@ -509,12 +537,14 @@
         self.registered_functions = new_functions
 
     def abortRunningJob(self):
+        return self.abortRunningProc(self.ansible_job_proc)
+
+    def abortRunningProc(self, proc):
         aborted = False
         self.log.debug("Abort: acquiring job lock")
         with self.running_job_lock:
             if self._running_job:
                 self.log.debug("Abort: a job is running")
-                proc = self.ansible_proc
                 if proc:
                     self.log.debug("Abort: sending kill signal to job "
                                    "process group")
@@ -610,7 +640,8 @@
                                'SUCCESS', {})
 
     def runJob(self, job, args):
-        self.ansible_proc = None
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
         result = None
         with self.running_job_lock:
             if not self._running:
@@ -809,11 +840,8 @@
                       cwd=parameters['WORKSPACE'],
                       parameters=parameters)
         task = dict(zuul_runner=runner)
-        if timeout:
-            task['when'] = '{{ timeout | int > 0 }}'
-            task['async'] = '{{ timeout }}'
-        else:
-            task['async'] = 2 * 60 * 60  # 2 hour default timeout
+        task['when'] = '{{ timeout | int > 0 }}'
+        task['async'] = '{{ timeout }}'
         task['poll'] = 5
         tasks.append(task)
 
@@ -846,6 +874,8 @@
                     timeout = timeout.get('timeout')
                     if timeout:
                         timeout = timeout * 60
+        if not timeout:
+            timeout = ANSIBLE_DEFAULT_TIMEOUT
 
         with open(jobdir.playbook, 'w') as playbook:
             tasks = []
@@ -917,8 +947,12 @@
 
         return timeout
 
+    def _ansibleTimeout(self, proc, msg):
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+
     def runAnsiblePlaybook(self, jobdir, timeout):
-        self.ansible_proc = subprocess.Popen(
+        self.ansible_job_proc = subprocess.Popen(
             ['ansible-playbook', jobdir.playbook,
              '-e', 'timeout=%s' % timeout, '-v'],
             cwd=jobdir.ansible_root,
@@ -926,15 +960,25 @@
             stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
         )
-        for line in iter(self.ansible_proc.stdout.readline, b''):
-            line = line[:1024]
-            self.log.debug(line)
-        ret = self.ansible_proc.wait()
+        ret = None
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (self.ansible_job_proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_job_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = self.ansible_job_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
         self.ansible_proc = None
         return ret == 0
 
     def runAnsiblePostPlaybook(self, jobdir, success):
-        proc = subprocess.Popen(
+        self.ansible_post_proc = subprocess.Popen(
             ['ansible-playbook', jobdir.post_playbook,
              '-e', 'success=%s' % success, '-v'],
             cwd=jobdir.ansible_root,
@@ -942,10 +986,22 @@
             stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
         )
-        for line in iter(proc.stdout.readline, b''):
-            line = line[:1024]
-            self.log.debug(line)
-        return proc.wait() == 0
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_post_proc,
+                             "Ansible post timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_post_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible post output: %s" % (line,))
+            ret = self.ansible_post_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible post exit code: %s" % (ret,))
+        self.ansible_post_proc = None
+        return ret == 0
 
 
 class JJB(jenkins_jobs.builder.Builder):