Merge "Limit concurrency in zuul-executor under load" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index b3a4c3f..5e7e0e1 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -468,6 +468,23 @@
       `bubblewrap` has become integral to securely operating Zuul.  If you
       have a valid use case for it, we encourage you to let us know.
 
+   .. attr:: load_multiplier
+      :default: 2.5
+
+      When an executor host gets too busy, the system may suffer
+      timeouts and other ill effects. The executor will stop accepting
+      more than 1 job at a time until load has lowered below a safe
+      level.  This level is determined by multiplying the number of
+      CPU's by `load_multiplier`.
+
+      So for example, if the system has 2 CPUs, and load_multiplier
+      is 2.5, the safe load for the system is 5.00. Any time the
+      system load average is over 5.00, the executor will quit
+      accepting multiple jobs at one time.
+
+      The executor will observe system load and determine whether
+      to accept more jobs every 30 seconds.
+
 .. attr:: merger
 
    .. attr:: git_user_email
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 1475440..e41d6b7 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -16,6 +16,7 @@
 import datetime
 import json
 import logging
+import multiprocessing
 import os
 import shutil
 import signal
@@ -563,6 +564,10 @@
         self.merge_name = get_default(self.config, 'merger', 'git_user_name')
         execution_wrapper_name = get_default(self.config, 'executor',
                                              'execution_wrapper', 'bubblewrap')
+        load_multiplier = float(get_default(self.config, 'executor',
+                                            'load_multiplier', '2.5'))
+        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.accepting_work = False
         self.execution_wrapper = connections.drivers[execution_wrapper_name]
 
         self.connections = connections
@@ -652,19 +657,32 @@
         self.executor_thread = threading.Thread(target=self.run_executor)
         self.executor_thread.daemon = True
         self.executor_thread.start()
+        self.governor_stop_event = threading.Event()
+        self.governor_thread = threading.Thread(target=self.run_governor)
+        self.governor_thread.daemon = True
+        self.governor_thread.start()
         self.disk_accountant.start()
 
     def register(self):
-        self.executor_worker.registerFunction("executor:execute")
+        self.register_work()
         self.executor_worker.registerFunction("executor:stop:%s" %
                                               self.hostname)
         self.merger_worker.registerFunction("merger:merge")
         self.merger_worker.registerFunction("merger:cat")
         self.merger_worker.registerFunction("merger:refstate")
 
+    def register_work(self):
+        self.accepting_work = True
+        self.executor_worker.registerFunction("executor:execute")
+
+    def unregister_work(self):
+        self.accepting_work = False
+        self.executor_worker.unregisterFunction("executor:execute")
+
     def stop(self):
         self.log.debug("Stopping")
         self.disk_accountant.stop()
+        self.governor_stop_event.set()
         self._running = False
         self._command_running = False
         self.command_socket.stop()
@@ -708,6 +726,7 @@
         self.update_thread.join()
         self.merger_thread.join()
         self.executor_thread.join()
+        self.governor_thread.join()
 
     def runCommand(self):
         while self._command_running:
@@ -796,10 +815,31 @@
             except Exception:
                 self.log.exception("Exception while getting job")
 
+    def run_governor(self):
+        while not self.governor_stop_event.wait(30):
+            self.manageLoad()
+
     def executeJob(self, job):
         self.job_workers[job.unique] = AnsibleJob(self, job)
         self.job_workers[job.unique].run()
 
+    def manageLoad(self):
+        ''' Apply some heuristics to decide whether or not we should
+            be askign for more jobs '''
+        load_avg = os.getloadavg()[0]
+        if self.accepting_work:
+            # Don't unregister if we don't have any active jobs.
+            if load_avg > self.max_load_avg and self.job_workers:
+                self.log.info(
+                    "Unregistering due to high system load {} > {}".format(
+                        load_avg, self.max_load_avg))
+                self.unregister_work()
+        elif load_avg <= self.max_load_avg:
+            self.log.info(
+                "Re-registering as load is within limits {} <= {}".format(
+                    load_avg, self.max_load_avg))
+            self.register_work()
+
     def finishJob(self, unique):
         del(self.job_workers[unique])