from __future__ import annotations
import json
import logging
from asyncio import iscoroutinefunction, get_event_loop
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import auto, StrEnum, IntEnum
from functools import wraps, partial
from hashlib import sha256
from os import makedirs, unlink
from os.path import exists, isfile, join
from timeit import default_timer
from typing import (
Callable,
Dict,
List,
Optional,
TYPE_CHECKING,
Union,
)
from zlib import compress, decompress
from dateutil import parser
from pymemcache import MemcacheError
from pymemcache.client.base import PooledClient
from q2_sdk.core import security
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import MockTestError
from q2_sdk.core.non_http_handlers.udp import writer as udp_writer
from q2_sdk.core.prometheus import MetricType, get_metric
from q2_sdk.models.unique_stack import ForkedUniqueStack
from q2_sdk.tools import utils
if TYPE_CHECKING:
from q2_sdk.tools.testing.models import CacheMockValues
# Module-level cache configuration, read once from the service settings file.
COMPRESS_DATA = settings.CACHE.get("COMPRESS_DATA")  # zlib-compress payloads when truthy
ENCRYPTION_KEY = settings.CACHE.get("ENCRYPTION_KEY")  # default key handed to get_cache()
# Sentinel returned when encrypted data is read back without the encryption key
ENCRYPTED_DATA_CONST = "<encrypted_data>"
class CacheReturnCode(IntEnum):
    """Memcached flag values describing how a cached payload was serialized."""

    String = 1
    Json = 2
    Integer = 3
    Encrypted = 4
class CacheConfigError(Exception):
    """Bad configs to instantiate Q2CacheClient"""
def _json_serializer(key, value, encryption_key):
    """pymemcache serializer hook.

    Delegates to :func:`serialize_as_json` using the module-level
    COMPRESS_DATA setting; returns the (payload, flags) tuple pymemcache expects.
    """
    return serialize_as_json(value, COMPRESS_DATA, encryption_key)
def serialize_as_json(value, compress_data: bool, encryption_key: Optional[bytes]):
    """Serialize ``value`` for memcached storage.

    Strings and ints pass through untouched; everything else is JSON-dumped.
    When ``encryption_key`` is given the payload is encrypted (flag 4). When
    ``compress_data`` is True the payload is zlib-compressed, except for ints
    and encrypted payloads.

    :return: tuple of (payload, CacheReturnCode flag)
    """
    if isinstance(value, str):
        payload, code = value, CacheReturnCode.String
    elif isinstance(value, int):
        payload, code = value, CacheReturnCode.Integer
    else:
        payload, code = json.dumps(value), CacheReturnCode.Json
    if encryption_key:
        payload = security.encrypt(payload, encryption_key)
        code = CacheReturnCode.Encrypted
    skip_compression = code in (CacheReturnCode.Integer, CacheReturnCode.Encrypted)
    if compress_data is True and not skip_compression:
        payload = compress(payload.encode())
    return payload, code
def _json_deserializer(key, value, flags, encryption_key):
    """pymemcache deserializer hook.

    Delegates to :func:`deserialize_json` using the module-level
    COMPRESS_DATA setting.
    """
    return deserialize_json(value, flags, COMPRESS_DATA, encryption_key)
def deserialize_json(
    value, flags: CacheReturnCode, compress_data: bool, encryption_key: Optional[bytes]
):
    """
    Reverse of :func:`serialize_as_json`.

    :param value: Raw value to deserialize
    :param flags: 1=str, 2=json, 3=int, 4=encrypted
    :param compress_data: is_zlib compressed
    :param encryption_key: If present, will decrypt with security module
    """
    assert flags in [x.value for x in CacheReturnCode], "Unknown serialization format"
    if encryption_key:
        value = security.decrypt(value, encryption_key)
    elif (
        flags == CacheReturnCode.Encrypted
    ):  # Looking up encrypted data without the key will result in gibberish
        value = ENCRYPTED_DATA_CONST
    # The serializer never compresses ints or encrypted payloads, so skip those here
    if compress_data is True and flags not in [
        CacheReturnCode.Integer,
        CacheReturnCode.Encrypted,
    ]:
        value = decompress(value)
    if isinstance(value, bytes):
        value = value.decode()
    if flags == CacheReturnCode.Json:
        return json.loads(value)
    elif flags == CacheReturnCode.Integer:
        return int(value)
    return value
