Source code for q2_sdk.extensions.Cache.extension

"""
Cache Extension returns latest retrieved cache keys
"""

from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core.request_handler.templating import LocalAssetLoader
from q2_sdk.tools.utils import to_bool
from q2_sdk.core.cache import RECENT_KEYS, RecentKey


class CacheHandler(Q2BaseRequestHandler):
    """Handler that renders the recently used cache keys and deletes them on request.

    GET renders ``index.html.jinja2`` with the non-expired entries of
    ``RECENT_KEYS``. POST deletes either every tracked key or only the keys
    submitted in the ``keys[]`` body arguments.
    """

    async def get(self):
        """Render the page listing the tracked keys that have not expired."""
        # The result of the lookup is intentionally discarded.
        # NOTE(review): presumably this refreshes the expiry state of the
        # tracked keys before filtering on ``is_expired`` -- confirm against
        # the cache client.
        self.cache.get_many([tracked.full_key for tracked in RECENT_KEYS])

        live_keys = [tracked for tracked in RECENT_KEYS if not tracked.is_expired]
        rendered = self.get_template("index.html.jinja2", {"recent_keys": live_keys})
        expanded = LocalAssetLoader(self.templater, rendered).expand_local_sources()
        self.write(expanded)

    def post(self):
        """Delete cache entries and respond with ``Success``.

        With no ``keys[]`` body arguments, every key in ``RECENT_KEYS`` is
        deleted from the cache and the tracking collection is cleared.
        Otherwise each submitted value must have the form
        ``prefix;key;is_encrypted``; a value that does not split into exactly
        three parts raises ``ValueError`` from the unpacking.
        """
        submitted = self.get_body_arguments("keys[]")

        if submitted:
            cache_keys = []
            for entry in submitted:
                prefix, name, encrypted_flag = entry.split(";")
                tracked = RecentKey(
                    name, prefix, is_encrypted=to_bool(encrypted_flag or False)
                )
                cache_keys.append(f"{prefix}{name}")
                # Best effort: the key may no longer be tracked.
                try:
                    RECENT_KEYS.remove(tracked)
                except ValueError:
                    pass
            self.cache.delete_many(cache_keys)
        else:
            # Nothing selected: purge everything that is currently tracked.
            self.cache.delete_many([tracked.full_key for tracked in RECENT_KEYS])
            RECENT_KEYS.clear()

        self.write("Success")