Merge "Python 3 Fix: use proper octal notation"
diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst
index c61cea8..f368cb9 100644
--- a/doc/source/launchers.rst
+++ b/doc/source/launchers.rst
@@ -6,7 +6,7 @@
    https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
 
 .. _`Turbo-Hipster`:
-   http://git.openstack.org/cgit/stackforge/turbo-hipster/
+   https://git.openstack.org/cgit/openstack/turbo-hipster/
 
 .. _`Turbo-Hipster Documentation`:
    http://turbo-hipster.rtfd.org/
diff --git a/zuul/ansible/library/zuul_runner.py b/zuul/ansible/library/zuul_runner.py
index bc38376..f490162 100644
--- a/zuul/ansible/library/zuul_runner.py
+++ b/zuul/ansible/library/zuul_runner.py
@@ -19,6 +19,7 @@
 import getpass
 import os
 import subprocess
+import threading
 
 
 class Console(object):
@@ -50,6 +51,21 @@
     return env
 
 
+def follow(fd):
+    newline_warning = False
+    with Console() as console:
+        while True:
+            line = fd.readline()
+            if not line:
+                break
+            if not line.endswith('\n'):
+                line += '\n'
+                newline_warning = True
+            console.addLine(line)
+        if newline_warning:
+            console.addLine('[Zuul] No trailing newline\n')
+
+
 def run(cwd, cmd, args):
     env = get_env()
     env.update(args)
@@ -61,14 +77,20 @@
         env=env,
     )
 
-    with Console() as console:
-        while True:
-            line = proc.stdout.readline()
-            if not line:
-                break
-            console.addLine(line)
+    t = threading.Thread(target=follow, args=(proc.stdout,))
+    t.daemon = True
+    t.start()
 
-        ret = proc.wait()
+    ret = proc.wait()
+    # Give the thread that is writing the console log up to 10 seconds
+    # to catch up and exit.  If it hasn't done so by then, it is very
+    # likely stuck in readline() because it spawned a child that is
+    # holding stdout or stderr open.
+    t.join(10)
+    with Console() as console:
+        if t.isAlive():
+            console.addLine("[Zuul] standard output/error still open "
+                            "after child exited")
         console.addLine("[Zuul] Task exit code: %s\n" % ret)
     return ret
 
diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py
index bbda87d..49643ae 100644
--- a/zuul/cmd/launcher.py
+++ b/zuul/cmd/launcher.py
@@ -29,8 +29,9 @@
 import signal
 
 import zuul.cmd
+import zuul.launcher.ansiblelaunchserver
 
-# No zuul imports here because they pull in paramiko which must not be
+# No zuul imports that pull in paramiko here; it must not be
 # imported until after the daemonization.
 # https://github.com/paramiko/paramiko/issues/59
 # Similar situation with gear and statsd.
@@ -50,7 +51,8 @@
         parser.add_argument('--keep-jobdir', dest='keep_jobdir',
                             action='store_true',
                             help='keep local jobdirs after run completes')