class RecentKeysStack(ForkedUniqueStack):
    """
    Configure RecentKeys to work with forked mode
    """

    # Message tag used by ForkedUniqueStack's UDP broadcast machinery
    UDP_MESSAGE_TYPE = udp_writer.MsgType.Cache

    def __init__(self, max_size, seed_from_file=False):
        super().__init__(max_size)
        # In local DEBUG (non-test) runs, pre-populate from the on-disk cache file
        if settings.DEBUG and seed_from_file and settings.TEST_MODE is False:
            local_keys = LocalCache().update_cache_file(LocalCacheAction.READ)
            self.extend(local_keys)

    def append(self, item: RecentKey):
        """Push a key onto the stack; only RecentKey instances are accepted."""
        assert isinstance(item, RecentKey)
        super().append(item)

    def replace(self, new: list[RecentKey]):
        """Swap the entire stack contents for ``new`` (a list of RecentKeys)."""
        assert isinstance(new, list)
        if len(new) > 0:
            assert isinstance(new[0], RecentKey)
        self.clear()
        self.extend(new)

    def remove_safe(self, item: RecentKey):
        """Same as .remove but will not except if it doesn't exist"""
        if item in self:
            self.remove(item)
@dataclass
class RecentKey:
    """A single remembered cache key plus the metadata needed to display/bust it."""

    key: str
    prefix: str = ""
    is_encrypted: bool = False
    # Excluded from equality so the same key with a refreshed expiry dedupes
    expire_time: Optional[datetime] = field(compare=False, default=None)

    def __post_init__(self):
        # Keys loaded from the JSON cache file carry expire_time as a string;
        # normalize to datetime (``parser`` is dateutil's, imported at module top)
        if isinstance(self.expire_time, str):
            self.expire_time = parser.parse(self.expire_time)

    @property
    def is_expired(self):
        """True when expire_time is set and in the past (naive local time)."""
        if self.expire_time:
            return self.expire_time < datetime.now()
        return False

    @property
    def full_key(self):
        """The key as stored in memcached: prefix + key."""
        return self.prefix + self.key

    def as_dict(self):
        """JSON-serializable form used by LocalCache's cache file."""
        expire_time = None
        if self.expire_time:
            expire_time = self.expire_time.strftime("%m/%d/%Y, %H:%M:%S")
        return {
            "key": self.key,
            "prefix": self.prefix,
            "is_encrypted": self.is_encrypted,
            "expire_time": expire_time,
        }
