Ansible launcher: send ZMQ start/complete events
We have a lot of infrastructure that expects ZMQ events for job
starts and completions. Send them.
Note that when the Ansible launcher shuts down, it sends a fake
completion event for each node it holds so that nodepool will
delete those nodes.
Change-Id: Ib38c5c77452743a49f6d1cda69c914b5d491134b
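
For reference, a consumer on the other end of this socket might look
like the following minimal sketch ("launcher-host" stands in for the
launcher's address; the topic strings match sendStartEvent and
sendCompleteEvent below):

    import json
    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://launcher-host:8881")
    socket.setsockopt(zmq.SUBSCRIBE, b'')  # subscribe to all topics

    while True:
        # Messages look like: onFinalized {"name": ..., "build": {...}}
        topic, data = socket.recv_string().split(' ', 1)
        event = json.loads(data)
        print("%s: job=%s status=%s" % (
            topic, event['name'], event['build'].get('status')))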
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 19e8d3f..36986ec 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -17,6 +17,7 @@
import multiprocessing
import os
import shutil
+import signal
import socket
import subprocess
import tempfile
@@ -26,6 +27,7 @@
import gear
import yaml
import jenkins_jobs.builder
+import zmq
class JobDir(object):
@@ -55,9 +57,17 @@
self.node_workers = {}
self.mpmanager = multiprocessing.Manager()
self.jobs = self.mpmanager.dict()
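+ # Node workers run in separate processes; events they put on this
+ # queue are published by the ZMQ thread.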
+ self.zmq_send_queue = multiprocessing.Queue()
def start(self):
self._running = True
+
+ # Setup ZMQ
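+ # Consumers such as nodepool subscribe to this PUB socket for job
+ # start/completion events.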
+ self.zcontext = zmq.Context()
+ self.zsocket = self.zcontext.socket(zmq.PUB)
+ self.zsocket.bind("tcp://*:8881")
+
+ # Setup Gearman
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
@@ -69,11 +79,21 @@
self.worker.waitForServer()
self.log.debug("Registering")
self.register()
+
+ # Load JJB config
self.loadJobs()
+
+ # Start ZMQ worker thread
+ self.log.debug("Starting ZMQ processor")
+ self.zmq_thread = threading.Thread(target=self.run_zmq)
+ self.zmq_thread.daemon = True
+ self.zmq_thread.start()
+
+ # Start Gearman worker thread
self.log.debug("Starting worker")
- self.thread = threading.Thread(target=self.run)
- self.thread.daemon = True
- self.thread.start()
+ self.gearman_thread = threading.Thread(target=self.run)
+ self.gearman_thread.daemon = True
+ self.gearman_thread.start()
def loadJobs(self):
self.log.debug("Loading jobs")
@@ -103,14 +123,24 @@
self._running = False
self.worker.shutdown()
for node in self.node_workers.values():
- node.queue.put(dict(action='stop'))
+ node.stop()
self.log.debug("Stopped")
def join(self):
- self.thread.join()
+ self.gearman_thread.join()
+
+ def run_zmq(self):
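+ # Drain the event queue and publish each item on the PUB socket.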
+ while self._running:
+ try:
+ item = self.zmq_send_queue.get()
+ if item is None:
+ continue
+ self.log.debug("Got ZMQ event %s" % (item,))
+ self.zsocket.send(item)
+ except Exception:
+ self.log.exception("Exception while processing ZMQ events")
def run(self):
- self.log.debug("Starting launch listener")
while self._running:
try:
job = self.worker.getJob()
@@ -124,6 +154,8 @@
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(traceback.format_exc())
+ except gear.InterruptedError:
+ return
except Exception:
self.log.exception("Exception while getting job")
@@ -131,7 +163,8 @@
args = json.loads(job.arguments)
worker = NodeWorker(self.config, self.jobs,
args['name'], args['host'],
- args['description'], args['labels'])
+ args['description'], args['labels'],
+ self.hostname, self.zmq_send_queue)
self.node_workers[worker.name] = worker
worker.process = multiprocessing.Process(target=worker.run)
@@ -145,7 +178,9 @@
class NodeWorker(object):
log = logging.getLogger("zuul.NodeWorker")
- def __init__(self, config, jobs, name, host, description, labels):
+ def __init__(self, config, jobs, name, host, description, labels,
+ manager_name, zmq_send_queue):
+ self.log.debug("Creating node worker %s" % (name,))
self.config = config
self.jobs = jobs
self.name = name
@@ -157,9 +192,14 @@
self.registered_functions = set()
self._running = True
self.queue = multiprocessing.Queue()
+ self.manager_name = manager_name
+ self.zmq_send_queue = zmq_send_queue
+ self.running_job_lock = threading.Lock()
+ self._running_job = False
def run(self):
- self._running_job = False
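+ # This runs in a child process; ignore SIGINT and let the
+ # manager drive shutdown through the queue.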
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ self.log.debug("Node worker %s starting" % (self.name,))
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
@@ -181,11 +221,22 @@
except Exception:
self.log.exception("Exception in queue manager:")
+ def stop(self):
+ # If this is called locally, setting _running here is
+ # effective; if it's called remotely it is not, but the
+ # queue thread will set it when it handles the stop request.
+ self.log.debug("Submitting stop request")
+ self._running = False
+ self.queue.put(dict(action='stop'))
+
def _run_queue(self):
item = self.queue.get()
if item['action'] == 'stop':
+ self.log.debug("Received stop request")
self._running = False
self.worker.shutdown()
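+ # An aborted job sends a real completion event from launch();
+ # otherwise send a fake one so nodepool deletes the node.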
+ if not self.abortRunningJob():
+ self.sendFakeCompleteEvent()
elif item['action'] == 'reconfigure':
self.register()
@@ -197,7 +248,11 @@
self.log.exception("Exception in gearman manager:")
def _run_gearman(self):
- job = self.worker.getJob()
+ try:
+ job = self.worker.getJob()
+ except gear.InterruptedError:
+ return
+ self.log.debug("Node worker %s got job %s" % (self.name, job.name))
try:
if job.name not in self.registered_functions:
self.log.error("Unable to handle job %s" % job.name)
@@ -235,14 +290,102 @@
self.worker.unRegisterFunction(function)
self.registered_functions = new_functions
- def launch(self, job):
- self._running_job = True
- thread = threading.Thread(target=self._launch, args=(job,))
- thread.start()
+ def abortRunningJob(self):
+ aborted = False
+ self.log.debug("Abort: acquiring job lock")
+ with self.running_job_lock:
+ if self._running_job:
+ self.log.debug("Abort: a job is running")
+ proc = self.ansible_proc
+ if proc:
+ self.log.debug("Abort: sending kill signal to job process")
+ try:
+ proc.kill()
+ aborted = True
+ except Exception:
+ self.log.exception("Exception while killing "
+ "ansible process:")
+ else:
+ self.log.debug("Abort: no job is running")
- def _launch(self, job):
+ return aborted
+
+ def launch(self, job):
+ self.log.debug("Node worker %s launching job %s" %
+ (self.name, job.name))
+
+ # Make sure we can parse what we need from the job first
+ args = json.loads(job.arguments)
+ # This may be configurable later, or we may choose to honor
+ # OFFLINE_NODE_WHEN_COMPLETE
+ offline = True
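+ # Gearman function names look like 'build:<job>'; extract the
+ # job name.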
+ job_name = job.name.split(':')[1]
+
+ # Initialize the result so we have something regardless of
+ # whether the job actually runs
+ result = None
+
+ try:
+ self.sendStartEvent(job_name, args)
+ except Exception:
+ self.log.exception("Exception while sending job start event")
+
+ try:
+ result = self.runJob(job)
+ except Exception:
+ self.log.exception("Exception while launching job thread")
+
+ try:
+ job.sendWorkComplete()
+ except Exception:
+ self.log.exception("Exception while sending job completion packet")
+
+ try:
+ self.sendCompleteEvent(job_name, result, args)
+ except Exception:
+ self.log.exception("Exception while sending job completion event")
+
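+ # Single-use node: stop this worker once the job finishes so
+ # nodepool deletes the node.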
+ if offline:
+ self.stop()
+
+ def sendStartEvent(self, name, parameters):
+ build = dict(node_name=self.name,
+ host_name=self.manager_name,
+ parameters=parameters)
+
+ event = dict(name=name,
+ build=build)
+
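+ # Wire format: topic name, a space, then the JSON-encoded event.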
+ item = "onStarted %s" % json.dumps(event)
+ self.log.debug("Sending over ZMQ: %s" % (item,))
+ self.zmq_send_queue.put(item)
+
+ def sendCompleteEvent(self, name, status, parameters):
+ build = dict(status=status,
+ node_name=self.name,
+ host_name=self.manager_name,
+ parameters=parameters)
+
+ event = dict(name=name,
+ build=build)
+
+ item = "onFinalized %s" % json.dumps(event)
+ self.log.debug("Sending over ZMQ: %s" % (item,))
+ self.zmq_send_queue.put(item)
+
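+ # Sent on shutdown for nodes with no completing job so that
+ # consumers still see a completion event.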
+ def sendFakeCompleteEvent(self):
+ self.sendCompleteEvent('zuul:launcher-shutdown',
+ 'SUCCESS', {})
+
+ def runJob(self, job):
+ self.ansible_proc = None
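+ # Take the same lock as abortRunningJob() so stop/abort cannot
+ # race with job startup.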
+ with self.running_job_lock:
+ if not self._running:
+ return
+ self._running_job = True
+
self.log.debug("Job %s: beginning" % (job.unique,))
- return # TODO
+ return 'SUCCESS' # TODO
with JobDir() as jobdir:
self.log.debug("Job %s: job root at %s" %
(job.unique, jobdir.root))
@@ -284,14 +427,15 @@
config.write('hostfile = %s\n' % jobdir.inventory)
def runAnsible(self, jobdir):
- proc = subprocess.Popen(
+ self.ansible_proc = subprocess.Popen(
['ansible-playbook', jobdir.playbook],
cwd=jobdir.ansible_root,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
- (out, err) = proc.communicate()
- ret = proc.wait()
+ (out, err) = self.ansible_proc.communicate()
+ ret = self.ansible_proc.wait()
+ self.ansible_proc = None
print out
print err
if ret == 0: