Revert "Use weakref for change cache"
This reverts commit b9704302bdb6857026319ac80df7b490a2282f89.
The reverted change is strongly suspected of causing a memory leak.
Change-Id: I0ebf9cee304277909a0b80420ac7ba659a437b29
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9090421..6efc43f 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -875,8 +875,7 @@
# already (without approvals), we need to clear the cache
# first.
for connection in self.connections.connections.values():
- if hasattr(connection, '_change_cache'):
- connection._change_cache.clear()
+ connection.maintainCache([])
self.executor_server.hold_jobs_in_build = True
A.addApproval('Approved', 1)
@@ -946,8 +945,7 @@
self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
# there should still be changes in the cache
- self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
- 0)
+ self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@@ -3933,8 +3931,7 @@
self.assertEqual(B.data['status'], 'NEW')
for connection in self.connections.connections.values():
- if hasattr(connection, '_change_cache'):
- connection._change_cache.clear()
+ connection.maintainCache([])
self.executor_server.hold_jobs_in_build = True
B.addApproval('Approved', 1)
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index ca10f21..b44fa46 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -68,6 +68,13 @@
def registerScheduler(self, sched):
self.sched = sched
+    def maintainCache(self, relevant):
+        """Prune the change cache down to the relevant changes.
+
+        ``relevant`` is a collection of change objects that are still in
+        use; anything cached that is not in it is safe to evict. The
+        base implementation is a no-op; drivers with a cache override it."""
+
def registerWebapp(self, webapp):
self.webapp = webapp
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 343c305..83871e3 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -23,10 +23,9 @@
import pprint
import shlex
import queue
-import weakref
+import voluptuous as v
from typing import Dict, List
-import voluptuous as v
from zuul.connection import BaseConnection
from zuul.model import Ref, Tag, Branch, Project
@@ -143,8 +142,7 @@
# cache as it may be a dependency
if event.change_number:
refresh = True
- if ((event.change_number, event.patch_number) not in
- self.connection._change_cache):
+ if event.change_number not in self.connection._change_cache:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@@ -302,7 +300,7 @@
self.baseurl = self.connection_config.get('baseurl',
'https://%s' % self.server)
- self._change_cache = weakref.WeakValueDictionary()
+ self._change_cache = {}
self.projects = {}
self.gerrit_event_connector = None
self.source = driver.getSource(self)
@@ -313,6 +311,22 @@
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
+    def maintainCache(self, relevant):
+        """Evict every cached change that is not in ``relevant``.
+
+        ``relevant`` holds the change objects that are still in use. The
+        cache is a dict of change number -> {patchset: change}.
+        """
+        stale = [(number, patchset)
+                 for number, by_patchset in self._change_cache.items()
+                 for patchset, change in by_patchset.items()
+                 if change not in relevant]
+        for number, patchset in stale:
+            by_patchset = self._change_cache[number]
+            del by_patchset[patchset]
+            if not by_patchset:
+                del self._change_cache[number]
+
def getChange(self, event, refresh=False):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number,
@@ -357,19 +371,22 @@
return change
def _getChange(self, number, patchset, refresh=False, history=None):
- change = self._change_cache.get((number, patchset))
+ change = self._change_cache.get(number, {}).get(patchset)
if change and not refresh:
return change
if not change:
change = GerritChange(None)
change.number = number
change.patchset = patchset
- self._change_cache[(change.number, change.patchset)] = change
+ self._change_cache.setdefault(change.number, {})
+ self._change_cache[change.number][change.patchset] = change
try:
self._updateChange(change, history)
except Exception:
- if self._change_cache.get((change.number, change.patchset)):
- del self._change_cache[(change.number, change.patchset)]
+ if self._change_cache.get(change.number, {}).get(change.patchset):
+ del self._change_cache[change.number][change.patchset]
+ if not self._change_cache[change.number]:
+ del self._change_cache[change.number]
raise
return change
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 46c8ee5..3d0eb37 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,7 +21,6 @@
import threading
import time
import re
-import weakref
import cachecontrol
from cachecontrol.cache import DictCache
@@ -395,7 +394,7 @@
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
- self._change_cache = weakref.WeakValueDictionary()
+ self._change_cache = {}
self._project_branch_cache = {}
self.projects = {}
self.git_ssh_key = self.connection_config.get('sshkey')
@@ -568,6 +567,11 @@
# authenticated, if not then anonymous is the best we have.
return self._github
+    def maintainCache(self, relevant):
+        for key, change in list(self._change_cache.items()):  # iterate a copy
+            if change not in relevant:
+                del self._change_cache[key]
+
def getChange(self, event, refresh=False):
"""Get the change representing an event."""
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 026763b..e5924f8 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -595,6 +595,8 @@
self._reenqueueTenant(old_tenant, tenant)
+ # TODOv3(jeblair): update for tenants
+ # self.maintainConnectionCache()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
@@ -726,6 +728,23 @@
finally:
self.run_handler_lock.release()
+    def maintainConnectionCache(self):
+        """Prune each connection's cache to changes used by pipeline items."""
+        # TODOv3(jeblair): update for tenants
+        relevant = set()
+        for tenant in self.abide.tenants.values():
+            for pipeline in tenant.layout.pipelines.values():
+                self.log.debug("Gather relevant cache items for: %s", pipeline)
+                for item in pipeline.getAllItems():
+                    relevant.add(item.change)
+                    relevant.update(item.change.getRelatedChanges())
+        # NOTE(review): self.connections appears to be the registry object,
+        # so iterate its ``connections`` mapping (as the tests do) - confirm.
+        for connection in self.connections.connections.values():
+            connection.maintainCache(relevant)
+            self.log.debug("End maintain connection cache for: %s", connection)
+        self.log.debug("Connection cache size: %s", len(relevant))
+
def process_event_queue(self):
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()