Ansible launcher: Use threads for workers

The SyncManager from the multiprocessing module seems to be exiting
under high load, and not leaving any clues as to why.  We can
probably handle the scale we anticipate using threads for the
moment at least, so switch to that.

Change-Id: If235cf802bb50874ecbe8bc234f67bc66a36ee22
diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py
index 792d1b9..5ecc954 100644
--- a/zuul/launcher/ansiblelaunchserver.py
+++ b/zuul/launcher/ansiblelaunchserver.py
@@ -14,7 +14,6 @@
 
 import json
 import logging
-import multiprocessing
 import os
 import re
 import shutil
@@ -23,8 +22,8 @@
 import subprocess
 import tempfile
 import threading
-import time
 import traceback
+import Queue
 import uuid
 
 import gear
@@ -79,14 +78,10 @@
         self.keep_jobdir = keep_jobdir
         self.hostname = socket.gethostname()
         self.node_workers = {}
-        # This has the side effect of creating the logger; our logging
-        # config will handle the rest.
-        multiprocessing.get_logger()
-        self.mpmanager = multiprocessing.Manager()
-        self.jobs = self.mpmanager.dict()
-        self.builds = self.mpmanager.dict()
-        self.zmq_send_queue = multiprocessing.JoinableQueue()
-        self.termination_queue = multiprocessing.JoinableQueue()
+        self.jobs = {}
+        self.builds = {}
+        self.zmq_send_queue = Queue.Queue()
+        self.termination_queue = Queue.Queue()
         self.sites = {}
         self.static_nodes = {}
         if config.has_option('launcher', 'accept-nodes'):
@@ -188,11 +183,6 @@
         self.gearman_thread.daemon = True
         self.gearman_thread.start()
 
-        # FIXME: Without this, sometimes the subprocess module does
-        # not actually launch any subprocesses.  I have no
-        # explanation.  -corvus
-        time.sleep(1)
-
         # Start static workers
         for node in self.static_nodes.values():
             self.log.debug("Creating static node with arguments: %s" % (node,))
@@ -320,8 +310,8 @@
                             self.termination_queue, self.keep_jobdir)
         self.node_workers[worker.name] = worker
 
-        worker.process = multiprocessing.Process(target=worker.run)
-        worker.process.start()
+        worker.thread = threading.Thread(target=worker.run)
+        worker.thread.start()
 
     def stopJob(self, job):
         try:
@@ -358,7 +348,7 @@
                     continue
                 worker = self.node_workers[item]
                 self.log.debug("Joining %s" % (item,))
-                worker.process.join()
+                worker.thread.join()
                 self.log.debug("Joined %s" % (item,))
                 del self.node_workers[item]
             except Exception:
@@ -384,10 +374,10 @@
         if not isinstance(labels, list):
             labels = [labels]
         self.labels = labels
-        self.process = None
+        self.thread = None
         self.registered_functions = set()
         self._running = True
-        self.queue = multiprocessing.JoinableQueue()
+        self.queue = Queue.Queue()
         self.manager_name = manager_name
         self.zmq_send_queue = zmq_send_queue
         self.termination_queue = termination_queue
@@ -408,12 +398,11 @@
 
     def isAlive(self):
         # Meant to be called from the manager
-        if self.process and self.process.is_alive():
+        if self.thread and self.thread.is_alive():
             return True
         return False
 
     def run(self):
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
         self.log.debug("Node worker %s starting" % (self.name,))
         server = self.config.get('gearman', 'server')
         if self.config.has_option('gearman', 'port'):