Merge "Fix branch matching logic" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index b3a4c3f..5e7e0e1 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -468,6 +468,23 @@
`bubblewrap` has become integral to securely operating Zuul. If you
have a valid use case for it, we encourage you to let us know.
+ .. attr:: load_multiplier
+ :default: 2.5
+
+ When an executor host gets too busy, the system may suffer
+ timeouts and other ill effects. The executor will stop accepting
+ more than 1 job at a time until load has lowered below a safe
+ level. This level is determined by multiplying the number of
+ CPUs by `load_multiplier`.
+
+ So for example, if the system has 2 CPUs, and load_multiplier
+ is 2.5, the safe load for the system is 5.00. Any time the
+ system load average is over 5.00, the executor will quit
+ accepting multiple jobs at one time.
+
+ The executor will observe system load and determine whether
+ to accept more jobs every 30 seconds.
+
.. attr:: merger
.. attr:: git_user_email
diff --git a/tests/base.py b/tests/base.py
index c240a1b..5841c08 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1518,6 +1518,7 @@
def __init__(self, use_ssl=False):
self.hold_jobs_in_queue = False
+ self.hold_merge_jobs_in_queue = False
if use_ssl:
ssl_ca = os.path.join(FIXTURE_DIR, 'gearman/root-ca.pem')
ssl_cert = os.path.join(FIXTURE_DIR, 'gearman/server.pem')
@@ -1537,6 +1538,8 @@
if not hasattr(job, 'waiting'):
if job.name.startswith(b'executor:execute'):
job.waiting = self.hold_jobs_in_queue
+ elif job.name.startswith(b'merger:'):
+ job.waiting = self.hold_merge_jobs_in_queue
else:
job.waiting = False
if job.waiting:
@@ -1562,10 +1565,15 @@
len(self.low_queue))
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
- if job.name != b'executor:execute':
- continue
- parameters = json.loads(job.arguments.decode('utf8'))
- if not regex or re.match(regex, parameters.get('job')):
+ match = False
+ if job.name == b'executor:execute':
+ parameters = json.loads(job.arguments.decode('utf8'))
+ if not regex or re.match(regex, parameters.get('job')):
+ match = True
+ if job.name == b'merger:merge':
+ if not regex:
+ match = True
+ if match:
self.log.debug("releasing queued job %s" %
job.unique)
job.waiting = False
@@ -2556,6 +2564,26 @@
return False
return True
+ def areAllMergeJobsWaiting(self):
+ for client_job in list(self.merge_client.jobs):
+ if not client_job.handle:
+ self.log.debug("%s has no handle" % client_job)
+ return False
+ server_job = self.gearman_server.jobs.get(client_job.handle)
+ if not server_job:
+ self.log.debug("%s is not known to the gearman server" %
+ client_job)
+ return False
+ if not hasattr(server_job, 'waiting'):
+ self.log.debug("%s is being enqueued" % server_job)
+ return False
+ if server_job.waiting:
+ self.log.debug("%s is waiting" % server_job)
+ continue
+ self.log.debug("%s is not waiting" % server_job)
+ return False
+ return True
+
def eventQueuesEmpty(self):
for event_queue in self.event_queues:
yield event_queue.empty()
@@ -2592,7 +2620,7 @@
# processed
self.eventQueuesJoin()
self.sched.run_handler_lock.acquire()
- if (not self.merge_client.jobs and
+ if (self.areAllMergeJobsWaiting() and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting() and
self.areAllNodeRequestsComplete() and
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index c457ff0..e368108 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -262,7 +262,7 @@
# Test master
change.branch = 'master'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -291,7 +291,7 @@
# Test diablo
change.branch = 'stable/diablo'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -321,7 +321,7 @@
# Test essex
change.branch = 'stable/essex'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -554,7 +554,7 @@
change = model.Change(self.project)
change.branch = 'master'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -568,7 +568,7 @@
change.branch = 'stable/diablo'
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@@ -625,7 +625,7 @@
change.branch = 'master'
change.files = ['/COMMIT_MSG', 'ignored-file']
item = queue.enqueueChange(change)
- item.current_build_set.layout = layout
+ item.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertFalse(python27.changeMatches(change))
@@ -700,7 +700,7 @@
# Test master
change.branch = 'master'
item = self.queue.enqueueChange(change)
- item.current_build_set.layout = self.layout
+ item.layout = self.layout
with testtools.ExpectedException(
Exception,
"Project project2 is not allowed to run job job"):
@@ -736,7 +736,7 @@
# Test master
change.branch = 'master'
item = self.queue.enqueueChange(change)
- item.current_build_set.layout = self.layout
+ item.layout = self.layout
with testtools.ExpectedException(
Exception,
"Pre-review pipeline gate does not allow post-review job"):
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index c15e62c..5226675 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -5589,7 +5589,7 @@
queue = queue_candidate
break
queue_item = queue.queue[0]
- item_dynamic_layout = queue_item.current_build_set.layout
+ item_dynamic_layout = queue_item.layout
dynamic_test_semaphore = \
item_dynamic_layout.semaphores.get('test-semaphore')
self.assertEqual(dynamic_test_semaphore.max, 1)
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 2ca69fc..d8e7054 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -238,7 +238,7 @@
required_projects = set()
def make_project_dict(project, override_branch=None):
- project_config = item.current_build_set.layout.project_configs.get(
+ project_config = item.layout.project_configs.get(
project.canonical_name, None)
if project_config:
project_default_branch = project_config.default_branch
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 1475440..e41d6b7 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -16,6 +16,7 @@
import datetime
import json
import logging
+import multiprocessing
import os
import shutil
import signal
@@ -563,6 +564,10 @@
self.merge_name = get_default(self.config, 'merger', 'git_user_name')
execution_wrapper_name = get_default(self.config, 'executor',
'execution_wrapper', 'bubblewrap')
+ load_multiplier = float(get_default(self.config, 'executor',
+ 'load_multiplier', '2.5'))
+ self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+ self.accepting_work = False
self.execution_wrapper = connections.drivers[execution_wrapper_name]
self.connections = connections
@@ -652,19 +657,32 @@
self.executor_thread = threading.Thread(target=self.run_executor)
self.executor_thread.daemon = True
self.executor_thread.start()
+ self.governor_stop_event = threading.Event()
+ self.governor_thread = threading.Thread(target=self.run_governor)
+ self.governor_thread.daemon = True
+ self.governor_thread.start()
self.disk_accountant.start()
def register(self):
- self.executor_worker.registerFunction("executor:execute")
+ self.register_work()
self.executor_worker.registerFunction("executor:stop:%s" %
self.hostname)
self.merger_worker.registerFunction("merger:merge")
self.merger_worker.registerFunction("merger:cat")
self.merger_worker.registerFunction("merger:refstate")
+ def register_work(self):
+ self.accepting_work = True
+ self.executor_worker.registerFunction("executor:execute")
+
+ def unregister_work(self):
+ self.accepting_work = False
+ self.executor_worker.unregisterFunction("executor:execute")
+
def stop(self):
self.log.debug("Stopping")
self.disk_accountant.stop()
+ self.governor_stop_event.set()
self._running = False
self._command_running = False
self.command_socket.stop()
@@ -708,6 +726,7 @@
self.update_thread.join()
self.merger_thread.join()
self.executor_thread.join()
+ self.governor_thread.join()
def runCommand(self):
while self._command_running:
@@ -796,10 +815,31 @@
except Exception:
self.log.exception("Exception while getting job")
+ def run_governor(self):
+ while not self.governor_stop_event.wait(30):
+ self.manageLoad()
+
def executeJob(self, job):
self.job_workers[job.unique] = AnsibleJob(self, job)
self.job_workers[job.unique].run()
+ def manageLoad(self):
+ ''' Apply some heuristics to decide whether or not we should
+ be asking for more jobs '''
+ load_avg = os.getloadavg()[0]
+ if self.accepting_work:
+ # Don't unregister if we don't have any active jobs.
+ if load_avg > self.max_load_avg and self.job_workers:
+ self.log.info(
+ "Unregistering due to high system load {} > {}".format(
+ load_avg, self.max_load_avg))
+ self.unregister_work()
+ elif load_avg <= self.max_load_avg:
+ self.log.info(
+ "Re-registering as load is within limits {} <= {}".format(
+ load_avg, self.max_load_avg))
+ self.register_work()
+
def finishJob(self, unique):
del(self.job_workers[unique])
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 74039c3..3c0c11f 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -236,11 +236,11 @@
# in-repo files stored in the buildset.
# 3) None in the case that a fetch of the files from
# the merger is still pending.
- item.current_build_set.layout = self.getLayout(item)
+ item.layout = self.getLayout(item)
# Rebuild the frozen job tree from the new layout, if
# we have one. If not, it will be built later.
- if item.current_build_set.layout:
+ if item.layout:
item.freezeJobGraph()
# Re-set build results in case any new jobs have been
@@ -373,7 +373,7 @@
def executeJobs(self, item):
# TODO(jeblair): This should return a value indicating a job
# was executed. Appears to be a longstanding bug.
- if not item.current_build_set.layout:
+ if not item.layout:
return False
jobs = item.findJobsToRun(
@@ -465,7 +465,7 @@
def getLayout(self, item):
if not item.change.updatesConfig():
if item.item_ahead:
- return item.item_ahead.current_build_set.layout
+ return item.item_ahead.layout
else:
return item.queue.pipeline.layout
# This item updates the config, ask the merger for the result.
@@ -516,10 +516,9 @@
def prepareJobs(self, item):
# This only runs once the item is in the pipeline's action window
# Returns True if the item is ready, false otherwise
- build_set = item.current_build_set
- if not build_set.layout:
- build_set.layout = self.getLayout(item)
- if not build_set.layout:
+ if not item.layout:
+ item.layout = self.getLayout(item)
+ if not item.layout:
return False
if not item.job_graph:
@@ -745,8 +744,7 @@
# pipeline, use the dynamic layout if available, otherwise,
# fall back to the current static layout as a best
# approximation.
- layout = (item.current_build_set.layout or
- self.pipeline.layout)
+ layout = (item.layout or self.pipeline.layout)
project_in_pipeline = True
if not layout.getProjectPipelineConfig(item.change.project,
diff --git a/zuul/model.py b/zuul/model.py
index 4d40b6c..c5d0a4d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1271,7 +1271,6 @@
self.node_requests = {} # job -> reqs
self.files = RepoFiles()
self.repo_state = {}
- self.layout = None
self.tries = {}
@property
@@ -1366,7 +1365,7 @@
item = self.item
layout = None
while item:
- layout = item.current_build_set.layout
+ layout = item.layout
if layout:
break
item = item.item_ahead
@@ -1410,7 +1409,7 @@
self.quiet = False
self.active = False # Whether an item is within an active window
self.live = True # Whether an item is intended to be processed at all
- # TODO(jeblair): move job_graph to buildset
+ self.layout = None
self.job_graph = None
def __repr__(self):
@@ -1428,6 +1427,7 @@
old.next_build_set = self.current_build_set
self.current_build_set.previous_build_set = old
self.build_sets.append(self.current_build_set)
+ self.layout = None
self.job_graph = None
def addBuild(self, build):
@@ -1443,8 +1443,7 @@
def freezeJobGraph(self):
"""Find or create actual matching jobs for this item's change and
store the resulting job tree."""
- layout = self.current_build_set.layout
- job_graph = layout.createJobGraph(self)
+ job_graph = self.layout.createJobGraph(self)
for job in job_graph.getJobs():
# Ensure that each jobs's dependencies are fully
# accessible. This will raise an exception if not.
@@ -2527,14 +2526,14 @@
@staticmethod
def _max_count(item, semaphore_name):
- if not item.current_build_set.layout:
+ if not item.layout:
# This should not occur as the layout of the item must already be
# built when acquiring or releasing a semaphore for a job.
raise Exception("Item {} has no layout".format(item))
# find the right semaphore
default_semaphore = Semaphore(semaphore_name, 1)
- semaphores = item.current_build_set.layout.semaphores
+ semaphores = item.layout.semaphores
return semaphores.get(semaphore_name, default_semaphore).max
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 5432661..c321bc0 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -71,10 +71,13 @@
the path specified in the configuration.
:arg Tenant tenant: the tenant to reconfigure
+ :arg Project project: if supplied, clear the cached configuration
+ from this project first
"""
- def __init__(self, tenant):
+ def __init__(self, tenant, project):
super(TenantReconfigureEvent, self).__init__()
self.tenant = tenant
+ self.project = project
class PromoteEvent(ManagementEvent):
@@ -331,14 +334,15 @@
self.result_event_queue.put(event)
self.wake_event.set()
- def reconfigureTenant(self, tenant):
- self.log.debug("Prepare to reconfigure")
- event = TenantReconfigureEvent(tenant)
+ def reconfigureTenant(self, tenant, project):
+ self.log.debug("Submitting tenant reconfiguration event for "
+ "%s due to project %s", tenant.name, project)
+ event = TenantReconfigureEvent(tenant, project)
self.management_event_queue.put(event)
self.wake_event.set()
def reconfigure(self, config):
- self.log.debug("Prepare to reconfigure")
+ self.log.debug("Submitting reconfiguration event")
event = ReconfigureEvent(config)
self.management_event_queue.put(event)
self.wake_event.set()
@@ -457,7 +461,7 @@
self.layout_lock.acquire()
self.config = event.config
try:
- self.log.debug("Performing reconfiguration")
+ self.log.debug("Full reconfiguration beginning")
loader = configloader.ConfigLoader()
abide = loader.loadConfig(
self.config.get('scheduler', 'tenant_config'),
@@ -468,13 +472,18 @@
self.abide = abide
finally:
self.layout_lock.release()
+ self.log.debug("Full reconfiguration complete")
def _doTenantReconfigureEvent(self, event):
# This is called in the scheduler loop after another thread submits
# a request
self.layout_lock.acquire()
try:
- self.log.debug("Performing tenant reconfiguration")
+ self.log.debug("Tenant reconfiguration beginning")
+ # If a change landed to a project, clear out the cached
+ # config before reconfiguring.
+ if event.project:
+ event.project.unparsed_config = None
loader = configloader.ConfigLoader()
abide = loader.reloadTenant(
self.config.get('scheduler', 'tenant_config'),
@@ -486,6 +495,7 @@
self.abide = abide
finally:
self.layout_lock.release()
+ self.log.debug("Tenant reconfiguration complete")
def _reenqueueGetProject(self, tenant, item):
project = item.change.project
@@ -769,8 +779,7 @@
# or a branch was just created or deleted. Clear
# out cached data for this project and perform a
# reconfiguration.
- change.project.unparsed_config = None
- self.reconfigureTenant(tenant)
+ self.reconfigureTenant(tenant, change.project)
for pipeline in tenant.layout.pipelines.values():
if event.isPatchsetCreated():
pipeline.manager.removeOldVersionsOfChange(change)