Merge "Be explicit about legacy var additions/omissions" into feature/zuulv3
diff --git a/doc/source/user/jobs.rst b/doc/source/user/jobs.rst
index 6962b8f..cf607b9 100644
--- a/doc/source/user/jobs.rst
+++ b/doc/source/user/jobs.rst
@@ -249,6 +249,13 @@
          A boolean indicating whether this project appears in the
          :attr:`job.required-projects` list for this job.
 
+   .. var:: _projects
+      :type: dict
+
+      The same as ``projects`` but a dictionary indexed by the
+      ``canonical_name`` value of each entry.  ``projects`` will
+      eventually be converted to this form.
+
    .. var:: tenant
 
       The name of the current Zuul tenant.
diff --git a/etc/status/public_html/jquery.zuul.js b/etc/status/public_html/jquery.zuul.js
index 3735004..50dbed5 100644
--- a/etc/status/public_html/jquery.zuul.js
+++ b/etc/status/public_html/jquery.zuul.js
@@ -53,6 +53,7 @@
             'msg_id': '#zuul_msg',
             'pipelines_id': '#zuul_pipelines',
             'queue_events_num': '#zuul_queue_events_num',
+            'queue_management_events_num': '#zuul_queue_management_events_num',
             'queue_results_num': '#zuul_queue_results_num',
         }, options);
 
@@ -713,6 +714,10 @@
                             data.trigger_event_queue ?
                                 data.trigger_event_queue.length : '0'
                         );