class Q2CacheClient(PooledClient):
    """
    Same interface as pymemcache base class, but remembers the
    keys being requested, which makes it easy to bust the cache
    Parent docs: https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html#module-pymemcache.client.base
    """

    # Verbs _call will dispatch; also used as the metric label for traffic stats
    AVAILABLE_VERBS = ("get", "set", "get_many", "set_many", "delete_many", "flush_all")
    WARNING_MSG = (
        'Cache entry "%s" will not come back in the same shape you put it in. '
        "Please ensure all keys are strings"
    )

    def __init__(
        self,
        *args,
        logger=None,
        prefix=None,
        local_path=None,
        encryption_key=None,
        **kwargs,
    ):
        """
        :param logger: Optional logger; defaults to the root logger at settings.LOGGING_LEVEL
        :param prefix: Optional per-client key prefix; a trailing ":" is appended if missing
        :param local_path: Path to the DEBUG-mode local cache file (see LocalCache)
        :param encryption_key: If present, values are encrypted/decrypted by the serializer
        """
        if settings.TEST_MODE:
            from q2_sdk.tools.testing.models import CacheMock

            # Real memcached connections are forbidden under test; force the mock
            if not isinstance(self, CacheMock):
                raise MockTestError(
                    "Please use ``CacheMock`` or ``q2_cache`` fixture for testing"
                )
        if prefix is None:
            prefix = ""
        else:
            if not prefix.endswith(":"):
                prefix = prefix + ":"
            if kwargs.get("key_prefix"):
                # pymemcache prepends key_prefix to every key on the wire
                kwargs["key_prefix"] += prefix
        self.prefix = prefix
        super().__init__(*args, **kwargs)
        if not logger:
            logger = logging.getLogger()
            logger.setLevel(settings.LOGGING_LEVEL)
        self.logger = logger
        self.local_path = local_path
        self.encryption_key = encryption_key
        # Module-level shared stack of recently seen keys (None outside a repo)
        self.recent_keys = RECENT_KEYS

    def _call(self, func: partial, verb):
        """
        Execute ``func`` (a partial wrapping a pymemcache verb), timing it and
        recording a prometheus histogram observation.

        Returns None (instead of raising) when memcached refuses the connection.
        """
        assert verb in self.AVAILABLE_VERBS, "Verb must be in %s" % str(
            self.AVAILABLE_VERBS
        )
        try:
            start_time = default_timer()
            response = func()
            end_time = default_timer()
            get_metric(
                MetricType.Histogram,
                "caliper_cache_requests",
                "Memcached traffic",
                {"method": verb},
                chain={"op": "observe", "params": [end_time - start_time]},
            )
            return response
        except ConnectionRefusedError:
            self.logger.error(
                'ConnectionRefusedError: Please start memcached by either using caliper_admin or by "memcached -d" command'
            )

    def get(self, key: str, default=None, **kwargs):
        """
        :param key: Will prepend CACHE['PREFIX'] from settings file
        :param default: Returned if the key was not found. May be a callable,
            in which case it is invoked to produce the fallback value.
        """
        func = partial(super().get, key)
        response = self._call(func, "get")
        if response is None:
            if callable(default):
                default = default()
            response = default
        if self.encryption_key:
            is_encrypted = True
        else:
            # Without the key, encrypted entries deserialize to the sentinel value
            is_encrypted = response == ENCRYPTED_DATA_CONST
        recent_key = RecentKey(key, self.prefix, is_encrypted=is_encrypted)
        if response:
            self.recent_keys.append(recent_key)
        elif recent_key in self.recent_keys:
            # Falsy/missing value: drop any stale entry from the recent-keys stack
            self.recent_keys.remove(recent_key)
        return response

    def get_many(self, keys: List[str]):
        """
        :param keys: Will prepend CACHE['PREFIX'] from settings file to all
        """
        func = partial(super().get_many, keys)
        response = self._call(func, "get_many")
        # In DEBUG mode the local cache file is the source of truth for recent keys
        if not settings.DEBUG:
            for key, value in response.items():
                if self.encryption_key:
                    is_encrypted = True
                else:
                    is_encrypted = value == ENCRYPTED_DATA_CONST
                recent_key = RecentKey(key, self.prefix, is_encrypted=is_encrypted)
                self.recent_keys.append(recent_key)
            for key in keys:
                if key not in response:
                    recent_key = RecentKey(key, self.prefix)
                    if recent_key in RECENT_KEYS:
                        self.recent_keys.remove(recent_key)
        return response

    def _check_integrity(self, key, value):
        """Warn when a dict value would not survive a JSON round trip
        (e.g. non-string keys are coerced by json.dumps)."""
        if isinstance(value, dict):
            serialized = serialize_as_json(value, False, None)
            if deserialize_json(serialized[0], CacheReturnCode.Json, False, None) != value:
                self.logger.warning(self.WARNING_MSG, key)

    def set(self, key: str, value: str | dict | int | list, expire=0, noreply=None):
        """
        :param key: Will prepend CACHE['PREFIX'] from settings file
        :param value: Automatically compressed
        :param expire: In seconds. 0 for no expiry (the default)
        :param noreply: If False, will wait for Memcached to respond. Typically you will want to leave this alone.
        """
        self._check_integrity(key, value)
        func = partial(super().set, key, value, expire=expire, noreply=noreply)
        result = self._call(func, "set")
        if settings.DEBUG:
            # DEBUG mode mirrors the key into the on-disk cache file so the
            # recent-keys stack survives restarts
            expire_time = None
            if expire:
                expire_time = (datetime.now() + timedelta(seconds=expire)).strftime(
                    "%m/%d/%Y, %H:%M:%S"
                )
            key_dict = {
                key: {
                    "expire_time": expire_time,
                    "is_encrypted": self.encryption_key is not None,
                    "prefix": self.prefix,
                }
            }
            local_cache = LocalCache(self.logger, self.local_path)
            local_keys = local_cache.update_cache_file(
                action=LocalCacheAction.SET, key_dict=key_dict
            )
            self.recent_keys.replace(local_keys)
        else:
            self.recent_keys.append(
                RecentKey(
                    key, self.prefix, is_encrypted=self.encryption_key is not None
                )
            )
        return result

    def set_and_return(
        self, key: str, value: str | dict | int | list, expire=0, noreply=None
    ):
        """
        Simple wrapper to set that returns the value and raises a MemcacheError if the result is not True
        :param key: Will prepend CACHE['PREFIX'] from settings file
        :param value: Automatically compressed. May be a callable that produces the value.
        :param expire: In seconds. 0 for no expiry (the default)
        :param noreply: If False, will wait for Memcached to respond.
            Typically, you will want to leave this alone.
        """
        if callable(value):
            value = value()
        result = self.set(key, value, expire=expire, noreply=noreply)
        if result is not True:
            raise MemcacheError("Failed to set key %s" % key)
        return value

    def set_many(
        self,
        values: Dict[str, str] | Dict[str, dict] | Dict[str, int] | Dict[str, list],
        expire=0,
        noreply=None,
    ):
        """
        A convenience function for setting multiple values.
        :param values: {key: value, key2: value2}
        :param expire: In seconds. 0 for no expiry (the default)
        :param noreply: If False, will wait for Memcached to respond.
            Typically you will want to leave this alone.
        """
        for key, value in values.items():
            self._check_integrity(key, value)
        func = partial(super().set_many, values, expire=expire, noreply=noreply)
        result = self._call(func, "set_many")
        # NOTE(review): unlike set(), this replaces the whole recent-keys stack
        # with just this batch — confirm that is intentional
        self.recent_keys.replace([RecentKey(key, self.prefix) for key in values.keys()])
        return result

    def delete_many(self, keys: List[str], noreply=None):
        """
        A convenience function to delete multiple keys.
        :param keys: Will prepend CACHE['PREFIX'] from settings file to all
        :param noreply: If False, will wait for Memcached to respond.
            Typically you will want to leave this alone.
        """
        func = partial(super().delete_many, keys, noreply)
        result = self._call(func, "delete_many")
        if settings.DEBUG:
            # Refresh the local cache file against what memcached still holds
            local_cache = LocalCache(self.logger, self.local_path)
            local_keys = local_cache.update_cache_file(
                action=LocalCacheAction.UPDATE, key_dict=keys, cache_obj=self
            )
            self.recent_keys.replace(local_keys)
        for key in keys:
            self.recent_keys.remove_safe(RecentKey(key, self.prefix))
        return result

    def flush_all(self, delay=0, noreply=None):
        """
        A convenience function to clear all cached keys.
        :param delay: optional int, the number of seconds to wait before flushing,
            or zero to flush immediately (the default).
        :param noreply: If False, will wait for Memcached to respond.
            Typically you will want to leave this alone.
        """
        # NOTE(review): memcached itself is only flushed in DEBUG mode; outside
        # DEBUG this merely clears the recent-keys stack — confirm intentional
        if settings.DEBUG:
            func = partial(super().flush_all, delay, noreply=noreply)
            self._call(func, "flush_all")
            local_cache = LocalCache(self.logger, self.local_path)
            local_keys = local_cache.update_cache_file(action=LocalCacheAction.CLEAR)
            self.recent_keys.replace(local_keys)
        elif self.recent_keys:
            self.recent_keys.clear()
        return True

    async def get_async(self, key: str, default=None, **kwargs):
        """
        Same as get but will run in a separate thread
        :param key: Will prepend CACHE['PREFIX'] from settings file
        :param default: Returned if the key was not found. Sync or async callables
            are invoked to produce the fallback value.
        """
        func = partial(self.get, key)
        response = await get_event_loop().run_in_executor(None, func)
        if response is None:
            if callable(default):
                if iscoroutinefunction(default):
                    default = await default()
                else:
                    default = default()
            response = default
        return response

    async def get_many_async(self, keys: List[str]):
        """
        Same as get_many but will run in a separate thread
        :param keys: Will prepend CACHE['PREFIX'] from settings file to all
        """
        func = partial(self.get_many, keys)
        return await get_event_loop().run_in_executor(None, func)

    async def set_async(
        self, key: str, value: str | dict | int | list, expire=0, noreply=None
    ):
        """
        Same as set but will run in a separate thread
        :param key: Will prepend CACHE['PREFIX'] from settings file
        :param value: Automatically compressed
        :param expire: In seconds. 0 for no expiry (the default)
        :param noreply: If False, will wait for Memcached to respond.
            Typically you will want to leave this alone.
        """
        func = partial(self.set, key, value, expire=expire, noreply=noreply)
        return await get_event_loop().run_in_executor(None, func)

    async def set_and_return_async(
        self, key: str, value: str | dict | int | list | partial, expire=0, noreply=None
    ):
        """
        Simple wrapper to set that returns the value and raises a MemcacheError if the result is not True
        :param key: Will prepend CACHE['PREFIX'] from settings file
        :param value: Automatically compressed. Sync or async callables are invoked
            to produce the value.
        :param expire: In seconds. 0 for no expiry (the default)
        :param noreply: If False, will wait for Memcached to respond.
            Typically, you will want to leave this alone.
        """
        if callable(value):
            if iscoroutinefunction(value):
                value = await value()
            else:
                value = value()
        result = await self.set_async(key, value, expire=expire, noreply=noreply)
        if result is not True:
            raise MemcacheError("Failed to set key %s" % key)
        return value

    async def set_many_async(
        self, values: Dict[str, str | dict | int | list], expire=0, noreply=None
    ):
        """
        Same as set_many but will run in a separate thread
        :param values: {key: value, key2: value2}
        :param expire: In seconds. 0 for no expiry (the default)
        :param noreply: If False, will wait for Memcached to respond.
            Typically you will want to leave this alone.
        """
        func = partial(self.set_many, values, expire=expire, noreply=noreply)
        return await get_event_loop().run_in_executor(None, func)

    async def delete_many_async(self, keys: List[str], noreply=None):
        """
        Same as delete_many but will run in a separate thread
        :param keys: Will prepend CACHE['PREFIX'] from settings file to all
        :param noreply: If False, will wait for Memcached to respond.
            Typically you will want to leave this alone.
        """
        func = partial(self.delete_many, keys, noreply=noreply)
        return await get_event_loop().run_in_executor(None, func)

    async def flush_all_async(self, delay=0, noreply=None):
        """
        Same as flush_all but will run in a separate thread
        :param delay: optional int, the number of seconds to wait before flushing,
            or zero to flush immediately (the default).
        :param noreply: If False, will wait for Memcached to respond.
            Typically you will want to leave this alone.
        """
        func = partial(self.flush_all, delay, noreply=noreply)
        return await get_event_loop().run_in_executor(None, func)
