185 lines
5.6 KiB
Python
185 lines
5.6 KiB
Python
# The MIT License (MIT)
|
|
#
|
|
# Copyright (c) 2014-2022 Thomas Kemmer
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy of
|
|
# this software and associated documentation files (the "Software"), to deal in
|
|
# the Software without restriction, including without limitation the rights to
|
|
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
|
# the Software, and to permit persons to whom the Software is furnished to do so,
|
|
# subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in all
|
|
# copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
|
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
|
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
|
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
|
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
#
|
|
# -----
|
|
#
|
|
# This is a stripped down LRU-cache from the "cachetools" library.
|
|
# https://github.com/tkem/cachetools/blob/d991ac71b4eb6394be5ec572b835434081393215/src/cachetools/__init__.py
|
|
|
|
import collections
|
|
import collections.abc
|
|
from typing import TypeVar, Dict
|
|
|
|
|
|
class _DefaultSize:
|
|
|
|
__slots__ = ()
|
|
|
|
def __getitem__(self, _):
|
|
return 1
|
|
|
|
def __setitem__(self, _, value):
|
|
assert value == 1
|
|
|
|
def pop(self, _):
|
|
return 1
|
|
|
|
|
|
_KT = TypeVar("_KT")
|
|
_VT = TypeVar("_VT")
|
|
class Cache(collections.abc.MutableMapping[_KT, _VT]):
|
|
"""Mutable mapping to serve as a simple cache or cache base class."""
|
|
|
|
__marker = object()
|
|
|
|
__size = _DefaultSize()
|
|
|
|
def __init__(self, maxsize: int, getsizeof=None):
|
|
if getsizeof:
|
|
self.getsizeof = getsizeof
|
|
if self.getsizeof is not Cache.getsizeof:
|
|
self.__size = dict()
|
|
self.__data = dict() # type: Dict[_KT, _VT]
|
|
self.__currsize = 0
|
|
self.__maxsize = maxsize
|
|
|
|
def __repr__(self):
|
|
return "%s(%s, maxsize=%r, currsize=%r)" % (
|
|
self.__class__.__name__,
|
|
repr(self.__data),
|
|
self.__maxsize,
|
|
self.__currsize,
|
|
)
|
|
|
|
def __getitem__(self, key: _KT) -> _VT:
|
|
try:
|
|
return self.__data[key]
|
|
except KeyError:
|
|
return self.__missing__(key)
|
|
|
|
def __setitem__(self, key: _KT, value: _VT) -> None:
|
|
maxsize = self.__maxsize
|
|
size = self.getsizeof(value)
|
|
if size > maxsize:
|
|
raise ValueError("value too large")
|
|
if key not in self.__data or self.__size[key] < size:
|
|
while self.__currsize + size > maxsize:
|
|
self.popitem()
|
|
if key in self.__data:
|
|
diffsize = size - self.__size[key]
|
|
else:
|
|
diffsize = size
|
|
self.__data[key] = value
|
|
self.__size[key] = size
|
|
self.__currsize += diffsize
|
|
|
|
def __delitem__(self, key: _KT) -> None:
|
|
size = self.__size.pop(key)
|
|
del self.__data[key]
|
|
self.__currsize -= size
|
|
|
|
def __contains__(self, key: _KT) -> bool:
|
|
return key in self.__data
|
|
|
|
def __missing__(self, key: _KT):
|
|
raise KeyError(key)
|
|
|
|
def __iter__(self):
|
|
return iter(self.__data)
|
|
|
|
def __len__(self):
|
|
return len(self.__data)
|
|
|
|
def get(self, key: _KT, default: _VT = None) -> _VT | None:
|
|
if key in self:
|
|
return self[key]
|
|
else:
|
|
return default
|
|
|
|
def pop(self, key: _KT, default=__marker) -> _VT:
|
|
if key in self:
|
|
value = self[key]
|
|
del self[key]
|
|
elif default is self.__marker:
|
|
raise KeyError(key)
|
|
else:
|
|
value = default
|
|
return value
|
|
|
|
def setdefault(self, key: _KT, default: _VT = None) -> _VT | None:
|
|
if key in self:
|
|
value = self[key]
|
|
else:
|
|
self[key] = value = default
|
|
return value
|
|
|
|
@property
|
|
def maxsize(self) -> int:
|
|
"""The maximum size of the cache."""
|
|
return self.__maxsize
|
|
|
|
@property
|
|
def currsize(self) -> int:
|
|
"""The current size of the cache."""
|
|
return self.__currsize
|
|
|
|
@staticmethod
|
|
def getsizeof(value) -> int:
|
|
"""Return the size of a cache element's value."""
|
|
return 1
|
|
|
|
|
|
class LRUCache(Cache[_KT, _VT]):
|
|
"""Least Recently Used (LRU) cache implementation."""
|
|
|
|
def __init__(self, maxsize: int, getsizeof=None):
|
|
Cache.__init__(self, maxsize, getsizeof)
|
|
self.__order = collections.OrderedDict()
|
|
|
|
def __getitem__(self, key: _KT, cache_getitem=Cache.__getitem__) -> _VT | None:
|
|
value = cache_getitem(self, key)
|
|
if key in self: # __missing__ may not store item
|
|
self.__update(key)
|
|
return value
|
|
|
|
def __setitem__(self, key: _KT, value, cache_setitem=Cache.__setitem__) -> None:
|
|
cache_setitem(self, key, value)
|
|
self.__update(key)
|
|
|
|
def __delitem__(self, key: _KT, cache_delitem=Cache.__delitem__) -> None:
|
|
cache_delitem(self, key)
|
|
del self.__order[key]
|
|
|
|
def popitem(self) -> tuple[_KT, _VT]:
|
|
"""Remove and return the `(key, value)` pair least recently used."""
|
|
try:
|
|
key = next(iter(self.__order))
|
|
except StopIteration:
|
|
raise KeyError("%s is empty" % type(self).__name__) from None
|
|
else:
|
|
return (key, self.pop(key))
|
|
|
|
def __update(self, key: _KT) -> None:
|
|
try:
|
|
self.__order.move_to_end(key)
|
|
except KeyError:
|
|
self.__order[key] = None
|