-        parser.add_argument('command', choices=['reconfigure', 'stop'],
+        parser.add_argument('command',
+                            choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
                             nargs='?')
 
         self.args = parser.parse_args()
@@ -66,21 +68,12 @@
         s.connect(path)
         s.sendall('%s\n' % cmd)
 
-    def send_reconfigure(self):
-        self.send_command('reconfigure')
-        sys.exit(0)
-
-    def send_stop(self):
-        self.send_command('stop')
-        sys.exit(0)
-
     def exit_handler(self):
         self.launcher.stop()
         self.launcher.join()
 
     def main(self, daemon=True):
         # See comment at top of file about zuul imports
-        import zuul.launcher.ansiblelaunchserver
 
         self.setup_logging('launcher', 'log_config')
 
@@ -109,11 +102,8 @@
     server.parse_arguments()
     server.read_config()
 
-    if server.args.command == 'reconfigure':
-        server.send_reconfigure()
-        sys.exit(0)
-    elif server.args.command == 'stop':
-        server.send_stop()
+    if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
+        server.send_command(server.args.command)
         sys.exit(0)
 
     server.configure_connections()
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 6fa78b9..ffbdbf0 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -22,6 +22,7 @@
 import subprocess
 import tempfile
 import threading
+import time
 import traceback
 import Queue
 import uuid
@@ -36,6 +37,13 @@
 import zuul.ansible.plugins.callback_plugins
 from zuul.lib import commandsocket
 
+ANSIBLE_WATCHDOG_GRACE = 5 * 60
+ANSIBLE_DEFAULT_TIMEOUT = 2 * 60 * 60
+ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
+
+
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release']
+
 
 def boolify(x):
     if isinstance(x, str):
@@ -43,12 +51,33 @@
     return bool(x)
 
 
+class Watchdog(object):
+    def __init__(self, timeout, function, args):
+        self.timeout = timeout
+        self.function = function
+        self.args = args
+        self.thread = threading.Thread(target=self._run)
+        self.thread.daemon = True
+
+    def _run(self):
+        while self._running and time.time() < self.end:
+            time.sleep(10)
+        if self._running:
+            self.function(*self.args)
+
+    def start(self):
+        self._running = True
+        self.end = time.time() + self.timeout
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
+
+
 class JobDir(object):
     def __init__(self, keep=False):
         self.keep = keep
         self.root = tempfile.mkdtemp()
-        self.git_root = os.path.join(self.root, 'git')
-        os.makedirs(self.git_root)
         self.ansible_root = os.path.join(self.root, 'ansible')
         os.makedirs(self.ansible_root)
         self.plugins_root = os.path.join(self.ansible_root, 'plugins')
@@ -77,6 +106,7 @@
         self.config = config
         self.keep_jobdir = keep_jobdir
         self.hostname = socket.gethostname()
+        self.registered_functions = set()
         self.node_workers = {}
         self.jobs = {}
         self.builds = {}
@@ -203,9 +233,17 @@
             del self.jobs[name]
 
     def register(self):
+        new_functions = set()
         if self.accept_nodes:
-            self.worker.registerFunction("node-assign:zuul")
-        self.worker.registerFunction("stop:%s" % self.hostname)
+            new_functions.add("node-assign:zuul")
+        new_functions.add("stop:%s" % self.hostname)
+        new_functions.add("set_description:%s" % self.hostname)
+
+        for function in new_functions - self.registered_functions:
+            self.worker.registerFunction(function)
+        for function in self.registered_functions - new_functions:
+            self.worker.unRegisterFunction(function)
+        self.registered_functions = new_functions
 
     def reconfigure(self):
         self.log.debug("Reconfiguring")
@@ -219,6 +257,45 @@
                                    "to worker:")
         self.log.debug("Reconfiguration complete")
 
+    def pause(self):
+        self.log.debug("Pausing")
+        self.accept_nodes = False
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='pause'))
+            except Exception:
+                self.log.exception("Exception sending pause command "
+                                   "to worker:")
+        self.log.debug("Paused")
+
+    def unpause(self):
+        self.log.debug("Unpausing")
+        self.accept_nodes = True
+        self.register()
+        for node in self.node_workers.values():
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='unpause'))
+            except Exception:
+                self.log.exception("Exception sending unpause command "
+                                   "to worker:")
+        self.log.debug("Unpaused")
+
+    def release(self):
+        self.log.debug("Releasing idle nodes")
+        for node in self.node_workers.values():
+            if node.name in self.static_nodes:
+                continue
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='release'))
+            except Exception:
+                self.log.exception("Exception sending release command "
+                                   "to worker:")
+        self.log.debug("Finished releasing idle nodes")
+
     def stop(self):
         self.log.debug("Stopping")
         # First, stop accepting new jobs
@@ -254,8 +331,14 @@
                 command = self.command_socket.get()
                 if command == 'reconfigure':
                     self.reconfigure()
-                if command == 'stop':
+                elif command == 'stop':
                     self.stop()
+                elif command == 'pause':
+                    self.pause()
+                elif command == 'unpause':
+                    self.unpause()
+                elif command == 'release':
+                    self.release()
             except Exception:
                 self.log.exception("Exception while processing command")
 
@@ -283,6 +366,10 @@
                     elif job.name.startswith('stop:'):
                         self.log.debug("Got stop job: %s" % job.unique)
                         self.stopJob(job)
+                    elif job.name.startswith('set_description:'):
+                        self.log.debug("Got set_description job: %s" %
+                                       job.unique)
+                        job.sendWorkComplete()
                     else:
                         self.log.error("Unable to handle job %s" % job.name)
                         job.sendWorkFail()
@@ -376,6 +463,10 @@
         self.labels = labels
         self.thread = None
         self.registered_functions = set()
+        # If the unpaused Event is set, that means we should run jobs.
+        # If it is clear, then we are paused and should not run jobs.
+        self.unpaused = threading.Event()
+        self.unpaused.set()
         self._running = True
         self.queue = Queue.Queue()
         self.manager_name = manager_name
@@ -383,6 +474,8 @@
         self.termination_queue = termination_queue
         self.keep_jobdir = keep_jobdir
         self.running_job_lock = threading.Lock()
+        self._get_job_lock = threading.Lock()
+        self._got_job = False
         self._job_complete_event = threading.Event()
         self._running_job = False
         self._sent_complete_event = False
@@ -434,9 +527,31 @@
         # will be set by the queue thread.
         self.log.debug("Submitting stop request")
         self._running = False
+        self.unpaused.set()
         self.queue.put(dict(action='stop'))
         self.queue.join()
 
+    def pause(self):
+        self.unpaused.clear()
+        self.worker.stopWaitingForJobs()
+
+    def unpause(self):
+        self.unpaused.set()
+
+    def release(self):
+        # If this node is idle, stop it.
+        old_unpaused = self.unpaused.is_set()
+        if old_unpaused:
+            self.pause()
+        with self._get_job_lock:
+            if self._got_job:
+                self.log.debug("This worker is not idle")
+                if old_unpaused:
+                    self.unpause()
+                return
+        self.log.debug("Stopping due to release command")
+        self.queue.put(dict(action='stop'))
+
     def _runQueue(self):
         item = self.queue.get()
         try:
@@ -449,6 +564,15 @@
                 else:
                     self._job_complete_event.wait()
                 self.worker.shutdown()
+            if item['action'] == 'pause':
+                self.log.debug("Received pause request")
+                self.pause()
+            if item['action'] == 'unpause':
+                self.log.debug("Received unpause request")
+                self.unpause()
+            if item['action'] == 'release':
+                self.log.debug("Received release request")
+                self.release()
             elif item['action'] == 'reconfigure':
                 self.log.debug("Received reconfigure request")
                 self.register()
@@ -461,15 +585,21 @@
     def runGearman(self):
         while self._running:
             try:
-                self._runGearman()
+                self.unpaused.wait()
+                if self._running:
+                    self._runGearman()
             except Exception:
                 self.log.exception("Exception in gearman manager:")
+            with self._get_job_lock:
+                self._got_job = False
 
     def _runGearman(self):
-        try:
-            job = self.worker.getJob()
-        except gear.InterruptedError:
-            return
+        with self._get_job_lock:
+            try:
+                job = self.worker.getJob()
+                self._got_job = True
+            except gear.InterruptedError:
+                return
         self.log.debug("Node worker %s got job %s" % (self.name, job.name))
         try:
             if job.name not in self.registered_functions:
@@ -509,12 +639,14 @@
         self.registered_functions = new_functions
 
     def abortRunningJob(self):
+        return self.abortRunningProc(self.ansible_job_proc)
+
+    def abortRunningProc(self, proc):
         aborted = False
         self.log.debug("Abort: acquiring job lock")
         with self.running_job_lock:
             if self._running_job:
                 self.log.debug("Abort: a job is running")
-                proc = self.ansible_proc
                 if proc:
                     self.log.debug("Abort: sending kill signal to job "
                                    "process group")
@@ -610,7 +742,8 @@
                                'SUCCESS', {})
 
     def runJob(self, job, args):
-        self.ansible_proc = None
+        self.ansible_job_proc = None
+        self.ansible_post_proc = None
         result = None
         with self.running_job_lock:
             if not self._running:
@@ -715,16 +848,20 @@
                                 (dest,))
 
             local_args = [
-                'command', '/usr/bin/rsync', '--delay-updates', '-F',
+                'shell', '/usr/bin/rsync', '--delay-updates', '-F',
                 '--compress', '-rt', '--safe-links',
                 '--rsync-path="mkdir -p {dest} && rsync"',
                 '--rsh="/usr/bin/ssh -i {private_key_file} -S none '
                 '-o StrictHostKeyChecking=no -q"',
                 '--out-format="<<CHANGED>>%i %n%L"',
-                '"{source}/"', '"{user}@{host}:{dest}"'
+                '{source}', '"{user}@{host}:{dest}"'
             ]