[docs]
class LocalCacheAction(StrEnum):
READ = auto()
SET = auto()
CLEAR = auto()
UPDATE = auto()
class LocalCache:
    """
    Persists recently cached keys to a JSON file (<repo>/.antilles/recent_cache.json
    by default) so DEBUG-mode tooling can inspect and bust them across restarts.
    """

    def __init__(self, logger=None, file_path=""):
        # :param logger: optional logger; defaults to root logger at settings level
        # :param file_path: optional override for the backing JSON file
        self.file_path = file_path
        if not file_path:
            self.file_path = join(
                utils.get_repo_root(), ".antilles", "recent_cache.json"
            )
        if not logger:
            logger = logging.getLogger()
            logger.setLevel(settings.LOGGING_LEVEL)
        self.logger = logger
        self.recent_keys: RecentKeysStack = RecentKeysStack(
            settings.CACHE["KEYS_TO_REMEMBER"]
        )

    def update_cache_file(
        self,
        action: LocalCacheAction,
        key_dict: Optional[dict] = None,
        cache_obj: Optional[Q2CacheClient] = None,
    ) -> list[RecentKey]:
        """
        Read the cache file, apply ``action``, and (except for READ) write it back.

        :param action: READ returns the file's non-expired keys without writing;
            SET merges ``key_dict`` entries; CLEAR empties the file;
            UPDATE drops keys no longer present in memcached (checked via ``cache_obj``)
        :param key_dict: For SET: {key: {prefix, is_encrypted, expire_time}};
            for UPDATE the caller passes a list of keys (unused here beyond truthiness)
        :param cache_obj: Client used by UPDATE to query which keys still exist
        :return: For READ, the list of valid RecentKeys from the file; otherwise
            the refreshed ``self.recent_keys``
        """
        if not key_dict:
            key_dict = {}
        self._validate_file_existence()
        with open(self.file_path, "r") as handle:
            file_data = []
            try:
                json_data = json.load(handle)
            except json.JSONDecodeError:
                # Corrupt/empty file: start from scratch
                json_data = []
            if isinstance(json_data, dict):
                # Freshly created files contain {}; treat as an empty key list
                json_data = []
            for item in json_data:
                file_data.append(
                    RecentKey(
                        item["key"],
                        item.get("prefix", ""),
                        is_encrypted=item.get("is_encrypted"),
                        expire_time=item.get("expire_time"),
                    )
                )
        match action:  # noqa: E999
            case LocalCacheAction.READ:
                file_data = self._read_cached_keys(file_data)
                return file_data
            case LocalCacheAction.SET:
                file_data = self._add_cached_keys(key_dict, file_data)
            case LocalCacheAction.CLEAR:
                file_data = []
            case LocalCacheAction.UPDATE:
                file_data = self._refresh_cache_keys(cache_obj, file_data)
        with open(self.file_path, "w") as handle:
            json.dump([f.as_dict() for f in file_data], handle, indent=4)
        return self.recent_keys

    def _read_cached_keys(self, file_data: list[RecentKey]) -> list[RecentKey]:
        """Filter out keys whose expire_time has already passed."""
        return [x for x in file_data if not x.is_expired]

    def _validate_file_existence(self) -> bool:
        """Create .antilles/recent_cache.json (containing ``{}``) if missing."""
        if not isfile(self.file_path):
            antilles_path = join(utils.get_repo_root(), ".antilles")
            makedirs(antilles_path, exist_ok=True)
            with open(self.file_path, "w") as handle:
                json.dump({}, handle)
        return isfile(self.file_path)

    def _add_cached_keys(
        self, key_dict: dict, file_data: list[RecentKey]
    ) -> RecentKeysStack:
        """Merge ``key_dict`` entries on top of the keys already in the file."""
        new_keys_stack = RecentKeysStack(self.recent_keys.max_size)
        new_keys_stack.extend(file_data)
        for key, val in key_dict.items():
            new_keys_stack.append(
                RecentKey(
                    key,
                    val.get("prefix", ""),
                    val.get("is_encrypted", False),
                    val.get("expire_time"),
                )
            )
        self.recent_keys = new_keys_stack
        return self.recent_keys

    def _refresh_cache_keys(
        self, cache_obj: Q2CacheClient, file_data: list[RecentKey]
    ) -> RecentKeysStack:
        """
        retrieving existing keys from file, updating file and recent keys if expired keys are present
        """
        # Any key memcached no longer returns is dropped from the file data
        memcache_keys = cache_obj.get_many([
            f"{x.prefix}{x.key}" for x in file_data
        ]).keys()
        valid_keys = list(memcache_keys)
        keys_to_remove = []
        for i in file_data:
            if f"{i.prefix}{i.key}" not in valid_keys:
                keys_to_remove.append(i)
        for key in keys_to_remove:
            file_data.remove(key)
        self.recent_keys = file_data
        return self.recent_keys
