Source code for q2_sdk.core.http_handlers.caliper_api_custom_handler

import json
import os
from functools import cached_property
from typing import Union, Optional
from urllib.parse import urlparse
from q2_sdk.core.cache import StorageLevel, Q2CacheClient, CacheConfigError
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.core.configuration import settings


class Q2CaliperAPICustomHandler(Q2BaseRequestHandler):
    """
    RequestHandler meant to be used for requests incoming from CaliperAPI.

    Instances accept non-Q2 traffic (``allow_non_q2_traffic`` is set to True)
    and default to stack-level caching (``StorageLevel.Stack``).
    """

    DESCRIPTION = ""

    def __init__(self, application, request, **kwargs):
        """Initialise the base handler, then set this handler's defaults."""
        super().__init__(application, request, **kwargs)
        self.allow_non_q2_traffic = True
        # Lazily built by the ``request_parameters`` property; None means
        # "not parsed yet".
        self._request_parameters = None
        # Parsed ``WedgeConfigs`` header; populated in ``prepare``.
        self._db_config = None
        self.default_cache_level = StorageLevel.Stack

    @cached_property
    def stack_cache(self) -> Q2CacheClient:
        """
        Q2CacheClient scoped to the current financial institution stack.

        The cache prefix is ``self.hq_credentials.customer_key``. The client is
        created on first access and then reused for this handler instance.

        :raises CacheConfigError: if the customer key is unset
        """
        if not self.hq_credentials.customer_key:
            self.logger.error(
                "Customer Key is unset, but cache requested at stack level. Refusing to cache."
            )
            raise CacheConfigError
        return self.get_cache(prefix=self.hq_credentials.customer_key)

    async def prepare(self):
        """
        Log the incoming request and pick the HQ credentials to use for it.

        The customer key is read from the ``customerKey`` header (with any
        ``CHANGEME`` text removed) and, when that is empty, from the
        ``customerKey`` entry of the JSON ``WedgeConfigs`` header. If a key is
        found and it differs from ``settings.VAULT_KEY``, the credentials
        resolved by ``_get_hq_from_key`` replace ``self.hq_credentials``.

        A ``WedgeConfigs`` header that is not valid JSON is not handled here;
        the ``json`` error propagates and fails the request.
        """
        await super().prepare()
        self.logger.debug(f"Request Method: {self.request.method}")

        # NOTE(review): reconstructed from a listing that had lost its
        # indentation; confirm the header handling below is meant to be
        # skipped entirely when the request carries no headers.
        if self.request.headers:
            self.logger.debug(f"Request Headers: {dict(self.request.headers)}")
            customer_key = self.request.headers.get("customerKey", "")
            self._db_config = json.loads(self.request.headers.get("WedgeConfigs", "{}"))
            # Presumably "CHANGEME" is an unconfigured placeholder value --
            # TODO confirm. Either way it is stripped before the key is used.
            customer_key = customer_key.replace("CHANGEME", "")
            if not customer_key:
                customer_key = self._db_config.get("customerKey", "")
            if customer_key and customer_key != settings.VAULT_KEY:
                supplied_hq = self._get_hq_from_key(customer_key)
                if supplied_hq:
                    self.hq_credentials = supplied_hq

        if self.request.body:
            body = self.request.body
            if isinstance(body, bytes):
                # This text is only used for the debug log below, so a body
                # that is not valid UTF-8 must not be able to fail the request.
                body = body.decode(errors="replace")
            self.logger.debug(
                "Request Body: %s",
                body,
                add_to_buffer=False,
            )

    def _get_hq_from_key(self, customer_key) -> HqCredentials:
        """
        Resolve HQ credentials for ``customer_key``, using the service cache.

        Resolution order:

        1. A cached entry under ``hq_for_ck_<customer_key>``.
        2. If the key parses as a URL with scheme, host and path: vault
           credentials for the ``ABA`` environment variable (default
           ``022222222``) when ``settings.MULTITENANT`` is set, otherwise
           credentials built from the URL plus the ``CSR_USER``, ``CSR_PWD``
           and ``ABA`` environment variables.
        3. Otherwise, a vault lookup by ``customer_key``.

        Freshly resolved credentials are cached for 60 seconds.

        :param customer_key: customer key (or HQ URL) supplied with the request
        :return: the resolved credentials
        """
        cache_key = f"hq_for_ck_{customer_key}"
        cached_creds = self.service_cache.get(cache_key)
        parsed_key = urlparse(customer_key)

        if cached_creds:
            hq_creds = HqCredentials(
                cached_creds["HqUrl"],
                cached_creds["CsrUser"],
                cached_creds["CsrPwd"],
                cached_creds["ABA"],
                customer_key=customer_key,
                database_name=cached_creds.get("DB_NAME"),
            )
            self.logger.debug("Getting HQ from cache for key: %s", customer_key)
            return hq_creds

        # urlparse always returns a (truthy) result object, so only its parts
        # decide whether the key is treated as a URL.
        if parsed_key.scheme and parsed_key.netloc and parsed_key.path:
            if settings.MULTITENANT:
                hq_creds = self.vault.get_hq_creds(os.environ.get("ABA", "022222222"))
            else:
                hq_creds = HqCredentials(
                    customer_key,
                    os.environ.get("CSR_USER"),
                    os.environ.get("CSR_PWD"),
                    os.environ.get("ABA"),
                    customer_key=customer_key,
                )
        else:
            hq_creds = self.vault.get_hq_creds(customer_key)

        # NOTE(review): reconstructed from a listing that had lost its
        # indentation; confirm the multitenant URL branch is meant to be
        # cached like the other two.
        self.service_cache.set(cache_key, hq_creds.serialize_as_dict(), expire=60)
        return hq_creds

    @property
    def wedge_address_configs(self) -> dict:
        """
        Alias to ``self._db_config``.

        This is the parsed ``WedgeConfigs`` header. It is ``None`` until
        ``prepare`` has processed the request headers.
        """
        return self._db_config

    def parse_query_parameters(self) -> dict:
        """
        Return the request's query arguments as a flat ``str -> str`` dict.

        Only the first value of each argument is kept, decoded as UTF-8.
        An empty dict is returned when the request has no arguments.
        """
        if not self.request.arguments:
            return {}
        return {
            key: str(value[0], "utf-8") for key, value in self.request.arguments.items()
        }

    @property
    def request_parameters(self):
        """
        A convenient parameter for all query parameters and/or request.body fields.

        Query parameters are parsed first; if the body is a JSON object its
        fields are laid over them (body wins on duplicate keys). A body that
        cannot be decoded as JSON, or that is JSON but not an object, is
        logged and ignored. The result is computed once per handler instance.
        """
        # Compare against None rather than truthiness so that an empty result
        # is cached too instead of being re-parsed (and re-logged) every time.
        if self._request_parameters is None:
            fields = self.parse_query_parameters()
            if self.request.body:
                try:
                    overlay_fields = json.loads(self.request.body)
                except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                    # json.loads raises UnicodeDecodeError for byte bodies that
                    # are not valid UTF-8/16/32; treat that like any other
                    # non-JSON body.
                    self.logger.error("Unable to decode non-JSON body")
                else:
                    if isinstance(overlay_fields, dict):
                        fields.update(overlay_fields)
                    else:
                        # Valid JSON such as null, 5 or [1, 2] has no fields
                        # to merge and would make dict.update raise.
                        self.logger.error("Ignoring JSON body that is not an object")
            self._request_parameters = fields
        return self._request_parameters

    @staticmethod
    def return_data(
        data: dict, success: bool = True, errors: Optional[list[dict]] = None
    ):
        """
        Serialise a response envelope to a JSON string.

        The envelope always contains ``data``. ``success`` is included only
        when it is truthy, and ``errors`` only when it is non-empty.

        :param data: payload placed under the ``data`` key
        :param success: included under ``success`` when truthy
        :param errors: included under ``errors`` when non-empty
        :return: the JSON-encoded envelope
        """
        return_shape = {"data": data}
        if success:
            return_shape["success"] = success
        if errors:
            return_shape["errors"] = errors
        return json.dumps(return_shape)

    def write(self, chunk: Union[str, bytes, dict]) -> None:
        """
        Write ``chunk`` to the response, logging it first when configured.

        The chunk is logged at info level when
        ``settings.TEMPORARY_LOG_RESPONSE_ENABLE`` is set, otherwise at debug
        level when ``settings.LOG_RESPONSE_IN_DEBUG`` is set.
        """
        if settings.TEMPORARY_LOG_RESPONSE_ENABLE:
            self.logger.info("Response: %s", chunk)
        elif settings.LOG_RESPONSE_IN_DEBUG:
            self.logger.debug("Response: %s", chunk)
        super().write(chunk)