Merge "Remove old buildsets" into feature/zuulv3
diff --git a/doc/source/user/jobs.rst b/doc/source/user/jobs.rst
index 6962b8f..cf607b9 100644
--- a/doc/source/user/jobs.rst
+++ b/doc/source/user/jobs.rst
@@ -249,6 +249,13 @@
          A boolean indicating whether this project appears in the
          :attr:`job.required-projects` list for this job.
 
+   .. var:: _projects
+      :type: dict
+
+      The same as ``projects`` but a dictionary indexed by the
+      ``name`` value of each entry.  ``projects`` will be converted to
+      this.
+
    .. var:: tenant
 
       The name of the current Zuul tenant.
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6efc43f..3608ef0 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2506,6 +2506,28 @@
         self.assertIn('project-merge', status_jobs[1]['dependencies'])
         self.assertIn('project-merge', status_jobs[2]['dependencies'])
 
+    def test_reconfigure_merge(self):
+        """Test that two reconfigure events are merged"""
+
+        tenant = self.sched.abide.tenants['tenant-one']
+        (trusted, project) = tenant.getProject('org/project')
+
+        self.sched.run_handler_lock.acquire()
+        self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
+        self.sched.reconfigureTenant(tenant, project)
+        self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+        self.sched.reconfigureTenant(tenant, project)
+        # The second event should have been combined with the first
+        # so we should still only have one entry.
+        self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+        self.sched.run_handler_lock.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
     def test_live_reconfiguration(self):
         "Test that live reconfiguration works"
         self.executor_server.hold_jobs_in_build = True
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 041f754..ae22c8e 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -187,6 +187,7 @@
             and item.change.newrev != '0' * 40):
             zuul_params['newrev'] = item.change.newrev
         zuul_params['projects'] = []  # Set below
+        zuul_params['_projects'] = {}  # transitional to convert to dict
         zuul_params['items'] = []
         for i in all_items:
             d = dict()
@@ -270,14 +271,24 @@
                 projects.add(project)
 
         for p in projects:
-            zuul_params['projects'].append(dict(
+            zuul_params['_projects'][p.canonical_name] = (dict(
                 name=p.name,
                 short_name=p.name.split('/')[-1],
-                canonical_hostname=p.canonical_hostname,
+                # Duplicate this into the dict too, so that iterating
+                # project.values() is easier for callers
                 canonical_name=p.canonical_name,
+                canonical_hostname=p.canonical_hostname,
                 src_dir=os.path.join('src', p.canonical_name),
                 required=(p in required_projects),
             ))
+        # We are transitioning "projects" from a list to a dict
+        # indexed by canonical name, as it is much easier to access
+        # values in ansible.  Existing callers are converted to
+        # "_projects", then once "projects" is unused we switch it,
+        # then convert callers back.  Finally when "_projects" is
+        # unused it will be removed.
+        for cn, p in zuul_params['_projects'].items():
+            zuul_params['projects'].append(p)
 
         build = Build(job, uuid)
         build.parameters = params
diff --git a/zuul/lib/queue.py b/zuul/lib/queue.py
new file mode 100644
index 0000000..db8af47
--- /dev/null
+++ b/zuul/lib/queue.py
@@ -0,0 +1,78 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import threading
+
+
+class MergedQueue(object):
+    def __init__(self):
+        self.queue = collections.deque()
+        self.lock = threading.RLock()
+        self.condition = threading.Condition(self.lock)
+        self.join_condition = threading.Condition(self.lock)
+        self.tasks = 0
+
+    def qsize(self):
+        return len(self.queue)
+
+    def empty(self):
+        return self.qsize() == 0
+
+    def put(self, item):
+        # Returns the original item if added, or an updated equivalent
+        # item if already enqueued.
+        self.condition.acquire()
+        ret = None
+        try:
+            for x in self.queue:
+                if item == x:
+                    ret = x
+                    if hasattr(ret, 'merge'):
+                        ret.merge(item)
+            if ret is None:
+                ret = item
+                self.queue.append(item)
+                self.condition.notify()
+        finally:
+            self.condition.release()
+        return ret
+
+    def get(self):
+        self.condition.acquire()
+        try:
+            while True:
+                try:
+                    ret = self.queue.popleft()
+                    self.join_condition.acquire()
+                    self.tasks += 1
+                    self.join_condition.release()
+                    return ret
+                except IndexError:
+                    self.condition.wait()
+        finally:
+            self.condition.release()
+
+    def task_done(self):
+        self.join_condition.acquire()
+        self.tasks -= 1
+        self.join_condition.notify()
+        self.join_condition.release()
+
+    def join(self):
+        self.join_condition.acquire()
+        while self.tasks:
+            self.join_condition.wait()
+        self.join_condition.release()
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e5924f8..33b6723 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -31,6 +31,7 @@
 from zuul import version as zuul_version
 from zuul.lib.config import get_default
 from zuul.lib.statsd import get_statsd
+import zuul.lib.queue
 
 
 class ManagementEvent(object):
@@ -76,8 +77,23 @@
     """
     def __init__(self, tenant, project):
         super(TenantReconfigureEvent, self).__init__()
-        self.tenant = tenant
-        self.project = project
+        self.tenant_name = tenant.name
+        self.projects = set([project])
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __eq__(self, other):
+        if not isinstance(other, TenantReconfigureEvent):
+            return False
+        # We don't check projects because they will get combined when
+        # merged.
+        return (self.tenant_name == other.tenant_name)
+
+    def merge(self, other):
+        if self.tenant_name != other.tenant_name:
+            raise Exception("Can not merge events from different tenants")
+        self.projects |= other.projects
 
 
 class PromoteEvent(ManagementEvent):
@@ -223,7 +239,7 @@
 
         self.trigger_event_queue = queue.Queue()
         self.result_event_queue = queue.Queue()
-        self.management_event_queue = queue.Queue()
+        self.management_event_queue = zuul.lib.queue.MergedQueue()
         self.abide = model.Abide()
 
         if not testonly:
@@ -475,15 +491,16 @@
             self.log.debug("Tenant reconfiguration beginning")
             # If a change landed to a project, clear out the cached
             # config before reconfiguring.
-            if event.project:
-                event.project.unparsed_config = None
+            for project in event.projects:
+                project.unparsed_config = None
+            old_tenant = self.abide.tenants[event.tenant_name]
             loader = configloader.ConfigLoader()
             abide = loader.reloadTenant(
                 self.config.get('scheduler', 'tenant_config'),
                 self._get_project_key_dir(),
                 self, self.merger, self.connections,
-                self.abide, event.tenant)
-            tenant = abide.tenants[event.tenant.name]
+                self.abide, old_tenant)
+            tenant = abide.tenants[event.tenant_name]
             self._reconfigureTenant(tenant)
             self.abide = abide
         finally:
@@ -939,6 +956,9 @@
         data['result_event_queue'] = {}
         data['result_event_queue']['length'] = \
             self.result_event_queue.qsize()
+        data['management_event_queue'] = {}
+        data['management_event_queue']['length'] = \
+            self.management_event_queue.qsize()
 
         if self.last_reconfigured:
             data['last_reconfigured'] = self.last_reconfigured * 1000