Merge "Improve safety around canceling node requests" into feature/zuulv3
diff --git a/requirements.txt b/requirements.txt
index 2fe6963..9f20458 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,7 +11,7 @@
 extras
 statsd>=1.0.0,<3.0
 voluptuous>=0.10.2
-gear>=0.5.7,<1.0.0
+gear>=0.9.0,<1.0.0
 apscheduler>=3.0
 PrettyTable>=0.6,<0.8
 babel>=1.0
diff --git a/zuul/ansible/library/command.py b/zuul/ansible/library/command.py
index 328ae7b..52de5a4 100644
--- a/zuul/ansible/library/command.py
+++ b/zuul/ansible/library/command.py
@@ -123,6 +123,8 @@
 
 LOG_STREAM_FILE = '/tmp/console.log'
 PASSWD_ARG_RE = re.compile(r'^[-]{0,2}pass[-]?(word|wd)?')
+# List to save stdout log lines in as we collect them
+_log_lines = []
 
 
 class Console(object):
@@ -150,6 +152,7 @@
             line = fd.readline()
             if not line:
                 break
+            _log_lines.append(line)
             if not line.endswith('\n'):
                 line += '\n'
                 newline_warning = True
@@ -330,7 +333,8 @@
         # cmd.stdout.close()
 
         # ZUUL: stdout and stderr are in the console log file
-        stdout = ''
+        # ZUUL: return the saved log lines so we can ship them back
+        stdout = ''.join(_log_lines)
         stderr = ''
 
         rc = cmd.returncode
diff --git a/zuul/executor/ansiblelaunchserver.py b/zuul/executor/ansiblelaunchserver.py
index 0202bdd..18762b2 100644
--- a/zuul/executor/ansiblelaunchserver.py
+++ b/zuul/executor/ansiblelaunchserver.py
@@ -59,7 +59,7 @@
     return bool(x)
 
 
-class LaunchGearWorker(gear.Worker):
+class LaunchGearWorker(gear.TextWorker):
     def __init__(self, *args, **kw):
         self.__launch_server = kw.pop('launch_server')
         super(LaunchGearWorker, self).__init__(*args, **kw)
@@ -71,7 +71,7 @@
         return super(LaunchGearWorker, self).handleNoop(packet)
 
 
-class NodeGearWorker(gear.Worker):
+class NodeGearWorker(gear.TextWorker):
     MASS_DO = 101
 
     def sendMassDo(self, functions):
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 9f234e9..7a2e265 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -308,8 +308,8 @@
             self.sched.onBuildCompleted(build, 'SUCCESS')
             return build
 
-        gearman_job = gear.Job('executor:execute', json.dumps(params),
-                               unique=uuid)
+        gearman_job = gear.TextJob('executor:execute', json.dumps(params),
+                                   unique=uuid)
         build.__gearman_job = gearman_job
         build.__gearman_manager = None
         self.builds[uuid] = build
@@ -452,8 +452,8 @@
                            (build,))
         stop_uuid = str(uuid4().hex)
         data = dict(uuid=build.__gearman_job.unique)
-        stop_job = gear.Job("executor:stop:%s" % build.__gearman_manager,
-                            json.dumps(data), unique=stop_uuid)
+        stop_job = gear.TextJob("executor:stop:%s" % build.__gearman_manager,
+                                json.dumps(data), unique=stop_uuid)
         self.meta_jobs[stop_uuid] = stop_job
         self.log.debug("Submitting stop job: %s", stop_job)
         self.gearman.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH,
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 71b643a..dfa5a2b 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -335,7 +335,7 @@
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker('Zuul Executor Server')
+        self.worker = gear.TextWorker('Zuul Executor Server')
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 642bc1b..069cbf5 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -56,7 +56,7 @@
         self.__merge_client.onBuildCompleted(job)
 
 
-class MergeJob(gear.Job):
+class MergeJob(gear.TextJob):
     def __init__(self, *args, **kw):
         super(MergeJob, self).__init__(*args, **kw)
         self.__event = threading.Event()
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 39551c9..04fd03b 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -54,7 +54,7 @@
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker('Zuul Merger')
+        self.worker = gear.TextWorker('Zuul Merger')
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 9d81520..d980992 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -35,9 +35,9 @@
 
     def submitJob(self, name, data):
         self.log.debug("Submitting job %s with data %s" % (name, data))
-        job = gear.Job(name,
-                       json.dumps(data),
-                       unique=str(time.time()))
+        job = gear.TextJob(name,
+                           json.dumps(data),
+                           unique=str(time.time()))
         self.gearman.submitJob(job, timeout=300)
 
         self.log.debug("Waiting for job completion")
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 105c34b..6508e84 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -38,7 +38,7 @@
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker('Zuul RPC Listener')
+        self.worker = gear.TextWorker('Zuul RPC Listener')
         self.worker.addServer(server, port)
         self.worker.waitForServer()
         self.register()