Use weakref for change cache
The change cache is used to holf references to active Change objects
so that if something about them changes, the connection driver can
immediately update the data in memory and all enqueued items, etc,
will reflect the new data (because each logical change has a single
Change object that they reference).
However, once these objects are no longer referenced by any items
in pipelines or pending events, we don't need to keep them in the
cache. We used to manually prune the cache, but that was difficult
and we stopped doing it some time ago. Currently, this is a slow
but intentional memory leak.
Instead, let the python GC handle it for us by using weak references
to the Change objects in the cache. Once the only references to
these objects are the cache itself, they will be discarded.
Change-Id: Id659888a41dd60b7e72e891a47f1f742c30d0cc4
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 3203960..b38b64a 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -865,7 +865,8 @@
# already (without approvals), we need to clear the cache
# first.
for connection in self.connections.connections.values():
- connection.maintainCache([])
+ if hasattr(connection, '_change_cache'):
+ connection._change_cache.clear()
self.executor_server.hold_jobs_in_build = True
A.addApproval('Approved', 1)
@@ -935,7 +936,8 @@
self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
# there should still be changes in the cache
- self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
+ self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
+ 0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@@ -3897,7 +3899,8 @@
self.assertEqual(B.data['status'], 'NEW')
for connection in self.connections.connections.values():
- connection.maintainCache([])
+ if hasattr(connection, '_change_cache'):
+ connection._change_cache.clear()
self.executor_server.hold_jobs_in_build = True
B.addApproval('Approved', 1)
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 4fb49e3..08bc53e 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -71,13 +71,6 @@
def registerScheduler(self, sched):
self.sched = sched
- def maintainCache(self, relevant):
- """Make cache contain relevant changes.
-
- This lets the user supply a list of change objects that are
- still in use. Anything in our cache that isn't in the supplied
- list should be safe to remove from the cache."""
-
def registerWebapp(self, webapp):
self.webapp = webapp
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 83871e3..343c305 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -23,9 +23,10 @@
import pprint
import shlex
import queue
-import voluptuous as v
+import weakref
from typing import Dict, List
+import voluptuous as v
from zuul.connection import BaseConnection
from zuul.model import Ref, Tag, Branch, Project
@@ -142,7 +143,8 @@
# cache as it may be a dependency
if event.change_number:
refresh = True
- if event.change_number not in self.connection._change_cache:
+ if ((event.change_number, event.patch_number) not in
+ self.connection._change_cache):
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@@ -300,7 +302,7 @@
self.baseurl = self.connection_config.get('baseurl',
'https://%s' % self.server)
- self._change_cache = {}
+ self._change_cache = weakref.WeakValueDictionary()
self.projects = {}
self.gerrit_event_connector = None
self.source = driver.getSource(self)
@@ -311,22 +313,6 @@
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
- def maintainCache(self, relevant):
- # This lets the user supply a list of change objects that are
- # still in use. Anything in our cache that isn't in the supplied
- # list should be safe to remove from the cache.
- remove = {}
- for change_number, patchsets in self._change_cache.items():
- for patchset, change in patchsets.items():
- if change not in relevant:
- remove.setdefault(change_number, [])
- remove[change_number].append(patchset)
- for change_number, patchsets in remove.items():
- for patchset in patchsets:
- del self._change_cache[change_number][patchset]
- if not self._change_cache[change_number]:
- del self._change_cache[change_number]
-
def getChange(self, event, refresh=False):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number,
@@ -371,22 +357,19 @@
return change
def _getChange(self, number, patchset, refresh=False, history=None):
- change = self._change_cache.get(number, {}).get(patchset)
+ change = self._change_cache.get((number, patchset))
if change and not refresh:
return change
if not change:
change = GerritChange(None)
change.number = number
change.patchset = patchset
- self._change_cache.setdefault(change.number, {})
- self._change_cache[change.number][change.patchset] = change
+ self._change_cache[(change.number, change.patchset)] = change
try:
self._updateChange(change, history)
except Exception:
- if self._change_cache.get(change.number, {}).get(change.patchset):
- del self._change_cache[change.number][change.patchset]
- if not self._change_cache[change.number]:
- del self._change_cache[change.number]
+ if self._change_cache.get((change.number, change.patchset)):
+ del self._change_cache[(change.number, change.patchset)]
raise
return change
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 3d0eb37..46c8ee5 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -21,6 +21,7 @@
import threading
import time
import re
+import weakref
import cachecontrol
from cachecontrol.cache import DictCache
@@ -394,7 +395,7 @@
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
- self._change_cache = {}
+ self._change_cache = weakref.WeakValueDictionary()
self._project_branch_cache = {}
self.projects = {}
self.git_ssh_key = self.connection_config.get('sshkey')
@@ -567,11 +568,6 @@
# authenticated, if not then anonymous is the best we have.
return self._github
- def maintainCache(self, relevant):
- for key, change in self._change_cache.items():
- if change not in relevant:
- del self._change_cache[key]
-
def getChange(self, event, refresh=False):
"""Get the change representing an event."""
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index cfcd865..258d57d 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -595,8 +595,6 @@
self._reenqueueTenant(old_tenant, tenant)
- # TODOv3(jeblair): update for tenants
- # self.maintainConnectionCache()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
@@ -728,23 +726,6 @@
finally:
self.run_handler_lock.release()
- def maintainConnectionCache(self):
- # TODOv3(jeblair): update for tenants
- relevant = set()
- for tenant in self.abide.tenants.values():
- for pipeline in tenant.layout.pipelines.values():
- self.log.debug("Gather relevant cache items for: %s" %
- pipeline)
-
- for item in pipeline.getAllItems():
- relevant.add(item.change)
- relevant.update(item.change.getRelatedChanges())
- for connection in self.connections.values():
- connection.maintainCache(relevant)
- self.log.debug(
- "End maintain connection cache for: %s" % connection)
- self.log.debug("Connection cache size: %s" % len(relevant))
-
def process_event_queue(self):
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()