Source code for q2_sdk.models.holocron

from __future__ import annotations

import json
import logging
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Optional

import requests
from q2msg.olb.holocron_pb2 import Config, Facet

from q2_sdk.core import cache, contexts, message_bus, q2_requests
from q2_sdk.core.cache import Q2CacheClient
from q2_sdk.core.configuration import settings
from q2_sdk.core.default_settings import HOLOCRON_BY_ENV, EnvLevel, get_normalized_env
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.tools.utils import camelize, to_bool

LOGGER = logging.getLogger(__name__)


[docs] class BadEnvstackError(Exception): """This Envstack does not exist"""
[docs] class NoDataError(Exception): """No data returned for Envstack"""
[docs] @dataclass class Result: success: bool = False error_message: Optional[str] = None data: dict = field(default_factory=dict)
[docs] @dataclass class EnvStackEntry: """ Stores the envstack details returned from Holocron. Attributes: envstack (str): The environment stack. Typically set in the format of <FI Number>-<environment>. aba (str): The American Bankers Association number. customer_key (str): Unique identifier for a stack. db_name (str): The name of the database. core_only (bool): Indicator of whether this stack is an Online Banking Stack or a core adapter """ env_stack: str = "" aba: str = "" customer_key: str = "" db_name: str = "" core_only: bool = False
[docs] @classmethod def from_dict(cls, envstack: dict) -> EnvStackEntry: """ Converts dictionary into a list of EnvStackEntry instances. :param envstack: A dictionary populated with envstack information :returns: Instantiated EnvStackEntry """ return cls( env_stack=envstack.get("envStack", ""), aba=envstack.get("aba", ""), customer_key=envstack.get("customerKey", ""), db_name=envstack.get("dbName", ""), core_only=to_bool(envstack.get("coreOnly", False)), )
[docs] def to_dict(self): return {camelize(k): v for k, v in asdict(self).items()}
[docs] @dataclass class EnvStackResponse: """ Stores envstack response from Holocron. Attributes: success (bool): Indicator of whether the request was successful. reason (str): The reasoning for the failure. data (EnvStackEntry): The environment stack for the provided customer key. """ success: bool = False reason: str = "" data: Optional[EnvStackEntry] = None
[docs] def to_dict(self): return {k: v for k, v in asdict(self).items()}
[docs] @dataclass class EnvStackListResponse: """ Stores envstack response information from Holocron. Attributes: success (bool): Indicator of whether the request was successful. reason (str): The reasoning for the failure. data (list[EnvStackEntry]): The list of environment stacks returned. """ success: bool = False reason: str = "" data: Optional[list] = field(default_factory=list[EnvStackEntry])
[docs] def to_dict(self): return {k: v for k, v in asdict(self).items()}
[docs] class SearchType(Enum): Name = "name" CustomerKey = "customer_key" ABA = "aba" DbName = "db_name"
[docs] @dataclass class SearchFilter: key: str search_type: SearchType
[docs] class HolocronEndpoint: """ Interacts with Holocron for a given environment. """ def __init__( self, logger: Optional[Q2LoggerType] = None, environment: Optional[EnvLevel] = None, ): """ :param logger: Q2Logger instance. :param environment: The holocron environment to point to. """ self.logger = logger if not self.logger: context = contexts.get_current_request(raise_if_none=False) if context: self.logger = context.request_handler.logger else: self.logger = LOGGER if isinstance(environment, EnvLevel): environment = environment.value if hasattr(environment, "value"): environment = environment.value self.environment = get_normalized_env(environment, abbreviate=True)
[docs] async def get_envstack( self, key: str, search_type: SearchType = SearchType.Name ) -> EnvStackResponse: """ Retrieves the envstack information from Holocron using the customer key. :param key: Key to filter by. Note: the response will be marked as unsuccessful if more than one envstack is returned for the provided key. :param search_type: Can be any value from the SearchType enum. Defaults to searching by EnvStack name. :returns: EnvStackResponse dataclass containing information on the response status (success or otherwise), reasoning for any failures, and the environment stack on success. """ result = EnvStackResponse() response = await self.get_envstack_list(SearchFilter(key, search_type)) if not response.success: error_message = response.reason self.logger.error(error_message) result.reason = error_message return result if len(response.data) > 1: error_message = "Multiple environment stacks returned" self.logger.error(error_message) result.reason = error_message elif response.data: result.success = True result.data = response.data[0] return result
[docs] async def get_envstack_list( self, filter: Optional[SearchFilter] = None ) -> EnvStackListResponse: """ Retrieves the envstack information from Holocron. :param key: Key to filter by. :returns: EnvStackListResponse containing information on the response status (success or otherwise), reasoning for any failures, and a list of environment stacks on success. """ result = EnvStackListResponse() url = HOLOCRON_BY_ENV[self.environment] + "/v1/envstack" query_params = {} if filter: match filter.search_type: case SearchType.Name: url = f"{url}/{filter.key}" case SearchType.CustomerKey: query_params = {"customerKey": filter.key} case SearchType.ABA: query_params = {"aba": filter.key} case SearchType.DbName: query_params = {"dbName": filter.key} try: response = await q2_requests.get( self.logger, url, params=query_params, verify_whitelist=False ) response.raise_for_status() envstacks = json.loads(response.content.decode("utf-8")) if isinstance(envstacks, dict): envstacks = [envstacks] result.data = [EnvStackEntry.from_dict(x) for x in envstacks] result.success = True except requests.exceptions.RequestException as err: if ( filter and filter.search_type == SearchType.Name and "Not Found" in str(err) ): result.data = [] result.success = True else: error_message = "Failed to retrieve the envstack information" self.logger.exception(error_message) result.reason = error_message return result
[docs] async def create_or_update_envstack(self, envstack: EnvStackEntry, updated_by: str): """ Envstacks are the base objects that have configs and facets attached to them. This upserts (creates or updates) a given envstack. :param envstack: EnvStack metadata to create :param updated_by: An identifier to who is creating (your name perhaps) """ url = HOLOCRON_BY_ENV[self.environment] + f"/v1/envstack/{envstack.env_stack}" await q2_requests.post( self.logger, url, json={ "aba": envstack.aba, "customerKey": envstack.customer_key, "dbName": envstack.db_name, "updatedBy": updated_by, }, verify_whitelist=False, )
"""This envstack does not exist"""
[docs] async def get_configs(self, envstack_name: str) -> dict: """ Configs are key/value pairs attached to envstacks :param envstack_name: Name of the envstack """ url = ( HOLOCRON_BY_ENV[self.environment] + f"/v1/envstack/{envstack_name}/config/retrieve" ) response = await q2_requests.post(self.logger, url, verify_whitelist=False) match response.status_code: case 500: raise BadEnvstackError case 404: raise NoDataError case _: message = message_bus.decrypt(response.content, response.headers).text return message
[docs] async def get_facets(self, envstack_name: str, facet_id: str) -> dict: """ Facets are key/value pairs attached to envstacks, but segmented one level further. Good for if multiple groups are accessing the envstack, each one can have their own facet_id. :param envstack_name: Name of the envstack """ url = ( HOLOCRON_BY_ENV[self.environment] + f"/v1/envstack/{envstack_name}/facet/{facet_id}/retrieve" ) response = await q2_requests.post(self.logger, url, verify_whitelist=False) match response.status_code: case 500: raise BadEnvstackError case 404: raise NoDataError case _: message = message_bus.decrypt(response.content, response.headers).text return message
[docs] async def add_configs(self, envstack_name: str, config: dict) -> bool: """ Configs are key/value pairs attached to envstacks. This adds arbitrary pairs under the service_configs config location :param envstack_name: Name of the envstack :param config: Dict of key/value pairs to add """ url = ( HOLOCRON_BY_ENV[self.environment] + f"/v1/envstack/{envstack_name}/config/update" ) protobuf = Config(service_configs=config) encrypted_q2msg = message_bus.get_q2msg_with_headers( protobuf, "olb.holocron.Config", envstack_name ) response = await q2_requests.post( self.logger, url, data=encrypted_q2msg.message, headers=encrypted_q2msg.headers, verify_whitelist=False, ) return response.ok
[docs] async def add_facets(self, envstack_name: str, facet_id: str, config: dict) -> bool: """ Facets are key/value pairs attached to envstacks, but segmented one level further. Good for if multiple groups are accessing the envstack, each one can have their own facet_id. :param envstack_name: Name of the envstack :param facet_id: Can be anything that uniquely groups together configs. A UUID, or just a string :param config: Dict of key/value pairs to add """ url = ( HOLOCRON_BY_ENV[self.environment] + f"/v1/envstack/{envstack_name}/facet/{facet_id}/update" ) protobuf = Facet(ints={}, strings=config, creds={}) encrypted_q2msg = message_bus.get_q2msg_with_headers( protobuf, "olb.holocron.Facet", envstack_name ) response = await q2_requests.post( self.logger, url, data=encrypted_q2msg.message, headers=encrypted_q2msg.headers, verify_whitelist=False, ) return response.ok
[docs] async def get_envstack_by_cust_key( logger: Q2LoggerType, hq_credentials: HqCredentials, environment=settings.ENV_LEVEL_DEPLOY_ENV, cache_obj: Q2CacheClient | None = None, ) -> Result: result = Result() # Grab customer key required to fetch envstack if not hq_credentials.customer_key and not settings.VAULT_KEY: result.error_message = "Customer key required for envstack search not found" return result customer_key = hq_credentials.customer_key or settings.VAULT_KEY # Searching cache for envstack cache_obj = cache_obj if cache_obj else cache.get_cache(logger=logger) cache_key = f"HQ_Credentials_{customer_key}" if cache_obj: envstack = await cache_obj.get_async(key=cache_key) if envstack: result.success = True logger.info(f"Envstack {envstack} found in cache") result.data["envstack"] = envstack return result # Searching holocron for envstack holocron = HolocronEndpoint(logger, environment) holocron_response = await holocron.get_envstack( customer_key, SearchType.CustomerKey ) if not holocron_response.success: result.error_message = ( "Failed to fetch envstack from Holocron. " + holocron_response.reason ) return result result.success = True result.data["envstack"] = holocron_response.data.env_stack logger.info(f"Envstack {envstack} found in holocron") # Caching envstack info... ttl = 60 * 60 await cache_obj.set_async(key=cache_key, value=result.data["envstack"], expire=ttl) return result