import json
import os
from functools import cached_property
from typing import Union, Optional
from urllib.parse import urlparse
from q2_sdk.core.cache import StorageLevel, Q2CacheClient, CacheConfigError
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.core.configuration import settings
class Q2CaliperAPICustomHandler(Q2BaseRequestHandler):
    """
    RequestHandler meant to be used for requests incoming from CaliperAPI.

    During ``prepare`` the handler reads the ``customerKey`` and
    ``WedgeConfigs`` request headers and, when a usable customer key is
    supplied, replaces ``self.hq_credentials`` with credentials resolved for
    that key.
    """

    DESCRIPTION = ""

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        # NOTE(review): flag is presumably consumed by the base handler to
        # accept requests that do not originate from Q2 -- confirm.
        self.allow_non_q2_traffic = True
        # Lazily populated by the ``request_parameters`` property.
        self._request_parameters = None
        # Populated from the ``WedgeConfigs`` header in ``prepare``.
        self._db_config = None
        self.default_cache_level = StorageLevel.Stack

    @cached_property
    def stack_cache(self) -> Q2CacheClient:
        """Q2CacheClient scoped to the current financial institution stack.

        The cache is prefixed with ``self.hq_credentials.customer_key``.

        :raises CacheConfigError: if the customer key is unset, because a
            stack-level cache cannot be scoped without it.
        """
        customer_key = self.hq_credentials.customer_key
        if not customer_key:
            self.logger.error(
                "Customer Key is unset, but cache requested at stack level. Refusing to cache."
            )
            raise CacheConfigError
        return self.get_cache(prefix=customer_key)

    async def prepare(self):
        """Log the incoming request and resolve HQ credentials from headers.

        The customer key is taken from the ``customerKey`` header (with any
        ``CHANGEME`` placeholder text stripped) and falls back to the
        ``customerKey`` entry of the JSON ``WedgeConfigs`` header. If a key is
        found and it differs from ``settings.VAULT_KEY``, the matching
        credentials replace ``self.hq_credentials``.
        """
        await super().prepare()

        headers = self.request.headers
        self.logger.debug("Request Method: %s", self.request.method)
        if headers:
            self.logger.debug("Request Headers: %s", dict(headers))

        customer_key = headers.get("customerKey", "")
        self._db_config = json.loads(headers.get("WedgeConfigs", "{}"))
        customer_key = customer_key.replace("CHANGEME", "")
        if not customer_key:
            customer_key = self._db_config.get("customerKey", "")

        if customer_key and customer_key != settings.VAULT_KEY:
            supplied_hq = self._get_hq_from_key(customer_key)
            if supplied_hq:
                self.hq_credentials = supplied_hq

        body = self.request.body
        if body:
            if isinstance(body, bytes):
                # Decoding is only for the log line; never let a non-UTF-8
                # payload abort the request here.
                body = body.decode(errors="replace")
            self.logger.debug(
                "Request Body: %s",
                body,
                add_to_buffer=False,
            )

    def _get_hq_from_key(self, customer_key) -> HqCredentials:
        """Resolve HqCredentials for ``customer_key``, caching for 60 seconds.

        Resolution order:

        1. A cached entry in ``self.service_cache``.
        2. If the key looks like a full URL (scheme, host and path all
           present): vault credentials for the ``ABA`` environment variable
           when ``settings.MULTITENANT`` is set, otherwise credentials built
           from the ``CSR_USER`` / ``CSR_PWD`` / ``ABA`` environment variables
           with the key itself used as the HQ URL.
        3. Otherwise, vault credentials looked up by the key.

        :param customer_key: Customer key or HQ URL supplied by the caller
        :return: Credentials for the requested key
        """
        cache_key = f"hq_for_ck_{customer_key}"
        cached_creds = self.service_cache.get(cache_key)
        if cached_creds:
            hq_creds = HqCredentials(
                cached_creds["HqUrl"],
                cached_creds["CsrUser"],
                cached_creds["CsrPwd"],
                cached_creds["ABA"],
                customer_key=customer_key,
                database_name=cached_creds.get("DB_NAME"),
            )
            self.logger.debug("Getting HQ from cache for key: %s", customer_key)
            return hq_creds

        parsed = urlparse(customer_key)
        looks_like_url = bool(parsed.scheme and parsed.netloc and parsed.path)
        if not looks_like_url:
            hq_creds = self.vault.get_hq_creds(customer_key)
        elif settings.MULTITENANT:
            hq_creds = self.vault.get_hq_creds(os.environ.get("ABA", "022222222"))
        else:
            hq_creds = HqCredentials(
                customer_key,
                os.environ.get("CSR_USER"),
                os.environ.get("CSR_PWD"),
                os.environ.get("ABA"),
                customer_key=customer_key,
            )

        # Every cache-miss path stores its result under the same key and TTL.
        self.service_cache.set(cache_key, hq_creds.serialize_as_dict(), expire=60)
        return hq_creds

    @property
    def wedge_address_configs(self) -> Optional[dict]:
        """Configuration parsed from the ``WedgeConfigs`` request header.

        ``None`` until ``prepare`` has run.
        """
        return self._db_config

    def parse_query_parameters(self) -> dict:
        """Return query arguments as ``{name: first value decoded as UTF-8}``.

        Only the first value of a repeated argument is kept. Returns an empty
        dict when the request has no arguments.
        """
        if not self.request.arguments:
            return {}
        return {
            key: str(value[0], "utf-8") for key, value in self.request.arguments.items()
        }

    @property
    def request_parameters(self):
        """
        A convenient parameter for all query parameters and/or request.body fields.

        Query parameters are parsed first; if the body is a JSON object its
        fields are laid over them (body wins on conflicts). A body that is not
        valid JSON, or is valid JSON but not an object, is logged and ignored.
        The result is computed once per handler instance.
        """
        # Compare against None so that an empty result is cached as well.
        if self._request_parameters is None:
            fields = self.parse_query_parameters()
            if self.request.body:
                try:
                    overlay_fields = json.loads(self.request.body)
                except ValueError:
                    # Covers JSONDecodeError and the UnicodeDecodeError raised
                    # for bytes that are not valid UTF-8/16/32.
                    self.logger.error("Unable to decode non-JSON body")
                else:
                    if isinstance(overlay_fields, dict):
                        fields.update(overlay_fields)
                    else:
                        # Arrays, strings, numbers and null cannot be merged
                        # into a field mapping.
                        self.logger.error("Ignoring JSON body that is not an object")
            self._request_parameters = fields
        return self._request_parameters

    @staticmethod
    def return_data(
        data: dict, success: bool = True, errors: Optional[list[dict]] = None
    ) -> str:
        """Serialize a response envelope to a JSON string.

        ``"data"`` is always present. ``"success"`` and ``"errors"`` are only
        included when truthy, so a failed response carries no ``"success"``
        key at all rather than ``false``.

        :param data: Payload placed under the ``"data"`` key
        :param success: Included as ``"success"`` only when truthy
        :param errors: Included as ``"errors"`` only when non-empty
        :return: JSON-encoded envelope
        """
        return_shape = {"data": data}
        if success:
            return_shape["success"] = success
        if errors:
            return_shape["errors"] = errors
        return json.dumps(return_shape)

    def write(self, chunk: Union[str, bytes, dict]) -> None:
        """Write ``chunk`` to the response, optionally logging it first.

        The response is logged at INFO when
        ``settings.TEMPORARY_LOG_RESPONSE_ENABLE`` is set, otherwise at DEBUG
        when ``settings.LOG_RESPONSE_IN_DEBUG`` is set.
        """
        if settings.TEMPORARY_LOG_RESPONSE_ENABLE:
            self.logger.info("Response: %s", chunk)
        elif settings.LOG_RESPONSE_IN_DEBUG:
            self.logger.debug("Response: %s", chunk)
        super().write(chunk)