Merge "Ansible launcher: add release command"
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index c78b4ac..45704c1 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -42,7 +42,7 @@
ANSIBLE_DEFAULT_POST_TIMEOUT = 10 * 60
-COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause']
+# Known command names; each presumably maps to a branch of the command
+# dispatch (e.g. 'release' -> self.release(), which stops idle workers).
+COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause', 'release']
def boolify(x):
@@ -284,6 +284,19 @@
"to worker:")
self.log.debug("Unpaused")
+ def release(self):
+ # Send a 'release' request to every live, non-static node worker.
+ # The request is only queued on each worker's own queue; this method
+ # does not wait for it. Presumably each worker then stops itself only
+ # if it is idle (see the worker's 'release' queue action).
+ self.log.debug("Releasing idle nodes")
+ for node in self.node_workers.values():
+ # Static nodes are never released.
+ if node.name in self.static_nodes:
+ continue
+ try:
+ if node.isAlive():
+ node.queue.put(dict(action='release'))
+ except Exception:
+ # Best effort: log the failure and continue with the other workers.
+ self.log.exception("Exception sending release command "
+ "to worker:")
+ self.log.debug("Finished releasing idle nodes")
+
def stop(self):
self.log.debug("Stopping")
# First, stop accepting new jobs
@@ -325,6 +338,8 @@
self.pause()
elif command == 'unpause':
self.unpause()
+ # Ask idle, non-static node workers to shut down.
+ elif command == 'release':
+ self.release()
except Exception:
self.log.exception("Exception while processing command")
@@ -456,6 +471,8 @@
self.termination_queue = termination_queue
self.keep_jobdir = keep_jobdir
self.running_job_lock = threading.Lock()
+ # Guards _got_job. _runGearman also holds it across the blocking
+ # getJob() call, so release() cannot observe a half-accepted job.
+ self._get_job_lock = threading.Lock()
+ # Set once getJob() returns a job and cleared after that gearman step
+ # finishes; release() treats the worker as busy while it is set.
+ self._got_job = False
self._job_complete_event = threading.Event()
self._running_job = False
self._sent_complete_event = False
@@ -518,6 +535,20 @@
def unpause(self):
self.unpaused.set()
+ def release(self):
+ # Stop this worker if it is idle; a busy worker is left as it was.
+ # Remember whether the worker was unpaused so that state can be
+ # restored if it turns out to be busy.
+ old_unpaused = self.unpaused.is_set()
+ if old_unpaused:
+ # Pause first; presumably this keeps a new job from being picked up
+ # while idleness is checked below.
+ # NOTE(review): _runGearman holds _get_job_lock across the blocking
+ # getJob(), so the lock below is acquired only after getJob()
+ # returns. This relies on pause() interrupting getJob() -- verify,
+ # otherwise this call blocks the _runQueue thread until a job arrives.
+ self.pause()
+ with self._get_job_lock:
+ if self._got_job:
+ self.log.debug("This worker is not idle")
+ if old_unpaused:
+ self.unpause()
+ return
+ self.log.debug("Stopping due to release command")
+ # Queue the stop on this worker's own queue; _runQueue processes it.
+ self.queue.put(dict(action='stop'))
+
def _runQueue(self):
item = self.queue.get()
try:
@@ -536,6 +567,9 @@
if item['action'] == 'unpause':
self.log.debug("Received unpause request")
self.unpause()
+ # Extend the existing dispatch chain with elif. A plain `if` here would
+ # start a new chain and re-parent the elif branches that follow.
+ # NOTE(review): the 'unpause' test above is itself a plain `if`, which
+ # also splits the chain; it likely wants `elif` as well -- confirm.
+ elif item['action'] == 'release':
+ self.log.debug("Received release request")
+ self.release()
elif item['action'] == 'reconfigure':
self.log.debug("Received reconfigure request")
self.register()
@@ -553,12 +587,16 @@
self._runGearman()
except Exception:
self.log.exception("Exception in gearman manager:")
+ # The gearman step above has finished (normally or with an exception);
+ # mark this worker as no longer holding a job.
+ with self._get_job_lock:
+ self._got_job = False
def _runGearman(self):
- try:
- job = self.worker.getJob()
- except gear.InterruptedError:
- return
+ # Hold the lock across getJob() so release() never sees the gap
+ # between a job being handed out and _got_job being set.
+ with self._get_job_lock:
+ try:
+ job = self.worker.getJob()
+ self._got_job = True
+ except gear.InterruptedError:
+ # Interrupted without a job; _got_job is not set.
+ return
self.log.debug("Node worker %s got job %s" % (self.name, job.name))
try:
if job.name not in self.registered_functions: