Merge "Fix branch matching logic" into feature/zuulv3
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index b3a4c3f..5e7e0e1 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -468,6 +468,31 @@
       `bubblewrap` has become integral to securely operating Zuul.  If you
       have a valid use case for it, we encourage you to let us know.
 
+   .. attr:: load_multiplier
+      :default: 2.5
+
+      When an executor host gets too busy, the system may suffer
+      timeouts and other ill effects.  The executor will stop accepting
+      new jobs until the load drops below a safe level.  This level is
+      determined by multiplying the number of CPUs by
+      `load_multiplier`.
+
+      For example, if the system has 2 CPUs and `load_multiplier` is
+      2.5, the safe load for the system is 5.00.  Any time the
+      one-minute system load average exceeds 5.00, the executor will
+      stop accepting new jobs.
+
+      Every 30 seconds, the executor observes the system load and
+      decides whether to resume accepting jobs.
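+
+      An illustrative ``zuul.conf`` snippet lowering the threshold on
+      a host shared with other services:
+
+      .. code-block:: ini
+
+         [executor]
+         load_multiplier = 1.5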
+
 .. attr:: merger
 
    .. attr:: git_user_email
diff --git a/tests/base.py b/tests/base.py
index c240a1b..5841c08 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1518,6 +1518,7 @@
 
     def __init__(self, use_ssl=False):
         self.hold_jobs_in_queue = False
+        self.hold_merge_jobs_in_queue = False
         if use_ssl:
             ssl_ca = os.path.join(FIXTURE_DIR, 'gearman/root-ca.pem')
             ssl_cert = os.path.join(FIXTURE_DIR, 'gearman/server.pem')
@@ -1537,6 +1538,8 @@
                 if not hasattr(job, 'waiting'):
                     if job.name.startswith(b'executor:execute'):
                         job.waiting = self.hold_jobs_in_queue
+                    elif job.name.startswith(b'merger:'):
+                        job.waiting = self.hold_merge_jobs_in_queue
                     else:
                         job.waiting = False
                 if job.waiting:
@@ -1562,10 +1565,15 @@
                 len(self.low_queue))
         self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
         for job in self.getQueue():
-            if job.name != b'executor:execute':
-                continue
-            parameters = json.loads(job.arguments.decode('utf8'))
-            if not regex or re.match(regex, parameters.get('job')):
+            match = False
+            if job.name == b'executor:execute':
+                parameters = json.loads(job.arguments.decode('utf8'))
+                if not regex or re.match(regex, parameters.get('job')):
+                    match = True
+            elif job.name == b'merger:merge':
+                if not regex:
+                    match = True
+            if match:
                 self.log.debug("releasing queued job %s" %
                                job.unique)
                 job.waiting = False
@@ -2556,6 +2564,26 @@
             return False
         return True
 
+    def areAllMergeJobsWaiting(self):
+        for client_job in list(self.merge_client.jobs):
+            if not client_job.handle:
+                self.log.debug("%s has no handle" % client_job)
+                return False
+            server_job = self.gearman_server.jobs.get(client_job.handle)
+            if not server_job:
+                self.log.debug("%s is not known to the gearman server" %
+                               client_job)
+                return False
+            if not hasattr(server_job, 'waiting'):
+                self.log.debug("%s is being enqueued" % server_job)
+                return False
+            if server_job.waiting:
+                self.log.debug("%s is waiting" % server_job)
+                continue
+            self.log.debug("%s is not waiting" % server_job)
+            return False
+        return True
+
     def eventQueuesEmpty(self):
         for event_queue in self.event_queues:
             yield event_queue.empty()
@@ -2592,7 +2620,7 @@
                 # processed
                 self.eventQueuesJoin()
                 self.sched.run_handler_lock.acquire()
-                if (not self.merge_client.jobs and
+                if (self.areAllMergeJobsWaiting() and
                     self.haveAllBuildsReported() and
                     self.areAllBuildsWaiting() and
                     self.areAllNodeRequestsComplete() and
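
A sketch of how a test might drive the new hold flag, following the
ZuulTestCase conventions used elsewhere in tests/base.py (the fake
Gerrit helpers are assumed from existing tests):

    # Hypothetical test body; helper names assumed from existing tests.
    self.gearman_server.hold_merge_jobs_in_queue = True
    A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
    # areAllMergeJobsWaiting() lets this settle with merge jobs held.
    self.waitUntilSettled()

    # Clear the flag, then release the held merger:merge jobs
    # (release() with no regex matches them).
    self.gearman_server.hold_merge_jobs_in_queue = False
    self.gearman_server.release()
    self.waitUntilSettled()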
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index c457ff0..e368108 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -262,7 +262,7 @@
         # Test master
         change.branch = 'master'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -291,7 +291,7 @@
         # Test diablo
         change.branch = 'stable/diablo'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -321,7 +321,7 @@
         # Test essex
         change.branch = 'stable/essex'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -554,7 +554,7 @@
         change = model.Change(self.project)
         change.branch = 'master'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -568,7 +568,7 @@
 
         change.branch = 'stable/diablo'
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertTrue(python27.changeMatches(change))
@@ -625,7 +625,7 @@
         change.branch = 'master'
         change.files = ['/COMMIT_MSG', 'ignored-file']
         item = queue.enqueueChange(change)
-        item.current_build_set.layout = layout
+        item.layout = layout
 
         self.assertTrue(base.changeMatches(change))
         self.assertFalse(python27.changeMatches(change))
@@ -700,7 +700,7 @@
         # Test master
         change.branch = 'master'
         item = self.queue.enqueueChange(change)
-        item.current_build_set.layout = self.layout
+        item.layout = self.layout
         with testtools.ExpectedException(
                 Exception,
                 "Project project2 is not allowed to run job job"):
@@ -736,7 +736,7 @@
         # Test master
         change.branch = 'master'
         item = self.queue.enqueueChange(change)
-        item.current_build_set.layout = self.layout
+        item.layout = self.layout
         with testtools.ExpectedException(
                 Exception,
                 "Pre-review pipeline gate does not allow post-review job"):
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index c15e62c..5226675 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -5589,7 +5589,7 @@
                 queue = queue_candidate
                 break
         queue_item = queue.queue[0]
-        item_dynamic_layout = queue_item.current_build_set.layout
+        item_dynamic_layout = queue_item.layout
         dynamic_test_semaphore = \
             item_dynamic_layout.semaphores.get('test-semaphore')
         self.assertEqual(dynamic_test_semaphore.max, 1)
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 2ca69fc..d8e7054 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -238,7 +238,7 @@
         required_projects = set()
 
         def make_project_dict(project, override_branch=None):
-            project_config = item.current_build_set.layout.project_configs.get(
+            project_config = item.layout.project_configs.get(
                 project.canonical_name, None)
             if project_config:
                 project_default_branch = project_config.default_branch
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 1475440..e41d6b7 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -16,6 +16,7 @@
 import datetime
 import json
 import logging
+import multiprocessing
 import os
 import shutil
 import signal
@@ -563,6 +564,10 @@
         self.merge_name = get_default(self.config, 'merger', 'git_user_name')
         execution_wrapper_name = get_default(self.config, 'executor',
                                              'execution_wrapper', 'bubblewrap')
+        load_multiplier = float(get_default(self.config, 'executor',
+                                            'load_multiplier', '2.5'))
+        self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.accepting_work = False
         self.execution_wrapper = connections.drivers[execution_wrapper_name]
 
         self.connections = connections
@@ -652,19 +657,32 @@
         self.executor_thread = threading.Thread(target=self.run_executor)
         self.executor_thread.daemon = True
         self.executor_thread.start()
+        self.governor_stop_event = threading.Event()
+        self.governor_thread = threading.Thread(target=self.run_governor)
+        self.governor_thread.daemon = True
+        self.governor_thread.start()
         self.disk_accountant.start()
 
     def register(self):
-        self.executor_worker.registerFunction("executor:execute")
+        self.register_work()
         self.executor_worker.registerFunction("executor:stop:%s" %
                                               self.hostname)
         self.merger_worker.registerFunction("merger:merge")
         self.merger_worker.registerFunction("merger:cat")
         self.merger_worker.registerFunction("merger:refstate")
 
+    def register_work(self):
+        self.accepting_work = True
+        self.executor_worker.registerFunction("executor:execute")
+
+    def unregister_work(self):
+        self.accepting_work = False
+        self.executor_worker.unregisterFunction("executor:execute")
+
     def stop(self):
         self.log.debug("Stopping")
         self.disk_accountant.stop()
+        self.governor_stop_event.set()
         self._running = False
         self._command_running = False
         self.command_socket.stop()
@@ -708,6 +726,7 @@
         self.update_thread.join()
         self.merger_thread.join()
         self.executor_thread.join()
+        self.governor_thread.join()
 
     def runCommand(self):
         while self._command_running:
@@ -796,10 +815,31 @@
             except Exception:
                 self.log.exception("Exception while getting job")
 
+    def run_governor(self):
+        while not self.governor_stop_event.wait(30):
+            self.manageLoad()
+
     def executeJob(self, job):
         self.job_workers[job.unique] = AnsibleJob(self, job)
         self.job_workers[job.unique].run()
 
+    def manageLoad(self):
+        '''Apply some heuristics to decide whether or not we should
+        be asking for more jobs.'''
+        load_avg = os.getloadavg()[0]
+        if self.accepting_work:
+            # Don't unregister if we don't have any active jobs.
+            if load_avg > self.max_load_avg and self.job_workers:
+                self.log.info(
+                    "Unregistering due to high system load {} > {}".format(
+                        load_avg, self.max_load_avg))
+                self.unregister_work()
+        elif load_avg <= self.max_load_avg:
+            self.log.info(
+                "Re-registering as load is within limits {} <= {}".format(
+                    load_avg, self.max_load_avg))
+            self.register_work()
+
     def finishJob(self, unique):
         del(self.job_workers[unique])
 
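
A minimal standalone sketch of the governor pattern above, stdlib only;
the class name is invented for illustration, and the gearman
registerFunction/unregisterFunction calls are replaced with a flag:

    import multiprocessing
    import os
    import threading

    class LoadGovernor:
        '''Every `interval` seconds, compare the 1-minute load average
        against cpu_count * load_multiplier and toggle acceptance.'''

        def __init__(self, load_multiplier=2.5, interval=30):
            self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
            self.interval = interval
            self.accepting_work = True
            self._stop = threading.Event()

        def manage_load(self, active_jobs):
            load_avg = os.getloadavg()[0]
            if self.accepting_work:
                # Only back off if jobs are actually running; an idle
                # host has nothing to shed by unregistering.
                if load_avg > self.max_load_avg and active_jobs:
                    self.accepting_work = False  # real code: unregisterFunction
            elif load_avg <= self.max_load_avg:
                self.accepting_work = True  # real code: registerFunction

        def run(self, get_active_jobs):
            # Event.wait() returns False on timeout, True once stop() fires.
            while not self._stop.wait(self.interval):
                self.manage_load(get_active_jobs())

        def stop(self):
            self._stop.set()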
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 74039c3..3c0c11f 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -236,11 +236,11 @@
                 #    in-repo files stored in the buildset.
                 # 3) None in the case that a fetch of the files from
                 #    the merger is still pending.
-                item.current_build_set.layout = self.getLayout(item)
+                item.layout = self.getLayout(item)
 
                 # Rebuild the frozen job tree from the new layout, if
                 # we have one.  If not, it will be built later.
-                if item.current_build_set.layout:
+                if item.layout:
                     item.freezeJobGraph()
 
                 # Re-set build results in case any new jobs have been
@@ -373,7 +373,7 @@
     def executeJobs(self, item):
         # TODO(jeblair): This should return a value indicating a job
         # was executed.  Appears to be a longstanding bug.
-        if not item.current_build_set.layout:
+        if not item.layout:
             return False
 
         jobs = item.findJobsToRun(
@@ -465,7 +465,7 @@
     def getLayout(self, item):
         if not item.change.updatesConfig():
             if item.item_ahead:
-                return item.item_ahead.current_build_set.layout
+                return item.item_ahead.layout
             else:
                 return item.queue.pipeline.layout
         # This item updates the config, ask the merger for the result.
@@ -516,10 +516,9 @@
     def prepareJobs(self, item):
         # This only runs once the item is in the pipeline's action window
         # Returns True if the item is ready, false otherwise
-        build_set = item.current_build_set
-        if not build_set.layout:
-            build_set.layout = self.getLayout(item)
-        if not build_set.layout:
+        if not item.layout:
+            item.layout = self.getLayout(item)
+        if not item.layout:
             return False
 
         if not item.job_graph:
@@ -745,8 +744,7 @@
         # pipeline, use the dynamic layout if available, otherwise,
         # fall back to the current static layout as a best
         # approximation.
-        layout = (item.current_build_set.layout or
-                  self.pipeline.layout)
+        layout = (item.layout or self.pipeline.layout)
 
         project_in_pipeline = True
         if not layout.getProjectPipelineConfig(item.change.project,
diff --git a/zuul/model.py b/zuul/model.py
index 4d40b6c..c5d0a4d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1271,7 +1271,6 @@
         self.node_requests = {}  # job -> reqs
         self.files = RepoFiles()
         self.repo_state = {}
-        self.layout = None
         self.tries = {}
 
     @property
@@ -1366,7 +1365,7 @@
         item = self.item
         layout = None
         while item:
-            layout = item.current_build_set.layout
+            layout = item.layout
             if layout:
                 break
             item = item.item_ahead
@@ -1410,7 +1409,7 @@
         self.quiet = False
         self.active = False  # Whether an item is within an active window
         self.live = True  # Whether an item is intended to be processed at all
-        # TODO(jeblair): move job_graph to buildset
+        self.layout = None
         self.job_graph = None
 
     def __repr__(self):
@@ -1428,6 +1427,7 @@
         old.next_build_set = self.current_build_set
         self.current_build_set.previous_build_set = old
         self.build_sets.append(self.current_build_set)
+        self.layout = None
         self.job_graph = None
 
     def addBuild(self, build):
@@ -1443,8 +1443,7 @@
     def freezeJobGraph(self):
         """Find or create actual matching jobs for this item's change and
         store the resulting job tree."""
-        layout = self.current_build_set.layout
-        job_graph = layout.createJobGraph(self)
+        job_graph = self.layout.createJobGraph(self)
         for job in job_graph.getJobs():
             # Ensure that each jobs's dependencies are fully
             # accessible.  This will raise an exception if not.
@@ -2527,14 +2526,14 @@
 
     @staticmethod
     def _max_count(item, semaphore_name):
-        if not item.current_build_set.layout:
+        if not item.layout:
             # This should not occur as the layout of the item must already be
             # built when acquiring or releasing a semaphore for a job.
             raise Exception("Item {} has no layout".format(item))
 
         # find the right semaphore
         default_semaphore = Semaphore(semaphore_name, 1)
-        semaphores = item.current_build_set.layout.semaphores
+        semaphores = item.layout.semaphores
         return semaphores.get(semaphore_name, default_semaphore).max
 
 
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 5432661..c321bc0 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -71,10 +71,13 @@
     the path specified in the configuration.
 
     :arg Tenant tenant: the tenant to reconfigure
+    :arg Project project: if supplied, clear the cached configuration
+         from this project first
     """
-    def __init__(self, tenant):
+    def __init__(self, tenant, project):
         super(TenantReconfigureEvent, self).__init__()
         self.tenant = tenant
+        self.project = project
 
 
 class PromoteEvent(ManagementEvent):
@@ -331,14 +334,15 @@
         self.result_event_queue.put(event)
         self.wake_event.set()
 
-    def reconfigureTenant(self, tenant):
-        self.log.debug("Prepare to reconfigure")
-        event = TenantReconfigureEvent(tenant)
+    def reconfigureTenant(self, tenant, project):
+        self.log.debug("Submitting tenant reconfiguration event for "
+                       "%s due to project %s", tenant.name, project)
+        event = TenantReconfigureEvent(tenant, project)
         self.management_event_queue.put(event)
         self.wake_event.set()
 
     def reconfigure(self, config):
-        self.log.debug("Prepare to reconfigure")
+        self.log.debug("Submitting reconfiguration event")
         event = ReconfigureEvent(config)
         self.management_event_queue.put(event)
         self.wake_event.set()
@@ -457,7 +461,7 @@
         self.layout_lock.acquire()
         self.config = event.config
         try:
-            self.log.debug("Performing reconfiguration")
+            self.log.debug("Full reconfiguration beginning")
             loader = configloader.ConfigLoader()
             abide = loader.loadConfig(
                 self.config.get('scheduler', 'tenant_config'),
@@ -468,13 +472,18 @@
             self.abide = abide
         finally:
             self.layout_lock.release()
+        self.log.debug("Full reconfiguration complete")
 
     def _doTenantReconfigureEvent(self, event):
         # This is called in the scheduler loop after another thread submits
         # a request
         self.layout_lock.acquire()
         try:
-            self.log.debug("Performing tenant reconfiguration")
+            self.log.debug("Tenant reconfiguration beginning")
+            # If a change landed to a project, clear out the cached
+            # config before reconfiguring.
+            if event.project:
+                event.project.unparsed_config = None
             loader = configloader.ConfigLoader()
             abide = loader.reloadTenant(
                 self.config.get('scheduler', 'tenant_config'),
@@ -486,6 +495,7 @@
             self.abide = abide
         finally:
             self.layout_lock.release()
+        self.log.debug("Tenant reconfiguration complete")
 
     def _reenqueueGetProject(self, tenant, item):
         project = item.change.project
@@ -769,8 +779,7 @@
                     # or a branch was just created or deleted.  Clear
                     # out cached data for this project and perform a
                     # reconfiguration.
-                    change.project.unparsed_config = None
-                    self.reconfigureTenant(tenant)
+                    self.reconfigureTenant(tenant, change.project)
                 for pipeline in tenant.layout.pipelines.values():
                     if event.isPatchsetCreated():
                         pipeline.manager.removeOldVersionsOfChange(change)
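
A simplified sketch of the new reconfiguration sequencing; the point of
carrying the project on the event is that the cache clear now happens
under the scheduler's layout lock rather than at event-submission time
(reload_tenant stands in for configloader.ConfigLoader.reloadTenant):

    import threading

    layout_lock = threading.Lock()

    class TenantReconfigureEvent:
        '''Carries the tenant plus the project whose cached config is stale.'''
        def __init__(self, tenant, project):
            self.tenant = tenant
            self.project = project

    def handle_tenant_reconfigure(event, reload_tenant):
        with layout_lock:
            # Clearing the cache under the lock ensures no concurrent
            # layout load sees the project's config mid-update.
            if event.project:
                event.project.unparsed_config = None
            return reload_tenant(event.tenant)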