Ansible launcher: send ZMQ start/complete events

We have a lot of infrastructure that expects ZMQ events related to
job starts and stops.  Send them.
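
On the wire, each message is a topic keyword ("onStarted" or
"onFinalized") followed by a JSON body describing the build.  As a
sketch (the job, node, and host names below are made up):

  onStarted {"name": "example-job", "build":
      {"node_name": "node1", "host_name": "launcher01",
       "parameters": {}}}

  onFinalized {"name": "example-job", "build":
      {"status": "SUCCESS", "node_name": "node1",
       "host_name": "launcher01", "parameters": {}}}

A consumer only needs a SUB socket connected to the launcher's
publish port (8881), e.g.:

  import zmq

  context = zmq.Context()
  socket = context.socket(zmq.SUB)
  socket.connect("tcp://launcher01:8881")
  socket.setsockopt(zmq.SUBSCRIBE, '')  # subscribe to all topics
  while True:
      print socket.recv()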

Note that when the ansible launcher shuts down, it will send a fake
completion event for each node it holds so that nodepool will
delete those nodes.
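
Each fake event is an onFinalized message with the job name
zuul:launcher-shutdown and a status of SUCCESS (node and host names
below again made up):

  onFinalized {"name": "zuul:launcher-shutdown", "build":
      {"status": "SUCCESS", "node_name": "node1",
       "host_name": "launcher01", "parameters": {}}}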

Change-Id: Ib38c5c77452743a49f6d1cda69c914b5d491134b
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 19e8d3f..36986ec 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -17,6 +17,7 @@
 import multiprocessing
 import os
 import shutil
+import signal
 import socket
 import subprocess
 import tempfile
@@ -26,6 +27,7 @@
 import gear
 import yaml
 import jenkins_jobs.builder
+import zmq
 
 
 class JobDir(object):
@@ -55,9 +57,17 @@
         self.node_workers = {}
         self.mpmanager = multiprocessing.Manager()
         self.jobs = self.mpmanager.dict()
+        self.zmq_send_queue = multiprocessing.Queue()
 
     def start(self):
         self._running = True
+
+        # Setup ZMQ
+        self.zcontext = zmq.Context()
+        self.zsocket = self.zcontext.socket(zmq.PUB)
+        self.zsocket.bind("tcp://*:8881")
+
+        # Setup Gearman
         server = self.config.get('gearman', 'server')
         if self.config.has_option('gearman', 'port'):
             port = self.config.get('gearman', 'port')
@@ -69,11 +79,21 @@
         self.worker.waitForServer()
         self.log.debug("Registering")
         self.register()
+
+        # Load JJB config
         self.loadJobs()
+
+        # Start ZMQ worker thread
+        self.log.debug("Starting ZMQ processor")
+        self.zmq_thread = threading.Thread(target=self.run_zmq)
+        self.zmq_thread.daemon = True
+        self.zmq_thread.start()
+
+        # Start Gearman worker thread
         self.log.debug("Starting worker")
-        self.thread = threading.Thread(target=self.run)
-        self.thread.daemon = True
-        self.thread.start()
+        self.gearman_thread = threading.Thread(target=self.run)
+        self.gearman_thread.daemon = True
+        self.gearman_thread.start()
 
     def loadJobs(self):
         self.log.debug("Loading jobs")
@@ -103,14 +123,24 @@
         self._running = False
         self.worker.shutdown()
         for node in self.node_workers.values():
-            node.queue.put(dict(action='stop'))
+            node.stop()
         self.log.debug("Stopped")
 
     def join(self):
-        self.thread.join()
+        self.gearman_thread.join()
+
+    def run_zmq(self):
+        while self._running:
+            try:
+                item = self.zmq_send_queue.get()
+                self.log.debug("Got ZMQ event %s" % (item,))
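+                # A None item carries no event; skip it.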
+                if item is None:
+                    continue
+                self.zsocket.send(item)
+            except Exception:
+                self.log.exception("Exception while processing ZMQ events")
 
     def run(self):
-        self.log.debug("Starting launch listener")
         while self._running:
             try:
                 job = self.worker.getJob()
@@ -124,6 +154,8 @@
                 except Exception:
                     self.log.exception("Exception while running job")
                     job.sendWorkException(traceback.format_exc())
+            except gear.InterruptedError:
+                return
             except Exception:
                 self.log.exception("Exception while getting job")
 
@@ -131,7 +163,8 @@
         args = json.loads(job.arguments)
         worker = NodeWorker(self.config, self.jobs,
                             args['name'], args['host'],
-                            args['description'], args['labels'])
+                            args['description'], args['labels'],
+                            self.hostname, self.zmq_send_queue)
         self.node_workers[worker.name] = worker
 
         worker.process = multiprocessing.Process(target=worker.run)
@@ -145,7 +178,9 @@
 class NodeWorker(object):
     log = logging.getLogger("zuul.NodeWorker")
 
-    def __init__(self, config, jobs, name, host, description, labels):
+    def __init__(self, config, jobs, name, host, description, labels,
+                 manager_name, zmq_send_queue):
+        self.log.debug("Creating node worker %s" % (name,))
         self.config = config
         self.jobs = jobs
         self.name = name
@@ -157,9 +192,14 @@
         self.registered_functions = set()
         self._running = True
         self.queue = multiprocessing.Queue()
+        self.manager_name = manager_name
+        self.zmq_send_queue = zmq_send_queue
+        self.running_job_lock = threading.Lock()
+        self._running_job = False
 
     def run(self):
-        self._running_job = False
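+        # Ignore SIGINT in this child process; the parent launcher
+        # drives shutdown via stop requests on the node queue.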
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        self.log.debug("Node worker %s starting" % (self.name,))
         server = self.config.get('gearman', 'server')
         if self.config.has_option('gearman', 'port'):
             port = self.config.get('gearman', 'port')
@@ -181,11 +221,22 @@
             except Exception:
                 self.log.exception("Exception in queue manager:")
 
+    def stop(self):
+        # If this is called locally, setting _running here takes
+        # effect; if it's called remotely it does not, but the queue
+        # thread will set it when it processes the stop request.
+        self.log.debug("Submitting stop request")
+        self._running = False
+        self.queue.put(dict(action='stop'))
+
     def _run_queue(self):
         item = self.queue.get()
         if item['action'] == 'stop':
+            self.log.debug("Received stop request")
             self._running = False
             self.worker.shutdown()
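+            # If a job was aborted, the launch() path sends its
+            # completion event; otherwise send a fake one so that
+            # nodepool will delete this node.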
+            if not self.abortRunningJob():
+                self.sendFakeCompleteEvent()
         elif item['action'] == 'reconfigure':
             self.register()
 
@@ -197,7 +248,11 @@
                 self.log.exception("Exception in gearman manager:")
 
     def _run_gearman(self):
-        job = self.worker.getJob()
+        try:
+            job = self.worker.getJob()
+        except gear.InterruptedError:
+            return
+        self.log.debug("Node worker %s got job %s" % (self.name, job.name))
         try:
             if job.name not in self.registered_functions:
                 self.log.error("Unable to handle job %s" % job.name)
@@ -235,14 +290,102 @@
             self.worker.unRegisterFunction(function)
         self.registered_functions = new_functions
 
-    def launch(self, job):
-        self._running_job = True
-        thread = threading.Thread(target=self._launch, args=(job,))
-        thread.start()
+    def abortRunningJob(self):
+        aborted = False
+        self.log.debug("Abort: acquiring job lock")
+        with self.running_job_lock:
+            if self._running_job:
+                self.log.debug("Abort: a job is running")
+                proc = self.ansible_proc
+                if proc:
+                    self.log.debug("Abort: sending kill signal to job process")
+                    try:
+                        proc.kill()
+                        aborted = True
+                    except Exception:
+                        self.log.exception("Exception while killing "
+                                           "ansible process:")
+            else:
+                self.log.debug("Abort: no job is running")
 
-    def _launch(self, job):
+        return aborted
+
+    def launch(self, job):
+        self.log.debug("Node worker %s launching job %s" %
+                       (self.name, job.name))
+
+        # Make sure we can parse what we need from the job first
+        args = json.loads(job.arguments)
+        # This may be configurable later, or we may choose to honor
+        # OFFLINE_NODE_WHEN_COMPLETE
+        offline = True
+        job_name = job.name.split(':')[1]
+
+        # Initialize the result so we have something regardless of
+        # whether the job actually runs
+        result = None
+
+        try:
+            self.sendStartEvent(job_name, args)
+        except Exception:
+            self.log.exception("Exception while sending job start event")
+
+        try:
+            result = self.runJob(job)
+        except Exception:
+            self.log.exception("Exception while launching job thread")
+
+        try:
+            job.sendWorkComplete()
+        except Exception:
+            self.log.exception("Exception while sending job completion packet")
+
+        try:
+            self.sendCompleteEvent(job_name, result, args)
+        except Exception:
+            self.log.exception("Exception while sending job completion event")
+
+        # The job is done; clear the running-job flag under the lock
+        # so a subsequent abort does not think a job is still running.
+        with self.running_job_lock:
+            self._running_job = False
+
+        if offline:
+            self.stop()
+
+    def sendStartEvent(self, name, parameters):
+        build = dict(node_name=self.name,
+                     host_name=self.manager_name,
+                     parameters=parameters)
+
+        event = dict(name=name,
+                     build=build)
+
+        item = "onStarted %s" % json.dumps(event)
+        self.log.debug("Sending over ZMQ: %s" % (item,))
+        self.zmq_send_queue.put(item)
+
+    def sendCompleteEvent(self, name, status, parameters):
+        build = dict(status=status,
+                     node_name=self.name,
+                     host_name=self.manager_name,
+                     parameters=parameters)
+
+        event = dict(name=name,
+                     build=build)
+
+        item = "onFinalized %s" % json.dumps(event)
+        self.log.debug("Sending over ZMQ: %s" % (item,))
+        self.zmq_send_queue.put(item)
+
+    def sendFakeCompleteEvent(self):
+        self.sendCompleteEvent('zuul:launcher-shutdown',
+                               'SUCCESS', {})
+
+    def runJob(self, job):
+        self.ansible_proc = None
+        with self.running_job_lock:
+            if not self._running:
+                return
+            self._running_job = True
+
         self.log.debug("Job %s: beginning" % (job.unique,))
-        return  # TODO
+        return 'SUCCESS'  # TODO
         with JobDir() as jobdir:
             self.log.debug("Job %s: job root at %s" %
                            (job.unique, jobdir.root))
@@ -284,14 +427,15 @@
             config.write('hostfile = %s\n' % jobdir.inventory)
 
     def runAnsible(self, jobdir):
-        proc = subprocess.Popen(
+        self.ansible_proc = subprocess.Popen(
             ['ansible-playbook', jobdir.playbook],
             cwd=jobdir.ansible_root,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
         )
-        (out, err) = proc.communicate()
-        ret = proc.wait()
+        (out, err) = self.ansible_proc.communicate()
+        ret = self.ansible_proc.wait()
+        self.ansible_proc = None
         print out
         print err
         if ret == 0: