Merge "Ansible launcher: add release command"
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index c78b4ac..45704c1 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -42,7 +42,7 @@
 ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
 
 
-COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause']
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release']
 
 
 def boolify(x):
@@ -284,6 +284,19 @@
                                    "to worker:")
         self.log.debug("Unpaused")
 
+    def release(self):
+        self.log.debug("Releasing idle nodes")
+        for node in self.node_workers.values():
+            if node.name in self.static_nodes:
+                continue
+            try:
+                if node.isAlive():
+                    node.queue.put(dict(action='release'))
+            except Exception:
+                self.log.exception("Exception sending release command "
+                                   "to worker:")
+        self.log.debug("Finished releasing idle nodes")
+
     def stop(self):
         self.log.debug("Stopping")
         # First, stop accepting new jobs
@@ -325,6 +338,8 @@
                     self.pause()
                 elif command == 'unpause':
                     self.unpause()
+                elif command == 'release':
+                    self.release()
             except Exception:
                 self.log.exception("Exception while processing command")
 
@@ -456,6 +471,8 @@
         self.termination_queue = termination_queue
         self.keep_jobdir = keep_jobdir
         self.running_job_lock = threading.Lock()
+        self._get_job_lock = threading.Lock()
+        self._got_job = False
         self._job_complete_event = threading.Event()
         self._running_job = False
         self._sent_complete_event = False
@@ -518,6 +535,20 @@
     def unpause(self):
         self.unpaused.set()
 
+    def release(self):
+        # Stop this worker if idle (no job grabbed); else restore pause state.
+        old_unpaused = self.unpaused.is_set()
+        if old_unpaused:
+            self.pause()
+        with self._get_job_lock:
+            if self._got_job:
+                self.log.debug("This worker is not idle")
+                if old_unpaused:
+                    self.unpause()
+                return
+        self.log.debug("Stopping due to release command")
+        self.queue.put(dict(action='stop'))
+
     def _runQueue(self):
         item = self.queue.get()
         try:
@@ -536,6 +567,9 @@
             if item['action'] == 'unpause':
                 self.log.debug("Received unpause request")
                 self.unpause()
+            elif item['action'] == 'release':
+                self.log.debug("Received release request")
+                self.release()
             elif item['action'] == 'reconfigure':
                 self.log.debug("Received reconfigure request")
                 self.register()
@@ -553,12 +587,16 @@
                     self._runGearman()
             except Exception:
                 self.log.exception("Exception in gearman manager:")
+            with self._get_job_lock:
+                self._got_job = False
 
     def _runGearman(self):
-        try:
-            job = self.worker.getJob()
-        except gear.InterruptedError:
-            return
+        with self._get_job_lock:
+            try:
+                job = self.worker.getJob()
+                self._got_job = True
+            except gear.InterruptedError:
+                return
         self.log.debug("Node worker %s got job %s" % (self.name, job.name))
         try:
             if job.name not in self.registered_functions: