from __future__ import annotations
import json
import logging
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Optional
import requests
from q2msg.olb.holocron_pb2 import Config, Facet
from q2_sdk.core import cache, contexts, message_bus, q2_requests
from q2_sdk.core.cache import Q2CacheClient
from q2_sdk.core.configuration import settings
from q2_sdk.core.default_settings import HOLOCRON_BY_ENV, EnvLevel, get_normalized_env
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.tools.utils import camelize, to_bool
LOGGER = logging.getLogger(__name__)
[docs]
class BadEnvstackError(Exception):
"""This Envstack does not exist"""
[docs]
class NoDataError(Exception):
"""No data returned for Envstack"""
[docs]
@dataclass
class Result:
success: bool = False
error_message: Optional[str] = None
data: dict = field(default_factory=dict)
[docs]
@dataclass
class EnvStackEntry:
"""
Stores the envstack details returned from Holocron.
Attributes:
envstack (str): The environment stack. Typically set in the format of <FI Number>-<environment>.
aba (str): The American Bankers Association number.
customer_key (str): Unique identifier for a stack.
db_name (str): The name of the database.
core_only (bool): Indicator of whether this stack is an Online Banking Stack or a core adapter
"""
env_stack: str = ""
aba: str = ""
customer_key: str = ""
db_name: str = ""
core_only: bool = False
[docs]
@classmethod
def from_dict(cls, envstack: dict) -> EnvStackEntry:
"""
Converts dictionary into a list of EnvStackEntry instances.
:param envstack: A dictionary populated with envstack information
:returns: Instantiated EnvStackEntry
"""
return cls(
env_stack=envstack.get("envStack", ""),
aba=envstack.get("aba", ""),
customer_key=envstack.get("customerKey", ""),
db_name=envstack.get("dbName", ""),
core_only=to_bool(envstack.get("coreOnly", False)),
)
[docs]
def to_dict(self):
return {camelize(k): v for k, v in asdict(self).items()}
[docs]
@dataclass
class EnvStackResponse:
"""
Stores envstack response from Holocron.
Attributes:
success (bool): Indicator of whether the request was successful.
reason (str): The reasoning for the failure.
data (EnvStackEntry): The environment stack for the provided customer key.
"""
success: bool = False
reason: str = ""
data: Optional[EnvStackEntry] = None
[docs]
def to_dict(self):
return {k: v for k, v in asdict(self).items()}
[docs]
@dataclass
class EnvStackListResponse:
"""
Stores envstack response information from Holocron.
Attributes:
success (bool): Indicator of whether the request was successful.
reason (str): The reasoning for the failure.
data (list[EnvStackEntry]): The list of environment stacks returned.
"""
success: bool = False
reason: str = ""
data: Optional[list] = field(default_factory=list[EnvStackEntry])
[docs]
def to_dict(self):
return {k: v for k, v in asdict(self).items()}
[docs]
class SearchType(Enum):
Name = "name"
CustomerKey = "customer_key"
ABA = "aba"
DbName = "db_name"
[docs]
@dataclass
class SearchFilter:
key: str
search_type: SearchType
[docs]
class HolocronEndpoint:
"""
Interacts with Holocron for a given environment.
"""
def __init__(
self,
logger: Optional[Q2LoggerType] = None,
environment: Optional[EnvLevel] = None,
):
"""
:param logger: Q2Logger instance.
:param environment: The holocron environment to point to.
"""
self.logger = logger
if not self.logger:
context = contexts.get_current_request(raise_if_none=False)
if context:
self.logger = context.request_handler.logger
else:
self.logger = LOGGER
if isinstance(environment, EnvLevel):
environment = environment.value
if hasattr(environment, "value"):
environment = environment.value
self.environment = get_normalized_env(environment, abbreviate=True)
[docs]
async def get_envstack(
self, key: str, search_type: SearchType = SearchType.Name
) -> EnvStackResponse:
"""
Retrieves the envstack information from Holocron using the customer key.
:param key: Key to filter by. Note: the response will be marked as unsuccessful if more than one envstack is returned for the provided key.
:param search_type: Can be any value from the SearchType enum. Defaults to searching by EnvStack name.
:returns: EnvStackResponse dataclass containing information on the response status (success or otherwise), reasoning for any failures,
and the environment stack on success.
"""
result = EnvStackResponse()
response = await self.get_envstack_list(SearchFilter(key, search_type))
if not response.success:
error_message = response.reason
self.logger.error(error_message)
result.reason = error_message
return result
if len(response.data) > 1:
error_message = "Multiple environment stacks returned"
self.logger.error(error_message)
result.reason = error_message
elif response.data:
result.success = True
result.data = response.data[0]
return result
[docs]
async def get_envstack_list(
self, filter: Optional[SearchFilter] = None
) -> EnvStackListResponse:
"""
Retrieves the envstack information from Holocron.
:param key: Key to filter by.
:returns: EnvStackListResponse containing information on the response status (success or otherwise), reasoning for any failures,
and a list of environment stacks on success.
"""
result = EnvStackListResponse()
url = HOLOCRON_BY_ENV[self.environment] + "/v1/envstack"
query_params = {}
if filter:
match filter.search_type:
case SearchType.Name:
url = f"{url}/{filter.key}"
case SearchType.CustomerKey:
query_params = {"customerKey": filter.key}
case SearchType.ABA:
query_params = {"aba": filter.key}
case SearchType.DbName:
query_params = {"dbName": filter.key}
try:
response = await q2_requests.get(
self.logger, url, params=query_params, verify_whitelist=False
)
response.raise_for_status()
envstacks = json.loads(response.content.decode("utf-8"))
if isinstance(envstacks, dict):
envstacks = [envstacks]
result.data = [EnvStackEntry.from_dict(x) for x in envstacks]
result.success = True
except requests.exceptions.RequestException as err:
if (
filter
and filter.search_type == SearchType.Name
and "Not Found" in str(err)
):
result.data = []
result.success = True
else:
error_message = "Failed to retrieve the envstack information"
self.logger.exception(error_message)
result.reason = error_message
return result
[docs]
async def create_or_update_envstack(self, envstack: EnvStackEntry, updated_by: str):
"""
Envstacks are the base objects that have configs and facets attached to them. This upserts (creates or updates) a given envstack.
:param envstack: EnvStack metadata to create
:param updated_by: An identifier to who is creating (your name perhaps)
"""
url = HOLOCRON_BY_ENV[self.environment] + f"/v1/envstack/{envstack.env_stack}"
await q2_requests.post(
self.logger,
url,
json={
"aba": envstack.aba,
"customerKey": envstack.customer_key,
"dbName": envstack.db_name,
"updatedBy": updated_by,
},
verify_whitelist=False,
)
"""This envstack does not exist"""
[docs]
async def get_configs(self, envstack_name: str) -> dict:
"""
Configs are key/value pairs attached to envstacks
:param envstack_name: Name of the envstack
"""
url = (
HOLOCRON_BY_ENV[self.environment]
+ f"/v1/envstack/{envstack_name}/config/retrieve"
)
response = await q2_requests.post(self.logger, url, verify_whitelist=False)
match response.status_code:
case 500:
raise BadEnvstackError
case 404:
raise NoDataError
case _:
message = message_bus.decrypt(response.content, response.headers).text
return message
[docs]
async def get_facets(self, envstack_name: str, facet_id: str) -> dict:
"""
Facets are key/value pairs attached to envstacks, but segmented one level further. Good for if multiple groups are accessing the envstack,
each one can have their own facet_id.
:param envstack_name: Name of the envstack
"""
url = (
HOLOCRON_BY_ENV[self.environment]
+ f"/v1/envstack/{envstack_name}/facet/{facet_id}/retrieve"
)
response = await q2_requests.post(self.logger, url, verify_whitelist=False)
match response.status_code:
case 500:
raise BadEnvstackError
case 404:
raise NoDataError
case _:
message = message_bus.decrypt(response.content, response.headers).text
return message
[docs]
async def add_configs(self, envstack_name: str, config: dict) -> bool:
"""
Configs are key/value pairs attached to envstacks. This adds arbitrary pairs under the service_configs config location
:param envstack_name: Name of the envstack
:param config: Dict of key/value pairs to add
"""
url = (
HOLOCRON_BY_ENV[self.environment]
+ f"/v1/envstack/{envstack_name}/config/update"
)
protobuf = Config(service_configs=config)
encrypted_q2msg = message_bus.get_q2msg_with_headers(
protobuf, "olb.holocron.Config", envstack_name
)
response = await q2_requests.post(
self.logger,
url,
data=encrypted_q2msg.message,
headers=encrypted_q2msg.headers,
verify_whitelist=False,
)
return response.ok
[docs]
async def add_facets(self, envstack_name: str, facet_id: str, config: dict) -> bool:
"""
Facets are key/value pairs attached to envstacks, but segmented one level further. Good for if multiple groups are accessing the envstack,
each one can have their own facet_id.
:param envstack_name: Name of the envstack
:param facet_id: Can be anything that uniquely groups together configs. A UUID, or just a string
:param config: Dict of key/value pairs to add
"""
url = (
HOLOCRON_BY_ENV[self.environment]
+ f"/v1/envstack/{envstack_name}/facet/{facet_id}/update"
)
protobuf = Facet(ints={}, strings=config, creds={})
encrypted_q2msg = message_bus.get_q2msg_with_headers(
protobuf, "olb.holocron.Facet", envstack_name
)
response = await q2_requests.post(
self.logger,
url,
data=encrypted_q2msg.message,
headers=encrypted_q2msg.headers,
verify_whitelist=False,
)
return response.ok
[docs]
async def get_envstack_by_cust_key(
logger: Q2LoggerType,
hq_credentials: HqCredentials,
environment=settings.ENV_LEVEL_DEPLOY_ENV,
cache_obj: Q2CacheClient | None = None,
) -> Result:
result = Result()
# Grab customer key required to fetch envstack
if not hq_credentials.customer_key and not settings.VAULT_KEY:
result.error_message = "Customer key required for envstack search not found"
return result
customer_key = hq_credentials.customer_key or settings.VAULT_KEY
# Searching cache for envstack
cache_obj = cache_obj if cache_obj else cache.get_cache(logger=logger)
cache_key = f"HQ_Credentials_{customer_key}"
if cache_obj:
envstack = await cache_obj.get_async(key=cache_key)
if envstack:
result.success = True
logger.info(f"Envstack {envstack} found in cache")
result.data["envstack"] = envstack
return result
# Searching holocron for envstack
holocron = HolocronEndpoint(logger, environment)
holocron_response = await holocron.get_envstack(
customer_key, SearchType.CustomerKey
)
if not holocron_response.success:
result.error_message = (
"Failed to fetch envstack from Holocron. " + holocron_response.reason
)
return result
result.success = True
result.data["envstack"] = holocron_response.data.env_stack
logger.info(f"Envstack {envstack} found in holocron")
# Caching envstack info...
ttl = 60 * 60
await cache_obj.set_async(key=cache_key, value=result.data["envstack"], expire=ttl)
return result