Merge "Limit concurrency in zuul-executor under load" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index b3a4c3f..5e7e0e1 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -468,6 +468,23 @@
`bubblewrap` has become integral to securely operating Zuul. If you
have a valid use case for it, we encourage you to let us know.
+ .. attr:: load_multiplier
+ :default: 2.5
+
+ When an executor host gets too busy, the system may suffer
+ timeouts and other ill effects. The executor will stop accepting
+ more than 1 job at a time until load has lowered below a safe
+ level. This level is determined by multiplying the number of
+ CPUs by `load_multiplier`.
+
+ So for example, if the system has 2 CPUs, and load_multiplier
+ is 2.5, the safe load for the system is 5.00. Any time the
+ system load average is over 5.00, the executor will quit
+ accepting multiple jobs at one time.
+
+ The executor will observe system load and determine whether
+ to accept more jobs every 30 seconds.
+
.. attr:: merger
.. attr:: git_user_email
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 1475440..e41d6b7 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -16,6 +16,7 @@
import datetime
import json
import logging
+import multiprocessing
import os
import shutil
import signal
@@ -563,6 +564,10 @@
self.merge_name = get_default(self.config, 'merger', 'git_user_name')
execution_wrapper_name = get_default(self.config, 'executor',
'execution_wrapper', 'bubblewrap')
+ load_multiplier = float(get_default(self.config, 'executor',
+ 'load_multiplier', '2.5'))
+ self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+ self.accepting_work = False
self.execution_wrapper = connections.drivers[execution_wrapper_name]
self.connections = connections
@@ -652,19 +657,32 @@
self.executor_thread = threading.Thread(target=self.run_executor)
self.executor_thread.daemon = True
self.executor_thread.start()
+ self.governor_stop_event = threading.Event()
+ self.governor_thread = threading.Thread(target=self.run_governor)
+ self.governor_thread.daemon = True
+ self.governor_thread.start()
self.disk_accountant.start()
def register(self):
- self.executor_worker.registerFunction("executor:execute")
+ self.register_work()
self.executor_worker.registerFunction("executor:stop:%s" %
self.hostname)
self.merger_worker.registerFunction("merger:merge")
self.merger_worker.registerFunction("merger:cat")
self.merger_worker.registerFunction("merger:refstate")
+ def register_work(self):
+ self.accepting_work = True
+ self.executor_worker.registerFunction("executor:execute")
+
+ def unregister_work(self):
+ self.accepting_work = False
+ self.executor_worker.unregisterFunction("executor:execute")
+
def stop(self):
self.log.debug("Stopping")
self.disk_accountant.stop()
+ self.governor_stop_event.set()
self._running = False
self._command_running = False
self.command_socket.stop()
@@ -708,6 +726,7 @@
self.update_thread.join()
self.merger_thread.join()
self.executor_thread.join()
+ self.governor_thread.join()
def runCommand(self):
while self._command_running:
@@ -796,10 +815,31 @@
except Exception:
self.log.exception("Exception while getting job")
+ def run_governor(self):
+ while not self.governor_stop_event.wait(30):
+ self.manageLoad()
+
def executeJob(self, job):
self.job_workers[job.unique] = AnsibleJob(self, job)
self.job_workers[job.unique].run()
+ def manageLoad(self):
+ ''' Apply some heuristics to decide whether or not we should
+ be asking for more jobs '''
+ load_avg = os.getloadavg()[0]
+ if self.accepting_work:
+ # Don't unregister if we don't have any active jobs.
+ if load_avg > self.max_load_avg and self.job_workers:
+ self.log.info(
+ "Unregistering due to high system load {} > {}".format(
+ load_avg, self.max_load_avg))
+ self.unregister_work()
+ elif load_avg <= self.max_load_avg:
+ self.log.info(
+ "Re-registering as load is within limits {} <= {}".format(
+ load_avg, self.max_load_avg))
+ self.register_work()
+
def finishJob(self, unique):
del(self.job_workers[unique])