Merge "Add _projects to convert project list to dictionary" into feature/zuulv3
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6efc43f..3608ef0 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2506,6 +2506,28 @@
self.assertIn('project-merge', status_jobs[1]['dependencies'])
self.assertIn('project-merge', status_jobs[2]['dependencies'])
+ def test_reconfigure_merge(self):
+ """Test that two reconfigure events are merged"""
+
+ tenant = self.sched.abide.tenants['tenant-one']
+ (trusted, project) = tenant.getProject('org/project')
+
+ self.sched.run_handler_lock.acquire()
+ self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
+ self.sched.reconfigureTenant(tenant, project)
+ self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+ self.sched.reconfigureTenant(tenant, project)
+ # The second event should have been combined with the first
+ # so we should still only have one entry.
+ self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+ self.sched.run_handler_lock.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
self.executor_server.hold_jobs_in_build = True
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 2cdac38..426842b 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -1572,6 +1572,7 @@
# Don't call this method from dynamic reconfiguration because
# it interacts with drivers and connections.
layout = model.Layout(tenant)
+ TenantParser.log.debug("Created layout id %s", layout.uuid)
TenantParser._parseLayoutItems(layout, tenant, data,
scheduler, connections)
@@ -1700,6 +1701,7 @@
self._loadDynamicProjectData(config, project, files, False, tenant)
layout = model.Layout(tenant)
+ self.log.debug("Created layout id %s", layout.uuid)
if not include_config_projects:
# NOTE: the actual pipeline objects (complete with queues
# and enqueued items) are copied by reference here. This
diff --git a/zuul/lib/queue.py b/zuul/lib/queue.py
new file mode 100644
index 0000000..db8af47
--- /dev/null
+++ b/zuul/lib/queue.py
@@ -0,0 +1,78 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import threading
+
+
+class MergedQueue(object):
+ def __init__(self):
+ self.queue = collections.deque()
+ self.lock = threading.RLock()
+ self.condition = threading.Condition(self.lock)
+ self.join_condition = threading.Condition(self.lock)
+ self.tasks = 0
+
+ def qsize(self):
+ return len(self.queue)
+
+ def empty(self):
+ return self.qsize() == 0
+
+ def put(self, item):
+ # Returns the original item if added, or an updated equivalent
+ # item if already enqueued.
+ self.condition.acquire()
+ ret = None
+ try:
+ for x in self.queue:
+ if item == x:
+ ret = x
+ if hasattr(ret, 'merge'):
+ ret.merge(item)
+ if ret is None:
+ ret = item
+ self.queue.append(item)
+ self.condition.notify()
+ finally:
+ self.condition.release()
+ return ret
+
+ def get(self):
+ self.condition.acquire()
+ try:
+ while True:
+ try:
+ ret = self.queue.popleft()
+ self.join_condition.acquire()
+ self.tasks += 1
+ self.join_condition.release()
+ return ret
+ except IndexError:
+ self.condition.wait()
+ finally:
+ self.condition.release()
+
+ def task_done(self):
+ self.join_condition.acquire()
+ self.tasks -= 1
+ self.join_condition.notify()
+ self.join_condition.release()
+
+ def join(self):
+ self.join_condition.acquire()
+ while self.tasks:
+ self.join_condition.wait()
+ self.join_condition.release()
diff --git a/zuul/model.py b/zuul/model.py
index e68e46b..464ee16 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -2329,6 +2329,7 @@
"""Holds all of the Pipelines."""
def __init__(self, tenant):
+ self.uuid = uuid4().hex
self.tenant = tenant
self.project_configs = {}
self.project_templates = {}
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e5924f8..33b6723 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -31,6 +31,7 @@
from zuul import version as zuul_version
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd
+import zuul.lib.queue
class ManagementEvent(object):
@@ -76,8 +77,23 @@
"""
def __init__(self, tenant, project):
super(TenantReconfigureEvent, self).__init__()
- self.tenant = tenant
- self.project = project
+ self.tenant_name = tenant.name
+ self.projects = set([project])
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __eq__(self, other):
+ if not isinstance(other, TenantReconfigureEvent):
+ return False
+ # We don't check projects because they will get combined when
+ # merged.
+ return (self.tenant_name == other.tenant_name)
+
+ def merge(self, other):
+ if self.tenant_name != other.tenant_name:
+ raise Exception("Can not merge events from different tenants")
+ self.projects |= other.projects
class PromoteEvent(ManagementEvent):
@@ -223,7 +239,7 @@
self.trigger_event_queue = queue.Queue()
self.result_event_queue = queue.Queue()
- self.management_event_queue = queue.Queue()
+ self.management_event_queue = zuul.lib.queue.MergedQueue()
self.abide = model.Abide()
if not testonly:
@@ -475,15 +491,16 @@
self.log.debug("Tenant reconfiguration beginning")
# If a change landed to a project, clear out the cached
# config before reconfiguring.
- if event.project:
- event.project.unparsed_config = None
+ for project in event.projects:
+ project.unparsed_config = None
+ old_tenant = self.abide.tenants[event.tenant_name]
loader = configloader.ConfigLoader()
abide = loader.reloadTenant(
self.config.get('scheduler', 'tenant_config'),
self._get_project_key_dir(),
self, self.merger, self.connections,
- self.abide, event.tenant)
- tenant = abide.tenants[event.tenant.name]
+ self.abide, old_tenant)
+ tenant = abide.tenants[event.tenant_name]
self._reconfigureTenant(tenant)
self.abide = abide
finally:
@@ -939,6 +956,9 @@
data['result_event_queue'] = {}
data['result_event_queue']['length'] = \
self.result_event_queue.qsize()
+ data['management_event_queue'] = {}
+ data['management_event_queue']['length'] = \
+ self.management_event_queue.qsize()
if self.last_reconfigured:
data['last_reconfigured'] = self.last_reconfigured * 1000