class Q2LocalCacheClient:
    """File-backed stand-in for a cache: each key is a plain file under <repo>/.antilles."""

    @staticmethod
    def get(key, default=None) -> str:
        """Return the stored value for ``key``, or ``default`` when missing/empty."""
        path = join(utils.get_repo_root(), ".antilles", key)
        if not exists(path):
            return default
        with open(path, "r") as local_cache:
            value = local_cache.read()
        return value if value else default

    @staticmethod
    def set(key, value):
        """Write ``str(value)`` to the file named ``key``, creating .antilles if needed."""
        antilles_path = join(utils.get_repo_root(), ".antilles")
        makedirs(antilles_path, exist_ok=True)
        with open(join(antilles_path, key), "w") as local_cache:
            local_cache.write(str(value))

    @staticmethod
    def delete(key):
        """Remove the file backing ``key``; a missing file is not an error."""
        path = join(utils.get_repo_root(), ".antilles", key)
        if exists(path):
            unlink(path)
def get_cache(
    logger=None,
    prefix=None,
    cachemock_params: Optional[CacheMockValues] = None,
    encryption_key=ENCRYPTION_KEY,
    **kwargs,
) -> Q2CacheClient:
    """
    Build a Q2CacheClient from settings.CACHE (or a CacheMock under TEST_MODE).

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param prefix: If defined will be prepended to all keys
    :param cachemock_params: Optional canned values for the test-mode CacheMock
    :param encryption_key: Key handed to the (de)serializers; defaults to settings.CACHE
    :param kwargs: Passed through to Q2CacheClient / PooledClient
    """
    if settings.TEST_MODE:
        from q2_sdk.tools.testing.models import CacheMock

        if cachemock_params:
            return CacheMock.from_cachemock_params(cachemock_params)
        return CacheMock()
    config = settings.CACHE
    host = (config["HOST"], config["PORT"])
    serializer = partial(_json_serializer, encryption_key=encryption_key)
    deserializer = partial(_json_deserializer, encryption_key=encryption_key)
    # In prod, swallow cache errors rather than failing the request
    ignore_exc = settings.DEPLOY_ENV == settings.DEPLOY_ENV_LEVEL.PROD
    client = Q2CacheClient(
        host,
        prefix=prefix,
        key_prefix=config["PREFIX"],
        serializer=serializer,
        deserializer=deserializer,
        connect_timeout=config.get("CONNECT_TIMEOUT", 1),
        timeout=config.get("TIMEOUT", 1),
        ignore_exc=ignore_exc,
        logger=logger,
        encryption_key=encryption_key,
    )
    return client
