Revert "Use weakref for change cache"

This reverts commit b9704302bdb6857026319ac80df7b490a2282f89.

This is strongly suspected of causing a memory leak.

Change-Id: I0ebf9cee304277909a0b80420ac7ba659a437b29
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9090421..6efc43f 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -875,8 +875,7 @@
         # already (without approvals), we need to clear the cache
         # first.
         for connection in self.connections.connections.values():
-            if hasattr(connection, '_change_cache'):
-                connection._change_cache.clear()
+            connection.maintainCache([])
 
         self.executor_server.hold_jobs_in_build = True
         A.addApproval('Approved', 1)
@@ -946,8 +945,7 @@
 
         self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
         # there should still be changes in the cache
-        self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
-                            0)
+        self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
 
         self.executor_server.hold_jobs_in_build = False
         self.executor_server.release()
@@ -3933,8 +3931,7 @@
         self.assertEqual(B.data['status'], 'NEW')
 
         for connection in self.connections.connections.values():
-            if hasattr(connection, '_change_cache'):
-                connection._change_cache.clear()
+            connection.maintainCache([])
 
         self.executor_server.hold_jobs_in_build = True
         B.addApproval('Approved', 1)
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index ca10f21..b44fa46 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -68,6 +68,13 @@
     def registerScheduler(self, sched):
         self.sched = sched
 
+    def maintainCache(self, relevant):
+        """Make cache contain relevant changes.
+
+        This lets the user supply a list of change objects that are
+        still in use.  Anything in our cache that isn't in the supplied
+        list should be safe to remove from the cache."""
+
     def registerWebapp(self, webapp):
         self.webapp = webapp
 
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 343c305..83871e3 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -23,10 +23,9 @@
 import pprint
 import shlex
 import queue
-import weakref
+import voluptuous as v
 
 from typing import Dict, List
-import voluptuous as v
 
 from zuul.connection import BaseConnection
 from zuul.model import Ref, Tag, Branch, Project
@@ -143,8 +142,7 @@
         # cache as it may be a dependency
         if event.change_number:
             refresh = True
-            if ((event.change_number, event.patch_number) not in
-                self.connection._change_cache):
+            if event.change_number not in self.connection._change_cache:
                 refresh = False
                 for tenant in self.connection.sched.abide.tenants.values():
                     # TODO(fungi): it would be better to have some simple means
@@ -302,7 +300,7 @@
         self.baseurl = self.connection_config.get('baseurl',
                                                   'https://%s' % self.server)
 
-        self._change_cache = weakref.WeakValueDictionary()
+        self._change_cache = {}
         self.projects = {}
         self.gerrit_event_connector = None
         self.source = driver.getSource(self)
@@ -313,6 +311,22 @@
     def addProject(self, project: Project) -> None:
         self.projects[project.name] = project
 
+    def maintainCache(self, relevant):
+        # This lets the user supply a list of change objects that are
+        # still in use.  Anything in our cache that isn't in the supplied
+        # list should be safe to remove from the cache.
+        remove = {}
+        for change_number, patchsets in self._change_cache.items():
+            for patchset, change in patchsets.items():
+                if change not in relevant:
+                    remove.setdefault(change_number, [])
+                    remove[change_number].append(patchset)
+        for change_number, patchsets in remove.items():
+            for patchset in patchsets:
+                del self._change_cache[change_number][patchset]
+            if not self._change_cache[change_number]:
+                del self._change_cache[change_number]
+
     def getChange(self, event, refresh=False):
         if event.change_number:
             change = self._getChange(event.change_number, event.patch_number,
@@ -357,19 +371,22 @@
         return change
 
     def _getChange(self, number, patchset, refresh=False, history=None):
-        change = self._change_cache.get((number, patchset))
+        change = self._change_cache.get(number, {}).get(patchset)
         if change and not refresh:
             return change
         if not change:
             change = GerritChange(None)
             change.number = number
             change.patchset = patchset
-        self._change_cache[(change.number, change.patchset)] = change
+        self._change_cache.setdefault(change.number, {})
+        self._change_cache[change.number][change.patchset] = change
         try:
             self._updateChange(change, history)
         except Exception:
-            if self._change_cache.get((change.number, change.patchset)):
-                del self._change_cache[(change.number, change.patchset)]
+            if self._change_cache.get(change.number, {}).get(change.patchset):
+                del self._change_cache[change.number][change.patchset]
+                if not self._change_cache[change.number]:
+                    del self._change_cache[change.number]
             raise
         return change
 
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 46c8ee5..3d0eb37 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,7 +21,6 @@
 import threading
 import time
 import re
-import weakref
 
 import cachecontrol
 from cachecontrol.cache import DictCache
@@ -395,7 +394,7 @@
     def __init__(self, driver, connection_name, connection_config):
         super(GithubConnection, self).__init__(
             driver, connection_name, connection_config)
-        self._change_cache = weakref.WeakValueDictionary()
+        self._change_cache = {}
         self._project_branch_cache = {}
         self.projects = {}
         self.git_ssh_key = self.connection_config.get('sshkey')
@@ -568,6 +567,11 @@
         # authenticated, if not then anonymous is the best we have.
         return self._github
 
+    def maintainCache(self, relevant):
+        for key, change in self._change_cache.items():
+            if change not in relevant:
+                del self._change_cache[key]
+
     def getChange(self, event, refresh=False):
         """Get the change representing an event."""
 
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 026763b..e5924f8 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -595,6 +595,8 @@
 
             self._reenqueueTenant(old_tenant, tenant)
 
+        # TODOv3(jeblair): update for tenants
+        # self.maintainConnectionCache()
         self.connections.reconfigureDrivers(tenant)
 
         # TODOv3(jeblair): remove postconfig calls?
@@ -726,6 +728,23 @@
             finally:
                 self.run_handler_lock.release()
 
+    def maintainConnectionCache(self):
+        # TODOv3(jeblair): update for tenants
+        relevant = set()
+        for tenant in self.abide.tenants.values():
+            for pipeline in tenant.layout.pipelines.values():
+                self.log.debug("Gather relevant cache items for: %s" %
+                               pipeline)
+
+                for item in pipeline.getAllItems():
+                    relevant.add(item.change)
+                    relevant.update(item.change.getRelatedChanges())
+        for connection in self.connections.values():
+            connection.maintainCache(relevant)
+            self.log.debug(
+                "End maintain connection cache for: %s" % connection)
+        self.log.debug("Connection cache size: %s" % len(relevant))
+
     def process_event_queue(self):
         self.log.debug("Fetching trigger event")
         event = self.trigger_event_queue.get()