+                        $(options.queue_management_events_num).text(
+                            data.management_event_queue ?
+                                data.management_event_queue.length : '0'
+                        );
                         $(options.queue_results_num).text(
                             data.result_event_queue ?
                                 data.result_event_queue.length : '0'
diff --git a/etc/status/public_html/zuul.app.js b/etc/status/public_html/zuul.app.js
index ae950e8..7ceb2dd 100644
--- a/etc/status/public_html/zuul.app.js
+++ b/etc/status/public_html/zuul.app.js
@@ -33,7 +33,7 @@
         + '<div class="zuul-container" id="zuul-container">'
         + '<div style="display: none;" class="alert" id="zuul_msg"></div>'
         + '<button class="btn pull-right zuul-spinner">updating <span class="glyphicon glyphicon-refresh"></span></button>'
-        + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_results_num">0</span> results.</p>'
+        + '<p>Queue lengths: <span id="zuul_queue_events_num">0</span> events, <span id="zuul_queue_management_events_num">0</span> management events, <span id="zuul_queue_results_num">0</span> results.</p>'
         + '<div id="zuul_controls"></div>'
         + '<div id="zuul_pipelines" class="row"></div>'
         + '<p>Zuul version: <span id="zuul-version-span"></span></p>'
diff --git a/tests/base.py b/tests/base.py
index 797bfe6..a02ee5a 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -2384,7 +2384,7 @@
         # before noticing they should exit, but they should exit on their own.
         # Further the pydevd threads also need to be whitelisted so debugging
         # e.g. in PyCharm is possible without breaking shutdown.
-        whitelist = ['executor-watchdog',
+        whitelist = ['watchdog',
                      'pydevd.CommandThread',
                      'pydevd.Reader',
                      'pydevd.Writer',
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
new file mode 100644
index 0000000..40b3ca7
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/post.yaml
@@ -0,0 +1,13 @@
+- hosts: all
+  tasks:
+    - debug: var=waitpath
+    - file:
+        path: "{{zuul._test.test_root}}/{{zuul.build}}.post_start.flag"
+        state: touch
+    # Do not finish until test creates the flag file
+    - wait_for:
+        state: present
+        path: "{{waitpath}}"
+    - file:
+        path: "{{zuul._test.test_root}}/{{zuul.build}}.post_end.flag"
+        state: touch
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/pre.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
new file mode 100644
index 0000000..f679dce
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/playbooks/python27.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
new file mode 100644
index 0000000..92a5515
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/common-config/zuul.yaml
@@ -0,0 +1,24 @@
+- pipeline:
+    name: check
+    manager: independent
+    post-review: true
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+
+- job:
+    name: python27
+    pre-run: playbooks/pre
+    post-run: playbooks/post
+    vars:
+      waitpath: '{{zuul._test.test_root}}/{{zuul.build}}/test_wait'
diff --git a/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
new file mode 100644
index 0000000..89a5674
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/.zuul.yaml
@@ -0,0 +1,5 @@
+- project:
+    name: org/project
+    check:
+      jobs:
+        - python27
diff --git a/tests/fixtures/config/post-playbook/git/org_project/README b/tests/fixtures/config/post-playbook/git/org_project/README
new file mode 100644
index 0000000..9daeafb
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/post-playbook/main.yaml b/tests/fixtures/config/post-playbook/main.yaml
new file mode 100644
index 0000000..6033879
--- /dev/null
+++ b/tests/fixtures/config/post-playbook/main.yaml
@@ -0,0 +1,9 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
+
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9090421..3608ef0 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -875,8 +875,7 @@
         # already (without approvals), we need to clear the cache
         # first.
         for connection in self.connections.connections.values():
-            if hasattr(connection, '_change_cache'):
-                connection._change_cache.clear()
+            connection.maintainCache([])
 
         self.executor_server.hold_jobs_in_build = True
         A.addApproval('Approved', 1)
@@ -946,8 +945,7 @@
 
         self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
         # there should still be changes in the cache
-        self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
-                            0)
+        self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
 
         self.executor_server.hold_jobs_in_build = False
         self.executor_server.release()
@@ -2508,6 +2506,28 @@
         self.assertIn('project-merge', status_jobs[1]['dependencies'])
         self.assertIn('project-merge', status_jobs[2]['dependencies'])
 
+    def test_reconfigure_merge(self):
+        """Test that two reconfigure events are merged"""
+
+        tenant = self.sched.abide.tenants['tenant-one']
+        (trusted, project) = tenant.getProject('org/project')
+
+        # Use "with" so the run handler lock is released even when one
+        # of the assertions below fails.
+        with self.sched.run_handler_lock:
+            self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
+            self.sched.reconfigureTenant(tenant, project)
+            self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+            self.sched.reconfigureTenant(tenant, project)
+            # The second event should have been combined with the first
+            # so we should still only have one entry.
+            self.assertEqual(self.sched.management_event_queue.qsize(), 1)
+
+        self.waitUntilSettled()
+        self.assertEqual(self.sched.management_event_queue.qsize(), 0)
+
     def test_live_reconfiguration(self):
         "Test that live reconfiguration works"
         self.executor_server.hold_jobs_in_build = True
@@ -3933,8 +3953,7 @@
         self.assertEqual(B.data['status'], 'NEW')
 
         for connection in self.connections.connections.values():
-            if hasattr(connection, '_change_cache'):
-                connection._change_cache.clear()
+            connection.maintainCache([])
 
         self.executor_server.hold_jobs_in_build = True
         B.addApproval('Approved', 1)
diff --git a/tests/unit/test_stack_dump.py b/tests/unit/test_stack_dump.py
index 824e04c..13e83c6 100644
--- a/tests/unit/test_stack_dump.py
+++ b/tests/unit/test_stack_dump.py
@@ -31,4 +31,5 @@
 
         zuul.cmd.stack_dump_handler(signal.SIGUSR2, None)
         self.assertIn("Thread", self.log_fixture.output)
+        self.assertIn("MainThread", self.log_fixture.output)
         self.assertIn("test_stack_dump_logs", self.log_fixture.output)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index bfb9088..b30d710 100755
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -20,6 +20,7 @@
 import os
 import textwrap
 import gc
+import time
 from unittest import skip
 
 import testtools
@@ -1460,6 +1461,40 @@
                         "The file %s should exist" % post_flag_path)
 
 
+class TestPostPlaybooks(AnsibleZuulTestCase):
+    tenant_config_file = 'config/post-playbook/main.yaml'
+
+    def test_post_playbook_abort(self):
+        # Test that when we abort a job in the post playbook, that we
+        # don't send back POST_FAILURE.
+        self.executor_server.verbose = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        # Bound the wait so a build that never starts can't hang the test.
+        deadline = time.time() + 90
+        while not self.builds and time.time() < deadline:
+            time.sleep(0.1)
+        build = self.builds[0]
+
+        post_start = os.path.join(self.test_root,
+                                  build.uuid + '.post_start.flag')
+        deadline = time.time() + 90
+        while not os.path.exists(post_start) and time.time() < deadline:
+            time.sleep(0.1)
+        # The post playbook has started, abort the job
+        self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
+        self.waitUntilSettled()
+
+        build = self.getJobFromHistory('python27')
+        self.assertEqual('ABORTED', build.result)
+
+        post_end = os.path.join(self.test_root,
+                                build.uuid + '.post_end.flag')
+        self.assertTrue(os.path.exists(post_start))
+        self.assertFalse(os.path.exists(post_end))
+
+
 class TestBrokenConfig(ZuulTestCase):
     # Test that we get an appropriate syntax error if we start with a
     # broken config.
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 1870890..86f7f12 100755
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -23,6 +23,7 @@
 import signal
 import sys
 import traceback
+import threading
 
 yappi = extras.try_import('yappi')
 objgraph = extras.try_import('objgraph')
@@ -41,9 +42,17 @@
     log = logging.getLogger("zuul.stack_dump")
     log.debug("Beginning debug handler")
     try:
+        threads = {}
+        for t in threading.enumerate():
+            threads[t.ident] = t
         log_str = ""
         for thread_id, stack_frame in sys._current_frames().items():
-            log_str += "Thread: %s\n" % thread_id
+            thread = threads.get(thread_id)
+            if thread:
+                thread_name = thread.name
+            else:
+                thread_name = 'Unknown'  # thread is None here
+            log_str += "Thread: %s %s\n" % (thread_id, thread_name)
             log_str += "".join(traceback.format_stack(stack_frame))
         log.debug(log_str)
     except Exception:
diff --git a/zuul/configloader.py b/zuul/configloader.py
index cee744d..426842b 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -1374,7 +1374,12 @@
             # branch.  Remember the branch and then implicitly add a
             # branch selector to each job there.  This makes the
             # in-repo configuration apply only to that branch.
-            for branch in project.source.getProjectBranches(project, tenant):
+            branches = sorted(project.source.getProjectBranches(
+                project, tenant))
+            if 'master' in branches:
+                branches.remove('master')
+                branches = ['master'] + branches
+            for branch in branches:
                 new_project_unparsed_branch_config[project][branch] = \
                     model.UnparsedTenantConfig()
                 job = merger.getFiles(
@@ -1567,6 +1572,7 @@
         # Don't call this method from dynamic reconfiguration because
         # it interacts with drivers and connections.
         layout = model.Layout(tenant)
+        TenantParser.log.debug("Created layout id %s", layout.uuid)
 
         TenantParser._parseLayoutItems(layout, tenant, data,
                                        scheduler, connections)
@@ -1695,6 +1701,7 @@
             self._loadDynamicProjectData(config, project, files, False, tenant)
 
         layout = model.Layout(tenant)
+        self.log.debug("Created layout id %s", layout.uuid)
         if not include_config_projects:
             # NOTE: the actual pipeline objects (complete with queues
             # and enqueued items) are copied by reference here.  This
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index ca10f21..b44fa46 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -68,6 +68,13 @@
     def registerScheduler(self, sched):
         self.sched = sched
 
+    def maintainCache(self, relevant):
+        """Make cache contain relevant changes.
+
+        This lets the user supply a list of change objects that are still
+        in use.  Anything in our cache that isn't in the supplied list
+        should be safe to remove.  This implementation does nothing."""
+
     def registerWebapp(self, webapp):
         self.webapp = webapp
 
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 343c305..83871e3 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -23,10 +23,9 @@
 import pprint
 import shlex
 import queue
-import weakref
+import voluptuous as v
 
 from typing import Dict, List
-import voluptuous as v
 
 from zuul.connection import BaseConnection
 from zuul.model import Ref, Tag, Branch, Project
@@ -143,8 +142,7 @@
         # cache as it may be a dependency
         if event.change_number:
             refresh = True
-            if ((event.change_number, event.patch_number) not in
-                self.connection._change_cache):
+            if event.change_number not in self.connection._change_cache:
                 refresh = False
                 for tenant in self.connection.sched.abide.tenants.values():
                     # TODO(fungi): it would be better to have some simple means
@@ -302,7 +300,7 @@
         self.baseurl = self.connection_config.get('baseurl',
                                                   'https://%s' % self.server)
 
-        self._change_cache = weakref.WeakValueDictionary()
+        self._change_cache = {}
         self.projects = {}
         self.gerrit_event_connector = None
         self.source = driver.getSource(self)
@@ -313,6 +311,22 @@
     def addProject(self, project: Project) -> None:
         self.projects[project.name] = project
 
+    def maintainCache(self, relevant):
+        """Drop every cached change that is not listed in relevant.
+
+        The cache maps change number -> {patchset: change}; a change
+        number whose last patchset is dropped is removed as well.
+        """
+        stale = [(number, patchset)
+                 for number, by_patchset in self._change_cache.items()
+                 for patchset, cached in by_patchset.items()
+                 if cached not in relevant]
+        for number, patchset in stale:
+            by_patchset = self._change_cache[number]
+            del by_patchset[patchset]
+            if not by_patchset:
+                del self._change_cache[number]
+
     def getChange(self, event, refresh=False):
         if event.change_number:
             change = self._getChange(event.change_number, event.patch_number,
@@ -357,19 +371,22 @@
         return change
 
     def _getChange(self, number, patchset, refresh=False, history=None):
-        change = self._change_cache.get((number, patchset))
+        change = self._change_cache.get(number, {}).get(patchset)
         if change and not refresh:
             return change
         if not change:
             change = GerritChange(None)
             change.number = number
             change.patchset = patchset
-        self._change_cache[(change.number, change.patchset)] = change
+        self._change_cache.setdefault(change.number, {})
+        self._change_cache[change.number][change.patchset] = change
         try:
             self._updateChange(change, history)
         except Exception:
-            if self._change_cache.get((change.number, change.patchset)):
-                del self._change_cache[(change.number, change.patchset)]
+            if self._change_cache.get(change.number, {}).get(change.patchset):
+                del self._change_cache[change.number][change.patchset]
+                if not self._change_cache[change.number]:
+                    del self._change_cache[change.number]
             raise
         return change
 
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 46c8ee5..3d0eb37 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,7 +21,6 @@
 import threading
 import time
 import re
-import weakref
 
 import cachecontrol
 from cachecontrol.cache import DictCache
@@ -395,7 +394,7 @@
     def __init__(self, driver, connection_name, connection_config):
         super(GithubConnection, self).__init__(
             driver, connection_name, connection_config)
-        self._change_cache = weakref.WeakValueDictionary()
+        self._change_cache = {}
         self._project_branch_cache = {}
         self.projects = {}
         self.git_ssh_key = self.connection_config.get('sshkey')
@@ -568,6 +567,11 @@
         # authenticated, if not then anonymous is the best we have.
         return self._github
 
+    def maintainCache(self, relevant):
+        for key, change in list(self._change_cache.items()):
+            if change not in relevant:
+                del self._change_cache[key]  # safe: iterating a snapshot
+
     def getChange(self, event, refresh=False):
         """Get the change representing an event."""
 
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 0f8d7d7..ae22c8e 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -187,6 +187,7 @@
             and item.change.newrev != '0' * 40):
             zuul_params['newrev'] = item.change.newrev
         zuul_params['projects'] = []  # Set below
+        zuul_params['_projects'] = {}  # transitional to convert to dict
         zuul_params['items'] = []
         for i in all_items:
             d = dict()
@@ -270,14 +271,24 @@
                 projects.add(project)
 
         for p in projects:
-            zuul_params['projects'].append(dict(
+            zuul_params['_projects'][p.canonical_name] = (dict(
                 name=p.name,
                 short_name=p.name.split('/')[-1],
-                canonical_hostname=p.canonical_hostname,
+                # Duplicate this into the dict too, so that iterating
+                # project.values() is easier for callers
                 canonical_name=p.canonical_name,
+                canonical_hostname=p.canonical_hostname,
                 src_dir=os.path.join('src', p.canonical_name),
                 required=(p in required_projects),
             ))
+        # We are transitioning "projects" from a list to a dict
+        # indexed by canonical name, as it is much easier to access
+        # values in ansible.  Existing callers are converted to
+        # "_projects", then once "projects" is unused we switch it,
+        # then convert callers back.  Finally when "_projects" is
+        # unused it will be removed.
+        for cn, p in zuul_params['_projects'].items():
+            zuul_params['projects'].append(p)
 
         build = Build(job, uuid)
         build.parameters = params
@@ -379,6 +390,11 @@
             result_data = data.get('data', {})
             self.log.info("Build %s complete, result %s" %
                           (job, result))
+            # If the build should be retried, don't supply the result
+            # so that elsewhere we don't have to deal with keeping
+            # track of which results are non-final.
+            if build.retry:
+                result = None
             self.sched.onBuildCompleted(build, result, result_data)
             # The test suite expects the build to be removed from the
             # internal dict after it's added to the report queue.
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 5998922..e3f8a24 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -95,7 +95,7 @@
         if cache_dir == jobs_base:
             raise Exception("Cache dir and jobs dir cannot be the same")
         self.thread = threading.Thread(target=self._run,
-                                       name='executor-diskaccountant')
+                                       name='diskaccountant')
         self.thread.daemon = True
         self._running = False
         self.jobs_base = jobs_base
@@ -154,7 +154,7 @@
         self.function = function
         self.args = args
         self.thread = threading.Thread(target=self._run,
-                                       name='executor-watchdog')
+                                       name='watchdog')
         self.thread.daemon = True
         self.timed_out = None
 
@@ -556,7 +556,8 @@
 
     def run(self):
         self.running = True
-        self.thread = threading.Thread(target=self.execute)
+        self.thread = threading.Thread(target=self.execute,
+                                       name='build-%s' % self.job.unique)
         self.thread.start()
 
     def stop(self, reason=None):
@@ -834,6 +835,8 @@
             # TODOv3(pabelanger): Implement post-run timeout setting.
             post_status, post_code = self.runAnsiblePlaybook(
                 playbook, args['timeout'], success, phase='post', index=index)
+            if post_status == self.RESULT_ABORTED:
+                return 'ABORTED'
             if post_status != self.RESULT_NORMAL or post_code != 0:
                 success = False
                 # If we encountered a pre-failure, that takes
@@ -1645,22 +1648,27 @@
 
         self.log.debug("Starting command processor")
         self.command_socket.start()
-        self.command_thread = threading.Thread(target=self.runCommand)
+        self.command_thread = threading.Thread(target=self.runCommand,
+                                               name='command')
         self.command_thread.daemon = True
         self.command_thread.start()
 
         self.log.debug("Starting worker")
-        self.update_thread = threading.Thread(target=self._updateLoop)
+        self.update_thread = threading.Thread(target=self._updateLoop,
+                                              name='update')
         self.update_thread.daemon = True
         self.update_thread.start()
-        self.merger_thread = threading.Thread(target=self.run_merger)
+        self.merger_thread = threading.Thread(target=self.run_merger,
+                                              name='merger')
         self.merger_thread.daemon = True
         self.merger_thread.start()
-        self.executor_thread = threading.Thread(target=self.run_executor)
+        self.executor_thread = threading.Thread(target=self.run_executor,
+                                                name='executor')
         self.executor_thread.daemon = True
         self.executor_thread.start()
         self.governor_stop_event = threading.Event()
-        self.governor_thread = threading.Thread(target=self.run_governor)
+        self.governor_thread = threading.Thread(target=self.run_governor,
+                                                name='governor')
         self.governor_thread.daemon = True
         self.governor_thread.start()
         self.disk_accountant.start()
@@ -1869,7 +1877,10 @@
 
     def run_governor(self):
         while not self.governor_stop_event.wait(30):
-            self.manageLoad()
+            try:
+                self.manageLoad()
+            except Exception:
+                self.log.exception("Exception in governor thread:")
 
     def manageLoad(self):
         ''' Apply some heuristics to decide whether or not we should
diff --git a/zuul/lib/queue.py b/zuul/lib/queue.py
new file mode 100644
index 0000000..db8af47
--- /dev/null
+++ b/zuul/lib/queue.py
@@ -0,0 +1,78 @@
+# Copyright 2014 OpenStack Foundation
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import threading
+
+
+class MergedQueue(object):
+    """A FIFO queue in which equal items are merged, not duplicated."""
+
+    def __init__(self):
+        self.queue = collections.deque()
+        # Both conditions share one re-entrant lock, so code holding
+        # one of them may safely acquire the other.
+        self.lock = threading.RLock()
+        self.condition = threading.Condition(self.lock)
+        self.join_condition = threading.Condition(self.lock)
+        self.tasks = 0  # items handed out by get() and not yet done
+
+    def qsize(self):
+        return len(self.queue)
+
+    def empty(self):
+        return self.qsize() == 0
+
+    def put(self, item):
+        """Enqueue item, or merge it into an equal queued item.
+
+        Returns the original item if added, or an updated equivalent
+        item if already enqueued.
+        """
+        with self.condition:
+            ret = None
+            # No early exit: each equal queued entry merges in the new item.
+            for x in self.queue:
+                if item == x:
+                    ret = x
+                    if hasattr(ret, 'merge'):
+                        ret.merge(item)
+            if ret is None:
+                ret = item
+                self.queue.append(item)
+                self.condition.notify()
+            return ret
+
+    def get(self):
+        """Block until an item is available, then remove and return it."""
+        with self.condition:
+            while not self.queue:
+                self.condition.wait()
+            ret = self.queue.popleft()
+            with self.join_condition:
+                self.tasks += 1
+            return ret
+
+    def task_done(self):
+        """Mark one item obtained from get() as fully processed."""
+        with self.join_condition:
+            self.tasks -= 1
+            self.join_condition.notify()
+
+    def join(self):
+        """Block until every item returned by get() has been marked done."""
+        with self.join_condition:
+            while self.tasks:
+                self.join_condition.wait()
diff --git a/zuul/model.py b/zuul/model.py
index b7c3031..7d6e80c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -888,6 +888,8 @@
         return Attributes(name=self.name)
 
     def setRun(self):
+        msg = 'self %s' % (repr(self),)
+        self.inheritance_path = self.inheritance_path + (msg,)
         if not self.run:
             self.run = self.implied_run
 
@@ -1261,8 +1263,6 @@
         self.item = item
         self.builds = {}
         self.result = None
-        self.next_build_set = None
-        self.previous_build_set = None
         self.uuid = None
         self.commit = None
         self.dependent_items = None
@@ -1400,10 +1400,8 @@
         self.pipeline = queue.pipeline
         self.queue = queue
         self.change = change  # a ref
-        self.build_sets = []
         self.dequeued_needing_change = False
         self.current_build_set = BuildSet(self)
-        self.build_sets.append(self.current_build_set)
         self.item_ahead = None
         self.items_behind = []
         self.enqueue_time = None
@@ -1425,12 +1423,7 @@
             id(self), self.change, pipeline)
 
     def resetAllBuilds(self):
-        old = self.current_build_set
-        self.current_build_set.result = 'CANCELED'
         self.current_build_set = BuildSet(self)
-        old.next_build_set = self.current_build_set
-        self.current_build_set.previous_build_set = old
-        self.build_sets.append(self.current_build_set)
         self.layout = None
         self.job_graph = None
 
@@ -2327,6 +2320,7 @@
     """Holds all of the Pipelines."""
 
     def __init__(self, tenant):
+        self.uuid = uuid4().hex
         self.tenant = tenant
         self.project_configs = {}
         self.project_templates = {}
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 026763b..33b6723 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -31,6 +31,7 @@
 from zuul import version as zuul_version
 from zuul.lib.config import get_default
 from zuul.lib.statsd import get_statsd
+import zuul.lib.queue
 
 
 class ManagementEvent(object):
@@ -76,8 +77,23 @@
     """
     def __init__(self, tenant, project):
         super(TenantReconfigureEvent, self).__init__()
-        self.tenant = tenant
-        self.project = project
+        self.tenant_name = tenant.name
+        self.projects = set([project])
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __eq__(self, other):
+        if not isinstance(other, TenantReconfigureEvent):
+            return False
+        # We don't check projects because they will get combined when
+        # merged.
+        return (self.tenant_name == other.tenant_name)
+
+    def merge(self, other):
+        if self.tenant_name != other.tenant_name:
+            raise Exception("Can not merge events from different tenants")
+        self.projects |= other.projects
 
 
 class PromoteEvent(ManagementEvent):
@@ -223,7 +239,7 @@
 
         self.trigger_event_queue = queue.Queue()
         self.result_event_queue = queue.Queue()
-        self.management_event_queue = queue.Queue()
+        self.management_event_queue = zuul.lib.queue.MergedQueue()
         self.abide = model.Abide()
 
         if not testonly:
@@ -475,15 +491,16 @@
             self.log.debug("Tenant reconfiguration beginning")
             # If a change landed to a project, clear out the cached
             # config before reconfiguring.
-            if event.project:
-                event.project.unparsed_config = None
+            for project in event.projects:
+                project.unparsed_config = None
+            old_tenant = self.abide.tenants[event.tenant_name]
             loader = configloader.ConfigLoader()
             abide = loader.reloadTenant(
                 self.config.get('scheduler', 'tenant_config'),
                 self._get_project_key_dir(),
                 self, self.merger, self.connections,
-                self.abide, event.tenant)
-            tenant = abide.tenants[event.tenant.name]
+                self.abide, old_tenant)
+            tenant = abide.tenants[event.tenant_name]
             self._reconfigureTenant(tenant)
             self.abide = abide
         finally:
@@ -595,6 +612,8 @@
 
             self._reenqueueTenant(old_tenant, tenant)
 
+        # TODOv3(jeblair): update for tenants
+        # self.maintainConnectionCache()
         self.connections.reconfigureDrivers(tenant)
 
         # TODOv3(jeblair): remove postconfig calls?
@@ -726,6 +745,23 @@
             finally:
                 self.run_handler_lock.release()
 
+    def maintainConnectionCache(self):
+        """Trim connection caches to changes pipelines still reference."""
+        # TODOv3(jeblair): update for tenants
+        relevant = set()
+        for tenant in self.abide.tenants.values():
+            for pipeline in tenant.layout.pipelines.values():
+                self.log.debug("Gather relevant cache items for: %s" %
+                               pipeline)
+                for item in pipeline.getAllItems():
+                    relevant.add(item.change)
+                    relevant.update(item.change.getRelatedChanges())
+        for connection in self.connections.connections.values():
+            connection.maintainCache(relevant)
+            self.log.debug(
+                "End maintain connection cache for: %s" % connection)
+        self.log.debug("Connection cache size: %s" % len(relevant))
+
     def process_event_queue(self):
         self.log.debug("Fetching trigger event")
         event = self.trigger_event_queue.get()
@@ -920,6 +956,9 @@
         data['result_event_queue'] = {}
         data['result_event_queue']['length'] = \
             self.result_event_queue.qsize()
+        data['management_event_queue'] = {}
+        data['management_event_queue']['length'] = \
+            self.management_event_queue.qsize()
 
         if self.last_reconfigured:
             data['last_reconfigured'] = self.last_reconfigured * 1000