Source code for q2_sdk.core.http_handlers.tecton_base_handler

import base64
from enum import Enum
import json
import logging
import copy
from typing import List, Optional, Union

from q2_sdk.core.cli import colored, textui
from q2_sdk.core.configuration import get_settings
from q2_sdk.core.exceptions import DatabaseDataError, TectonError
from q2_sdk.core.http_handlers.base_handler import ConfigurationCheck
from q2_sdk.core.http_handlers.online_handler import Q2OnlineRequestHandler
from q2_sdk.core.rate_limiter import RateLimiter
from q2_sdk.entrypoints import bounce_stack
from q2_sdk.hq.db.data_feed import DataFeed
from q2_sdk.hq.db.user_data import UserData
from q2_sdk.hq.db.mfa import Mfa
from q2_sdk.hq.db.wedge_address import WedgeAddress
from q2_sdk.hq.hq_api.wedge_online_banking import (
    ValidateEdvSecureAccessCode,
    ValidateAuthAccessCode,
    ValidateEdvToken,
    SetExternalMfaToken,
    CreateAuditRecord,
    GetEdvMfaType,
)
from q2_sdk.models.tecton import BadRequest, Success, NotFound, InternalServerError
from q2_sdk.tools import utils
from q2_sdk.core.exceptions import ConfigurationError

MFA_CACHE_KEY = "MfaValidation_"
MFA_VALID_TIME = 300  # 5 minutes
SECURE_ACCESS_CODE_ERROR = TectonError(
    "Validate secure access code",
    {"message": "Failed to validate token", "valid": False},
)


class MFAMessage(Enum):
    NOT_FOUND = "Multifactor authentication validation token not found."
    DENIED = "Action denied due to suspect login."
    EXITED = "Multifactor authentication workflow exited or failed."
    NO_ERROR_HANDLER = "No error handler provided to MFA decorator."
    REQUIRED = "MFA required. Proceeding with multifactor authentication."
    VALIDATED = "MFA validated. Proceeding with request."


class InstallerOperation(Enum):
    INSTALL = "install"
    UNINSTALL = "uninstall"


[docs] class Q2TectonBaseRequestHandler(Q2OnlineRequestHandler): """ RequestHandler meant to be used for all Tecton related requests """ TECTON_USER_CONFIG_PREFIX = "tecton_user_config" IS_UNAUTHENTICATED = False TECTON_BETA = False IS_MFA = False MFA_REQUIRES_REGISTRATION = False MFA_TOKEN_LIFETIME_IN_MINUTES = 5 def __init__(self, application, request, **kwargs): super().__init__(application, request, **kwargs) self._logger_rate_limiter = RateLimiter( self.logger, max_tokens=10, refill_period=5, refill_amount=2, request=request, name="log_to_server", ) self.expand_local_sources = False if self.IS_UNAUTHENTICATED: self.allow_non_q2_traffic = True if self.IS_MFA: self.mfa_info = self._set_mfa_info() self._unparsed_routes.append("default/feature_manifest") self._user_data_short_name = f"SDK/{self.extension_name}_user_config" self.platform_info = None
[docs] async def prepare(self): await super().prepare() if self.form_fields.get("routing_key") == "default/log_to_server": self.rate_limiters.append(self._logger_rate_limiter)
@property def router(self): router = super().router router.update({ "default/authorize_transaction_with_mfa": self.authorize_transaction_with_mfa, "default/mfa_validate": self.mfa_validate, "default/feature_manifest": self.get_feature_manifest, "default/log_to_server": self.log_to_server, "default/get_user_config": self.get_module_user_config_data, "default/set_user_config": self.set_module_user_config_data, "default/remove_user_config": self.remove_module_user_config_data, }) return router
[docs] async def q2_post(self, *args, **kwargs): if "routing_key" in kwargs and "routing_key" not in self.form_fields: self.form_fields["routing_key"] = kwargs["routing_key"] return await super().q2_post(*args, **kwargs)
async def _get_upload_url(self): response = await super()._get_upload_url() return json.dumps(Success(json.loads(response)).to_json()) def _parse_tecton_payload(self) -> Optional[dict]: self.logger.debug(self.form_fields) encoded_data = self.form_fields.get("data") data = {} if encoded_data: replacements = ((b"\xa0", b" "),) base64_decoded_bytes = base64.b64decode(encoded_data) for repl in replacements: base64_decoded_bytes = base64_decoded_bytes.replace(repl[0], repl[1]) data_json = base64_decoded_bytes.decode("utf8") self.logger.debug("Tecton request data=%s", data_json) data = json.loads(data_json) if "tectonRequestMeta" in data: self.platform_info = data["tectonRequestMeta"] del data["tectonRequestMeta"] data.update({"routing_key": self.form_fields.get("routing_key")}) else: data.update(self.form_fields) return data
[docs] def validate_mfa_token(self): """Checks if the MFA key is within cache""" session_id = "" valid = False if self.online_session: session_id = self.online_session.session_id cache_key = f"{MFA_CACHE_KEY}{session_id}" if session_id and self.cache.get(cache_key): valid = True return valid
def log_to_server(self): if self._logger_rate_limiter in self.rate_limiters: form_fields = self.form_fields if "data" in form_fields: form_fields = self._parse_tecton_payload() if "message" in form_fields: level = form_fields.get("logLevel", "info").upper() message = form_fields.get("message", "") self.logger.log(logging.getLevelName(level), f"Frontend: {message}") return "{}" def get_feature_manifest(self): return json.dumps(utils.get_feature_manifest(self.extension_name)) def _user_config_storage_name(self, namespace: str) -> str: return f"{self.TECTON_USER_CONFIG_PREFIX}_{namespace}" async def _get_user_configuration( self, user_config_short_name: str ) -> Union[dict, None]: user_data_obj = UserData(self.logger, self.hq_credentials) user_config_rows = await user_data_obj.get( self.online_user.user_id, user_config_short_name ) if len(user_config_rows) == 0: return assert len(user_config_rows) == 1, ( "There should only be one user config per user in an extension" ) return json.loads(str(user_config_rows[0].GTDataValue)) async def get_module_user_config_data(self): data = await self._get_user_configuration( self._user_config_storage_name(self.form_fields.get("namespace")) ) return_data = {} if data is None else data return json.dumps({"status": 200, "data": return_data}) async def set_module_user_config_data(self): key = self.form_fields.get("key") value = self.form_fields.get("value") namespace = self.form_fields.get("namespace") configuration_save_key = self._user_config_storage_name(namespace) user_id = self.online_user.user_id existing_user_configuration = await self._get_user_configuration( configuration_save_key ) user_data_obj = UserData(self.logger, self.hq_credentials) if existing_user_configuration is None: await user_data_obj.create(user_id, configuration_save_key, "{}") existing_user_configuration = {} new_configuration = existing_user_configuration.copy() new_configuration[key] = value await user_data_obj.update( user_id, configuration_save_key, json.dumps(new_configuration) ) return '{"status": 200}' async def remove_module_user_config_data(self): key = self.form_fields.get("key") configuration_save_key = self._user_config_storage_name( self.form_fields.get("namespace") ) user_data_obj = UserData(self.logger, self.hq_credentials) existing_user_configuration = await self._get_user_configuration( configuration_save_key ) try: existing_user_configuration.pop(key) await user_data_obj.update( self.online_user.user_id, configuration_save_key, json.dumps(existing_user_configuration), ) except KeyError: pass return '{"status": 200}' def _extract_form_fields(self) -> dict: """Checks if the user submitted secure access code is valid and stores within cache.""" if "data" in self.form_fields: form_fields = self._parse_tecton_payload() else: form_fields = self.form_fields return form_fields if form_fields else {} def _extract_secure_access_code(self) -> str: form_fields = self._extract_form_fields() secure_access_code = form_fields.get("secureAccessCode") if not secure_access_code: raise TectonError( "Unable to validate secure access code", {"message": "Secure access code was empty."}, ) return secure_access_code def _extract_mfa_info(self) -> tuple: form_fields = self._extract_form_fields() mfa_validation_type = form_fields.get("MFAValidationType", "sac") one_time_password1 = form_fields.get("oneTimePassword1") one_time_password2 = form_fields.get("oneTimePassword2") secure_access_code = form_fields.get("secureAccessCode") mfa_value = one_time_password1 or secure_access_code or one_time_password2 if not mfa_value: raise TectonError( "Unable to validate", {"message": "No access code or token was passed."}, ) return ( mfa_validation_type, one_time_password1, one_time_password2, secure_access_code, ) # THIS APPEARS TO NO LONGER BE IN USE AFTER UUX 4.4.0.132 async def authorize_transaction_with_mfa(self): secure_access_code = self._extract_secure_access_code() transaction_id = self._extract_form_fields().get("transactionId") params = ValidateAuthAccessCode.ParamsObj( self.logger, self.hq_credentials, transaction_id, secure_access_code ) response = await ValidateAuthAccessCode.execute(params) if response.success: self.logger.info( f"validated transaction <id: {transaction_id}> with secure access code for login: <{self.online_user.login_name}>" ) return '{"data": {"valid": true}}' self.logger.error( f"Secure access code failed when validating transaction <id: {transaction_id}> for login: <{self.online_user.login_name}>" ) raise SECURE_ACCESS_CODE_ERROR async def mfa_validate(self): ( mfa_validation_type, one_time_password1, one_time_password2, secure_access_code, ) = self._extract_mfa_info() if mfa_validation_type == "token": params = ValidateEdvToken.ParamsObj( logger=self.logger, hq_credentials=self.hq_credentials, one_time_password1=one_time_password1, one_time_password2=one_time_password2, ) response = await ValidateEdvToken.execute(params) else: params = ValidateEdvSecureAccessCode.ParamsObj( logger=self.logger, hq_credentials=self.hq_credentials, secure_access_code=secure_access_code, ) response = await ValidateEdvSecureAccessCode.execute(params) if response.success: self.cache.set( f"{MFA_CACHE_KEY}{self.online_session.session_id}", "1", MFA_VALID_TIME ) self.logger.info("Validated MFA for: %s", self.online_user.login_name) return '{"data": {"valid": true}}' self.logger.error( "MFA failed to validate for login: %s", self.online_user.login_name, ) raise TectonError( "Unable to validate MFA", { "message": "Failed to validate SAC or token", "errorReturnCode": response.error_code, "valid": False, }, ) @classmethod async def validate_configuration(cls) -> List[ConfigurationCheck]: config_checks = await super().validate_configuration() config_checks.append(await cls._validate_auth_type_setting()) config_checks.append(await cls._validate_datafeed()) config_checks.append(await cls._validate_mfa_provider()) return config_checks @classmethod async def _validate_datafeed(cls) -> ConfigurationCheck: hq_credentials = get_settings().HQ_CREDENTIALS should_have_datafeed = False if cls.IS_UNAUTHENTICATED: wa_obj = WedgeAddress( None, hq_credentials=hq_credentials, ret_table_obj=True ) try: await wa_obj.get_by_name(cls.extension_name) should_have_datafeed = True except DatabaseDataError: should_have_datafeed = False configuration_check = ConfigurationCheck(True) if should_have_datafeed: df_obj = DataFeed(None, hq_credentials=hq_credentials, ret_table_obj=True) try: await df_obj.get_by_name(cls.extension_name) has_datafeed = True except DatabaseDataError: has_datafeed = False if not has_datafeed: configuration_check = ConfigurationCheck( False, "Datafeed must be installed (and stack bounced) if extension is unauthenticated", cls._fix_datafeed(InstallerOperation.INSTALL), ) return configuration_check @classmethod async def _fix_datafeed(cls, operation: InstallerOperation): hq_credentials = get_settings().HQ_CREDENTIALS df_obj = DataFeed(None, hq_credentials=hq_credentials, ret_table_obj=True) match operation: # noqa: E999 case InstallerOperation.INSTALL: await df_obj.create( cls.extension_name, description="SDK Passthrough Route", wedge_path_name=cls.extension_name, ) textui.puts(colored.green("Datafeed Installed")) await bounce_stack.bounce_stack(None) case InstallerOperation.UNINSTALL: await df_obj.delete(cls.extension_name, remove_wedge_address=False) textui.puts(colored.green("Datafeed Uninstalled")) @classmethod async def _validate_mfa_provider(cls) -> ConfigurationCheck: hq_credentials = get_settings().HQ_CREDENTIALS should_have_mfa_provider = False if cls.IS_MFA: wa_obj = WedgeAddress( None, hq_credentials=hq_credentials, ret_table_obj=True ) try: await wa_obj.get_by_name(cls.extension_name) should_have_mfa_provider = True except DatabaseDataError: should_have_mfa_provider = False configuration_check = ConfigurationCheck(True) if should_have_mfa_provider: mfa_obj = Mfa(None, hq_credentials=hq_credentials, ret_table_obj=True) try: await mfa_obj.get_providers(short_name=cls.extension_name) has_mfa_provider = True except DatabaseDataError: has_mfa_provider = False if not has_mfa_provider: configuration_check = ConfigurationCheck( False, "MFA Provider record must be added (and stack bounced) if MFA extension", cls._fix_mfa_provider(InstallerOperation.INSTALL), ) return configuration_check @classmethod async def _fix_mfa_provider(cls, operation: InstallerOperation): hq_credentials = get_settings().HQ_CREDENTIALS mfa_obj = Mfa(None, hq_credentials=hq_credentials, ret_table_obj=True) match operation: # noqa: E999 case InstallerOperation.INSTALL: await mfa_obj.add_provider( short_name=cls.extension_name, wedge_address_short_name=cls.extension_name, registration_required=cls.MFA_REQUIRES_REGISTRATION, token_lifetime_in_minutes=cls.MFA_TOKEN_LIFETIME_IN_MINUTES, ) textui.puts(colored.green("MFA Provider Installed")) await bounce_stack.bounce_stack(None) case InstallerOperation.UNINSTALL: await mfa_obj.remove_provider(mfa_provider_name=cls.extension_name) textui.puts(colored.green("MFA Provider Uninstalled")) @classmethod async def _validate_auth_type_setting(cls) -> ConfigurationCheck: class_is_unauth = cls.IS_UNAUTHENTICATED class_is_mfa = cls.IS_MFA if class_is_unauth and class_is_mfa: err = f"{cls.extension_name} cannot be both unauthenticated and MFA. Please set either IS_UNAUTHENTICATED or IS_MFA to False in your extension.py file." raise ConfigurationError(err) else: return ConfigurationCheck(True)
[docs] async def set_external_mfa_token( self, token: str ) -> Union[Success, InternalServerError, NotFound]: """ Sets an external MFA token with HQ and accounts for possible pre-auth token """ if self.IS_MFA and token: credentials = copy.deepcopy(self.hq_credentials) credentials.auth_token = self.request_name_elem.get("HqAuthToken") params_obj = SetExternalMfaToken.ParamsObj( self.logger, hq_credentials=credentials, external_mfa_token=token, ) hq_response = await SetExternalMfaToken.execute(params_obj) if hq_response.success: return Success(token) else: return InternalServerError(hq_response.error_message) elif self.IS_MFA and not token: return BadRequest("No token provided.") else: return NotFound("This extension is not enabled as a MFA extension.")
async def _create_audit_details( self, audit_details: Optional[Union[dict, str]] = None ): """ Helper for creating and formatting audit details for the EDV workflow """ details = {"routing_key": self.form_fields.get("routing_key")} if isinstance(audit_details, str): details["details"] = audit_details elif isinstance(audit_details, dict): details.update(audit_details) return utils.dict_to_xml_string(self.extension_name, details, False) async def _create_audit_record( self, audit_action_name: str, audit_details: Optional[Union[dict, str]] = None ): """ Shared helper for handing audit record creation for the EDV workflow """ self.logger.debug(f"Creating audit record for {audit_action_name}") details = await self._create_audit_details(audit_details) audit_param = CreateAuditRecord.ParamsObj( self.logger, self.hq_credentials, self.online_session.workstation, self.online_user.customer_id, self.online_user.user_id, self.online_user.user_logon_id, None, None, 0, None, "OnlineBanking", self.online_session.session_id, audit_action_name, self.request.remote_ip, audit_details=details, exception_message=None, ) # -1022: Tac Target MFA is required. # -1023: Single Token MFA is required. # -1024: Dual Token MFA is required. # -1090: External MFA is required. # -1629: Action Denied Due To Suspect Login edv_codes = [-1022, -1023, -1024, -1090] edv_denied_code = -1629 audit_response = await CreateAuditRecord.execute(audit_param) return_code = int(audit_response.result_node.Result.HydraErrorReturnCode) external_mfa_short_name = ( audit_response.result_node.Data.ExternalMfaData.Result.ExternalMfaShortName if return_code == -1090 else None ) return { "return_code": return_code, "is_edv_code": return_code in edv_codes, "is_edv_denied": return_code == edv_denied_code, "is_external_mfa": return_code == -1090, "external_mfa_short_name": external_mfa_short_name, } async def _get_mfa_type(self): """ Shared helper for retrieving the MFA type for the MFA workflow """ self.logger.debug("Retrieving MFA type from HQ") try: return_code_map = { "Tac": -1022, # Tac Target MFA is required. "SingleToken": -1023, # Single Token MFA is required "DualToken": -1024, # Dual Token MFA is required. "ExternalMfa": -1090, # External MFA is required. } mfa_type_param = GetEdvMfaType.ParamsObj(self.logger, self.hq_credentials) mfa_type_response = await GetEdvMfaType.execute(mfa_type_param) return_code = return_code_map[ mfa_type_response.result_node.Data.Details.Data.EdvMfaType ] return { "hq_check_successful": True, "return_code": return_code, "is_external_mfa": return_code == -1090, "external_mfa_short_name": mfa_type_response.result_node.Data.Details.Data.ExtMfaFormShortName if return_code == -1090 else None, } except Exception: self.logger.debug("Unable to retrieve MFA type from HQ") return { "hq_check_successful": False, "return_code": -1022, "is_external_mfa": False, "external_mfa_short_name": None, } def _set_mfa_info(self): """ Sets the MFA info for MFA extensions """ mfa_context = self.form_fields.get("extMfaTokenType") transaction_ids = self.form_fields.get("extMfaTxnIds") audit_action_id = self.form_fields.get("extMfaAuditActionId") audit_action_name = self.form_fields.get("extMfaAuditActionName") sso_identifier = self.form_fields.get("extMfaSsoIdentifier") registration_value = self.form_fields.get("extMfaRegistrationValue") return { key: value for key, value in [ ("mfa_context", mfa_context), ( "transaction_ids", transaction_ids.split(",") if transaction_ids else None, ), ( "audit_action_id", audit_action_id if not audit_action_id == "0" else None, ), ("audit_action_name", audit_action_name), ("sso_identifier", sso_identifier), ("registration_value", registration_value), ] if value is not None }