+            if scpfile.get('keep-hierarchy'):
+                source = '"%s/"' % scproot
+            else:
+                source = '`/usr/bin/find "%s" -type f`' % scproot
             local_action = ' '.join(local_args).format(
-                source=scproot,
+                source=source,
                 dest=dest,
                 private_key_file=self.private_key_file,
                 host=site['host'],
@@ -805,11 +942,8 @@
                       cwd=parameters['WORKSPACE'],
                       parameters=parameters)
         task = dict(zuul_runner=runner)
-        if timeout:
-            task['when'] = '{{ timeout | int > 0 }}'
-            task['async'] = '{{ timeout }}'
-        else:
-            task['async'] = 2 * 60 * 60  # 2 hour default timeout
+        task['when'] = '{{ timeout | int > 0 }}'
+        task['async'] = '{{ timeout }}'
         task['poll'] = 5
         tasks.append(task)
 
@@ -842,6 +976,8 @@
                     timeout = timeout.get('timeout')
                     if timeout:
                         timeout = timeout * 60
+        if not timeout:
+            timeout = ANSIBLE_DEFAULT_TIMEOUT
 
         with open(jobdir.playbook, 'w') as playbook:
             tasks = []
@@ -860,12 +996,6 @@
                                   state='directory'))
             main_block.append(task)
 
-            # TODO: remove once zuul-worker DIB element has landed
-            main_block.append(dict(shell="[ -f /usr/bin/yum ] && "
-                                   "sudo /usr/bin/yum install "
-                                   "libselinux-python || "
-                                   "/bin/true"))
-
             for builder in jjb_job.get('builders', []):
                 if 'shell' in builder:
                     main_block.extend(
@@ -876,6 +1006,7 @@
 
             task = dict(zuul_log=dict(msg="Job complete, result: FAILURE"))
             error_block.append(task)
+            error_block.append(dict(fail=dict(msg='FAILURE')))
 
             play = dict(hosts='node', name='Job body',
                         tasks=tasks)
@@ -912,43 +1043,61 @@
 
         return timeout
 
+    def _ansibleTimeout(self, proc, msg):
+        self.log.warning(msg)
+        self.abortRunningProc(proc)
+
     def runAnsiblePlaybook(self, jobdir, timeout):
-        self.ansible_proc = subprocess.Popen(
+        self.ansible_job_proc = subprocess.Popen(
             ['ansible-playbook', jobdir.playbook,
              '-e', 'timeout=%s' % timeout, '-v'],
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
         )
-        (out, err) = self.ansible_proc.communicate()
-        for line in out.split('\n'):
-            line = line[:1024]
-            self.log.debug("Ansible stdout: %s" % line)
-        for line in err.split('\n'):
-            line = line[:1024]
-            self.log.debug("Ansible stderr: %s" % line)
-        ret = self.ansible_proc.wait()
+        ret = None
+        watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
+                            self._ansibleTimeout,
+                            (self.ansible_job_proc,
+                             "Ansible timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_job_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible output: %s" % (line,))
+            ret = self.ansible_job_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible exit code: %s" % (ret,))
         self.ansible_proc = None
         return ret == 0
 
     def runAnsiblePostPlaybook(self, jobdir, success):
-        proc = subprocess.Popen(
+        self.ansible_post_proc = subprocess.Popen(
             ['ansible-playbook', jobdir.post_playbook,
              '-e', 'success=%s' % success, '-v'],
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
             preexec_fn=os.setsid,
         )
-        (out, err) = proc.communicate()
-        for line in out.split('\n'):
-            line = line[:1024]
-            self.log.debug("Ansible post stdout: %s" % line)
-        for line in err.split('\n'):
-            line = line[:1024]
-            self.log.debug("Ansible post stderr: %s" % line)
-        return proc.wait() == 0
+        ret = None
+        watchdog = Watchdog(ANSIBLE_DEFAULT_POST_TIMEOUT,
+                            self._ansibleTimeout,
+                            (self.ansible_post_proc,
+                             "Ansible post timeout exceeded"))
+        watchdog.start()
+        try:
+            for line in iter(self.ansible_post_proc.stdout.readline, b''):
+                line = line[:1024].rstrip()
+                self.log.debug("Ansible post output: %s" % (line,))
+            ret = self.ansible_post_proc.wait()
+        finally:
+            watchdog.stop()
+        self.log.debug("Ansible post exit code: %s" % (ret,))
+        self.ansible_post_proc = None
+        return ret == 0
 
 
 class JJB(jenkins_jobs.builder.Builder):