[docs]
class StorageLevel(StrEnum):
"""
At what scope should the cache be stored?
:param Service: Cache will be scoped to the service
:param Stack: Cache will be scoped to the customer stack (self.hq_credentials.customer_key)
:param Session: Cache will be scoped to the user session (self.online_session.session_id)
"""
Service = auto()
Stack = auto()
Session = auto()
def cache_key(fn, *args, **kwargs):
    """Simple function that returns a unique key based on function name, and arguments.

    The key is the hex sha256 of ``str([fn.__qualname__, *args, *(k, v) pairs])``.
    """
    # Return the hexadecimal representation of the hash
    return sha256(
        str(
            [fn.__qualname__] + list(args) + [(key, val) for key, val in kwargs.items()]
        ).encode("utf-8")
    ).hexdigest()
[docs]
def cache(
_func=None,
*,
timeout: int = 300,
key: Union[Callable, str, None] = cache_key,
key_prefix: Optional[str] = None,
storage_level: StorageLevel = StorageLevel.Stack,
**kw,
):
"""
Decorator to handle caching elegantly.
Usage:
@cache(timeout=300, key='cacheKey', key_prefix='keyPrefix', storage_level=StorageLevel.Stack)
def func(foo, bar):
...
If the key is present, it returns the cached value instead of running the function. If the key is not yet cached,
it executes the function and stores the value under the key.
By default, the cache key is the qualified name of the function for the active stack (customer environment).
You can override the function name portion with the "key" parameter, and the stack portion with the "storage_level" parameter.
The key is set using the "Q2CacheClient.set()" method, and you can add any additional prefix according
to the client's configs. You can also add a decorator-specific prefix using the "key_prefix" parameter.
The "timeout" parameter specifies the duration, in seconds, that the function is cached. It defaults to 300
seconds (5 minutes).
:param timeout: Duration, in seconds, the function is cached. This argument defaults to 300 seconds (5 minutes)
:param key: Optional cache key that may be set, the default is the qualified name of the function.
:param key_prefix: A string that will be automatically included (prepended by default) to all cache keys
:param storage_level: StorageLevel enum, Service: Scoped to this service, Stack: Scoped to the customer env,
Session: Scoped to the user session. If no level is passed the function's `self.default_storage_level` will be used.
Finally if that isn't available the 'Stack` level will be used.
"""
def decorator(fn):
key_var = key
prefix = str(key_prefix) if key_prefix is not None else ""
_storage_level = storage_level
def _get_cache(self) -> Q2CacheClient:
"""
Returns a Cache based on the current prefix and storage level
:param self: The decorator's self object
:return: a tuple containing the cache object and prefix
"""
nonlocal _storage_level # ensure we use the outer scope storage level
# If no storage level is passed in use the default storage level if available
if _storage_level is None:
_storage_level = getattr(
self, "default_storage_level", StorageLevel.Stack
)
match _storage_level:
case StorageLevel.Stack | StorageLevel.Service:
final_cache = self.cache
case StorageLevel.Session:
final_cache = self.session_cache
case _:
self.logger.error("Unknown storage level: %s", _storage_level)
final_cache = self.session_cache
return final_cache
def _get_prefix(self, current_prefix: str) -> str:
"""
Augments the prefix in the case that the Storage level in Stack
:param self: The decorator's self object
:param current_prefix: The given prefix object
:return: the correct prefix object
"""
prefix = current_prefix
if _storage_level == StorageLevel.Stack:
prefix = ":".join([self.hq_credentials.customer_key, prefix])
if prefix and not prefix.endswith(":"):
prefix = prefix + ":"
return prefix
def _get_key(current_key: Callable | str | None, args, kwargs) -> str:
"""
Return a keystring based on the object passed in
:param current_key: Object that a key is derived from
:param args: args from function being wrapped
:param kwargs: kwargs from function being wrapped
:return: key string
"""
if callable(current_key):
final_key = current_key(fn, *args, **kwargs)
elif current_key is None:
final_key = fn.__qualname__
else:
final_key = current_key
return final_key
if iscoroutinefunction(fn):
# Async Call
@wraps(fn)
async def _wrapped(self, *args, **kwargs):
if "use_session_cache" in kw:
self.logger.warning(
"`use_session_cache` property is deprecated. Please switch to `storage_level=StorageLevel.Session` instead"
)
final_cache = _get_cache(self)
final_cache.prefix = _get_prefix(self, current_prefix=prefix)
final_key = _get_key(current_key=key_var, args=args, kwargs=kwargs)
try:
return await final_cache.get_async(
key=final_key,
default=partial(
final_cache.set_and_return_async,
key=final_key,
value=partial(fn, self, *args, **kwargs),
expire=timeout,
),
)
except MemcacheError as error:
self.logger.error("Error Accessing memcache: %s" % error)
return await fn(*args, **kwargs)
return _wrapped
else:
# Sync Call
@wraps(fn)
def _wrapped(self, *args, **kwargs):
if "use_session_cache" in kw:
self.logger.warning(
"`use_session_cache` property is deprecated. Please switch to `storage_level=StorageLevel.Session` instead"
)
final_cache = _get_cache(self)
final_cache.prefix = _get_prefix(self, current_prefix=prefix)
final_key = _get_key(current_key=key_var, args=args, kwargs=kwargs)
try:
return final_cache.get(
key=final_key,
default=partial(
final_cache.set_and_return,
key=final_key,
value=partial(fn, self, *args, **kwargs),
expire=timeout,
),
)
except MemcacheError as error:
self.logger.error("Error Accessing memcache: %s" % error)
return fn(*args, **kwargs)
return _wrapped
if _func is None:
return decorator
elif isinstance(_func, int):
timeout = _func
return decorator
return decorator(_func)
# Module-level stack of recently used cache keys shared by every Q2CacheClient.
# Only constructed when running inside a repo (REPO_ROOT set); otherwise None.
RECENT_KEYS = (
    RecentKeysStack(settings.CACHE["KEYS_TO_REMEMBER"], seed_from_file=True)
    if settings.REPO_ROOT
    else None
)