import base64
from enum import Enum
import json
import logging
import copy
from typing import List, Optional, Union
from q2_sdk.core.cli import colored, textui
from q2_sdk.core.configuration import get_settings
from q2_sdk.core.exceptions import DatabaseDataError, TectonError
from q2_sdk.core.http_handlers.base_handler import ConfigurationCheck
from q2_sdk.core.http_handlers.online_handler import Q2OnlineRequestHandler
from q2_sdk.core.rate_limiter import RateLimiter
from q2_sdk.entrypoints import bounce_stack
from q2_sdk.hq.db.data_feed import DataFeed
from q2_sdk.hq.db.user_data import UserData
from q2_sdk.hq.db.mfa import Mfa
from q2_sdk.hq.db.wedge_address import WedgeAddress
from q2_sdk.hq.hq_api.wedge_online_banking import (
ValidateEdvSecureAccessCode,
ValidateAuthAccessCode,
ValidateEdvToken,
SetExternalMfaToken,
CreateAuditRecord,
GetEdvMfaType,
)
from q2_sdk.models.tecton import BadRequest, Success, NotFound, InternalServerError
from q2_sdk.tools import utils
from q2_sdk.core.exceptions import ConfigurationError
MFA_CACHE_KEY = "MfaValidation_"
MFA_VALID_TIME = 300 # 5 minutes
SECURE_ACCESS_CODE_ERROR = TectonError(
"Validate secure access code",
{"message": "Failed to validate token", "valid": False},
)
class MFAMessage(Enum):
NOT_FOUND = "Multifactor authentication validation token not found."
DENIED = "Action denied due to suspect login."
EXITED = "Multifactor authentication workflow exited or failed."
NO_ERROR_HANDLER = "No error handler provided to MFA decorator."
REQUIRED = "MFA required. Proceeding with multifactor authentication."
VALIDATED = "MFA validated. Proceeding with request."
class InstallerOperation(Enum):
INSTALL = "install"
UNINSTALL = "uninstall"
[docs]
class Q2TectonBaseRequestHandler(Q2OnlineRequestHandler):
"""
RequestHandler meant to be used for all Tecton related requests
"""
TECTON_USER_CONFIG_PREFIX = "tecton_user_config"
IS_UNAUTHENTICATED = False
TECTON_BETA = False
IS_MFA = False
MFA_REQUIRES_REGISTRATION = False
MFA_TOKEN_LIFETIME_IN_MINUTES = 5
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self._logger_rate_limiter = RateLimiter(
self.logger,
max_tokens=10,
refill_period=5,
refill_amount=2,
request=request,
name="log_to_server",
)
self.expand_local_sources = False
if self.IS_UNAUTHENTICATED:
self.allow_non_q2_traffic = True
if self.IS_MFA:
self.mfa_info = self._set_mfa_info()
self._unparsed_routes.append("default/feature_manifest")
self._user_data_short_name = f"SDK/{self.extension_name}_user_config"
self.platform_info = None
[docs]
async def prepare(self):
await super().prepare()
if self.form_fields.get("routing_key") == "default/log_to_server":
self.rate_limiters.append(self._logger_rate_limiter)
@property
def router(self):
router = super().router
router.update({
"default/authorize_transaction_with_mfa": self.authorize_transaction_with_mfa,
"default/mfa_validate": self.mfa_validate,
"default/feature_manifest": self.get_feature_manifest,
"default/log_to_server": self.log_to_server,
"default/get_user_config": self.get_module_user_config_data,
"default/set_user_config": self.set_module_user_config_data,
"default/remove_user_config": self.remove_module_user_config_data,
})
return router
[docs]
async def q2_post(self, *args, **kwargs):
if "routing_key" in kwargs and "routing_key" not in self.form_fields:
self.form_fields["routing_key"] = kwargs["routing_key"]
return await super().q2_post(*args, **kwargs)
async def _get_upload_url(self):
response = await super()._get_upload_url()
return json.dumps(Success(json.loads(response)).to_json())
def _parse_tecton_payload(self) -> Optional[dict]:
self.logger.debug(self.form_fields)
encoded_data = self.form_fields.get("data")
data = {}
if encoded_data:
replacements = ((b"\xa0", b" "),)
base64_decoded_bytes = base64.b64decode(encoded_data)
for repl in replacements:
base64_decoded_bytes = base64_decoded_bytes.replace(repl[0], repl[1])
data_json = base64_decoded_bytes.decode("utf8")
self.logger.debug("Tecton request data=%s", data_json)
data = json.loads(data_json)
if "tectonRequestMeta" in data:
self.platform_info = data["tectonRequestMeta"]
del data["tectonRequestMeta"]
data.update({"routing_key": self.form_fields.get("routing_key")})
else:
data.update(self.form_fields)
return data
[docs]
def validate_mfa_token(self):
"""Checks if the MFA key is within cache"""
session_id = ""
valid = False
if self.online_session:
session_id = self.online_session.session_id
cache_key = f"{MFA_CACHE_KEY}{session_id}"
if session_id and self.cache.get(cache_key):
valid = True
return valid
def log_to_server(self):
if self._logger_rate_limiter in self.rate_limiters:
form_fields = self.form_fields
if "data" in form_fields:
form_fields = self._parse_tecton_payload()
if "message" in form_fields:
level = form_fields.get("logLevel", "info").upper()
message = form_fields.get("message", "")
self.logger.log(logging.getLevelName(level), f"Frontend: {message}")
return "{}"
def get_feature_manifest(self):
return json.dumps(utils.get_feature_manifest(self.extension_name))
def _user_config_storage_name(self, namespace: str) -> str:
return f"{self.TECTON_USER_CONFIG_PREFIX}_{namespace}"
async def _get_user_configuration(
self, user_config_short_name: str
) -> Union[dict, None]:
user_data_obj = UserData(self.logger, self.hq_credentials)
user_config_rows = await user_data_obj.get(
self.online_user.user_id, user_config_short_name
)
if len(user_config_rows) == 0:
return
assert len(user_config_rows) == 1, (
"There should only be one user config per user in an extension"
)
return json.loads(str(user_config_rows[0].GTDataValue))
async def get_module_user_config_data(self):
data = await self._get_user_configuration(
self._user_config_storage_name(self.form_fields.get("namespace"))
)
return_data = {} if data is None else data
return json.dumps({"status": 200, "data": return_data})
async def set_module_user_config_data(self):
key = self.form_fields.get("key")
value = self.form_fields.get("value")
namespace = self.form_fields.get("namespace")
configuration_save_key = self._user_config_storage_name(namespace)
user_id = self.online_user.user_id
existing_user_configuration = await self._get_user_configuration(
configuration_save_key
)
user_data_obj = UserData(self.logger, self.hq_credentials)
if existing_user_configuration is None:
await user_data_obj.create(user_id, configuration_save_key, "{}")
existing_user_configuration = {}
new_configuration = existing_user_configuration.copy()
new_configuration[key] = value
await user_data_obj.update(
user_id, configuration_save_key, json.dumps(new_configuration)
)
return '{"status": 200}'
async def remove_module_user_config_data(self):
key = self.form_fields.get("key")
configuration_save_key = self._user_config_storage_name(
self.form_fields.get("namespace")
)
user_data_obj = UserData(self.logger, self.hq_credentials)
existing_user_configuration = await self._get_user_configuration(
configuration_save_key
)
try:
existing_user_configuration.pop(key)
await user_data_obj.update(
self.online_user.user_id,
configuration_save_key,
json.dumps(existing_user_configuration),
)
except KeyError:
pass
return '{"status": 200}'
def _extract_form_fields(self) -> dict:
"""Checks if the user submitted secure access code is valid and stores within cache."""
if "data" in self.form_fields:
form_fields = self._parse_tecton_payload()
else:
form_fields = self.form_fields
return form_fields if form_fields else {}
def _extract_secure_access_code(self) -> str:
form_fields = self._extract_form_fields()
secure_access_code = form_fields.get("secureAccessCode")
if not secure_access_code:
raise TectonError(
"Unable to validate secure access code",
{"message": "Secure access code was empty."},
)
return secure_access_code
def _extract_mfa_info(self) -> tuple:
form_fields = self._extract_form_fields()
mfa_validation_type = form_fields.get("MFAValidationType", "sac")
one_time_password1 = form_fields.get("oneTimePassword1")
one_time_password2 = form_fields.get("oneTimePassword2")
secure_access_code = form_fields.get("secureAccessCode")
mfa_value = one_time_password1 or secure_access_code or one_time_password2
if not mfa_value:
raise TectonError(
"Unable to validate",
{"message": "No access code or token was passed."},
)
return (
mfa_validation_type,
one_time_password1,
one_time_password2,
secure_access_code,
)
# THIS APPEARS TO NO LONGER BE IN USE AFTER UUX 4.4.0.132
async def authorize_transaction_with_mfa(self):
secure_access_code = self._extract_secure_access_code()
transaction_id = self._extract_form_fields().get("transactionId")
params = ValidateAuthAccessCode.ParamsObj(
self.logger, self.hq_credentials, transaction_id, secure_access_code
)
response = await ValidateAuthAccessCode.execute(params)
if response.success:
self.logger.info(
f"validated transaction <id: {transaction_id}> with secure access code for login: <{self.online_user.login_name}>"
)
return '{"data": {"valid": true}}'
self.logger.error(
f"Secure access code failed when validating transaction <id: {transaction_id}> for login: <{self.online_user.login_name}>"
)
raise SECURE_ACCESS_CODE_ERROR
async def mfa_validate(self):
(
mfa_validation_type,
one_time_password1,
one_time_password2,
secure_access_code,
) = self._extract_mfa_info()
if mfa_validation_type == "token":
params = ValidateEdvToken.ParamsObj(
logger=self.logger,
hq_credentials=self.hq_credentials,
one_time_password1=one_time_password1,
one_time_password2=one_time_password2,
)
response = await ValidateEdvToken.execute(params)
else:
params = ValidateEdvSecureAccessCode.ParamsObj(
logger=self.logger,
hq_credentials=self.hq_credentials,
secure_access_code=secure_access_code,
)
response = await ValidateEdvSecureAccessCode.execute(params)
if response.success:
self.cache.set(
f"{MFA_CACHE_KEY}{self.online_session.session_id}", "1", MFA_VALID_TIME
)
self.logger.info("Validated MFA for: %s", self.online_user.login_name)
return '{"data": {"valid": true}}'
self.logger.error(
"MFA failed to validate for login: %s",
self.online_user.login_name,
)
raise TectonError(
"Unable to validate MFA",
{
"message": "Failed to validate SAC or token",
"errorReturnCode": response.error_code,
"valid": False,
},
)
@classmethod
async def validate_configuration(cls) -> List[ConfigurationCheck]:
config_checks = await super().validate_configuration()
config_checks.append(await cls._validate_auth_type_setting())
config_checks.append(await cls._validate_datafeed())
config_checks.append(await cls._validate_mfa_provider())
return config_checks
@classmethod
async def _validate_datafeed(cls) -> ConfigurationCheck:
hq_credentials = get_settings().HQ_CREDENTIALS
should_have_datafeed = False
if cls.IS_UNAUTHENTICATED:
wa_obj = WedgeAddress(
None, hq_credentials=hq_credentials, ret_table_obj=True
)
try:
await wa_obj.get_by_name(cls.extension_name)
should_have_datafeed = True
except DatabaseDataError:
should_have_datafeed = False
configuration_check = ConfigurationCheck(True)
if should_have_datafeed:
df_obj = DataFeed(None, hq_credentials=hq_credentials, ret_table_obj=True)
try:
await df_obj.get_by_name(cls.extension_name)
has_datafeed = True
except DatabaseDataError:
has_datafeed = False
if not has_datafeed:
configuration_check = ConfigurationCheck(
False,
"Datafeed must be installed (and stack bounced) if extension is unauthenticated",
cls._fix_datafeed(InstallerOperation.INSTALL),
)
return configuration_check
@classmethod
async def _fix_datafeed(cls, operation: InstallerOperation):
hq_credentials = get_settings().HQ_CREDENTIALS
df_obj = DataFeed(None, hq_credentials=hq_credentials, ret_table_obj=True)
match operation: # noqa: E999
case InstallerOperation.INSTALL:
await df_obj.create(
cls.extension_name,
description="SDK Passthrough Route",
wedge_path_name=cls.extension_name,
)
textui.puts(colored.green("Datafeed Installed"))
await bounce_stack.bounce_stack(None)
case InstallerOperation.UNINSTALL:
await df_obj.delete(cls.extension_name, remove_wedge_address=False)
textui.puts(colored.green("Datafeed Uninstalled"))
@classmethod
async def _validate_mfa_provider(cls) -> ConfigurationCheck:
hq_credentials = get_settings().HQ_CREDENTIALS
should_have_mfa_provider = False
if cls.IS_MFA:
wa_obj = WedgeAddress(
None, hq_credentials=hq_credentials, ret_table_obj=True
)
try:
await wa_obj.get_by_name(cls.extension_name)
should_have_mfa_provider = True
except DatabaseDataError:
should_have_mfa_provider = False
configuration_check = ConfigurationCheck(True)
if should_have_mfa_provider:
mfa_obj = Mfa(None, hq_credentials=hq_credentials, ret_table_obj=True)
try:
await mfa_obj.get_providers(short_name=cls.extension_name)
has_mfa_provider = True
except DatabaseDataError:
has_mfa_provider = False
if not has_mfa_provider:
configuration_check = ConfigurationCheck(
False,
"MFA Provider record must be added (and stack bounced) if MFA extension",
cls._fix_mfa_provider(InstallerOperation.INSTALL),
)
return configuration_check
@classmethod
async def _fix_mfa_provider(cls, operation: InstallerOperation):
hq_credentials = get_settings().HQ_CREDENTIALS
mfa_obj = Mfa(None, hq_credentials=hq_credentials, ret_table_obj=True)
match operation: # noqa: E999
case InstallerOperation.INSTALL:
await mfa_obj.add_provider(
short_name=cls.extension_name,
wedge_address_short_name=cls.extension_name,
registration_required=cls.MFA_REQUIRES_REGISTRATION,
token_lifetime_in_minutes=cls.MFA_TOKEN_LIFETIME_IN_MINUTES,
)
textui.puts(colored.green("MFA Provider Installed"))
await bounce_stack.bounce_stack(None)
case InstallerOperation.UNINSTALL:
await mfa_obj.remove_provider(mfa_provider_name=cls.extension_name)
textui.puts(colored.green("MFA Provider Uninstalled"))
@classmethod
async def _validate_auth_type_setting(cls) -> ConfigurationCheck:
class_is_unauth = cls.IS_UNAUTHENTICATED
class_is_mfa = cls.IS_MFA
if class_is_unauth and class_is_mfa:
err = f"{cls.extension_name} cannot be both unauthenticated and MFA. Please set either IS_UNAUTHENTICATED or IS_MFA to False in your extension.py file."
raise ConfigurationError(err)
else:
return ConfigurationCheck(True)
[docs]
async def set_external_mfa_token(
self, token: str
) -> Union[Success, InternalServerError, NotFound]:
"""
Sets an external MFA token with HQ and accounts for possible pre-auth token
"""
if self.IS_MFA and token:
credentials = copy.deepcopy(self.hq_credentials)
credentials.auth_token = self.request_name_elem.get("HqAuthToken")
params_obj = SetExternalMfaToken.ParamsObj(
self.logger,
hq_credentials=credentials,
external_mfa_token=token,
)
hq_response = await SetExternalMfaToken.execute(params_obj)
if hq_response.success:
return Success(token)
else:
return InternalServerError(hq_response.error_message)
elif self.IS_MFA and not token:
return BadRequest("No token provided.")
else:
return NotFound("This extension is not enabled as a MFA extension.")
async def _create_audit_details(
self, audit_details: Optional[Union[dict, str]] = None
):
"""
Helper for creating and formatting audit details for the EDV workflow
"""
details = {"routing_key": self.form_fields.get("routing_key")}
if isinstance(audit_details, str):
details["details"] = audit_details
elif isinstance(audit_details, dict):
details.update(audit_details)
return utils.dict_to_xml_string(self.extension_name, details, False)
async def _create_audit_record(
self, audit_action_name: str, audit_details: Optional[Union[dict, str]] = None
):
"""
Shared helper for handing audit record creation for the EDV workflow
"""
self.logger.debug(f"Creating audit record for {audit_action_name}")
details = await self._create_audit_details(audit_details)
audit_param = CreateAuditRecord.ParamsObj(
self.logger,
self.hq_credentials,
self.online_session.workstation,
self.online_user.customer_id,
self.online_user.user_id,
self.online_user.user_logon_id,
None,
None,
0,
None,
"OnlineBanking",
self.online_session.session_id,
audit_action_name,
self.request.remote_ip,
audit_details=details,
exception_message=None,
)
# -1022: Tac Target MFA is required.
# -1023: Single Token MFA is required.
# -1024: Dual Token MFA is required.
# -1090: External MFA is required.
# -1629: Action Denied Due To Suspect Login
edv_codes = [-1022, -1023, -1024, -1090]
edv_denied_code = -1629
audit_response = await CreateAuditRecord.execute(audit_param)
return_code = int(audit_response.result_node.Result.HydraErrorReturnCode)
external_mfa_short_name = (
audit_response.result_node.Data.ExternalMfaData.Result.ExternalMfaShortName
if return_code == -1090
else None
)
return {
"return_code": return_code,
"is_edv_code": return_code in edv_codes,
"is_edv_denied": return_code == edv_denied_code,
"is_external_mfa": return_code == -1090,
"external_mfa_short_name": external_mfa_short_name,
}
async def _get_mfa_type(self):
"""
Shared helper for retrieving the MFA type for the MFA workflow
"""
self.logger.debug("Retrieving MFA type from HQ")
try:
return_code_map = {
"Tac": -1022, # Tac Target MFA is required.
"SingleToken": -1023, # Single Token MFA is required
"DualToken": -1024, # Dual Token MFA is required.
"ExternalMfa": -1090, # External MFA is required.
}
mfa_type_param = GetEdvMfaType.ParamsObj(self.logger, self.hq_credentials)
mfa_type_response = await GetEdvMfaType.execute(mfa_type_param)
return_code = return_code_map[
mfa_type_response.result_node.Data.Details.Data.EdvMfaType
]
return {
"hq_check_successful": True,
"return_code": return_code,
"is_external_mfa": return_code == -1090,
"external_mfa_short_name": mfa_type_response.result_node.Data.Details.Data.ExtMfaFormShortName
if return_code == -1090
else None,
}
except Exception:
self.logger.debug("Unable to retrieve MFA type from HQ")
return {
"hq_check_successful": False,
"return_code": -1022,
"is_external_mfa": False,
"external_mfa_short_name": None,
}
def _set_mfa_info(self):
"""
Sets the MFA info for MFA extensions
"""
mfa_context = self.form_fields.get("extMfaTokenType")
transaction_ids = self.form_fields.get("extMfaTxnIds")
audit_action_id = self.form_fields.get("extMfaAuditActionId")
audit_action_name = self.form_fields.get("extMfaAuditActionName")
sso_identifier = self.form_fields.get("extMfaSsoIdentifier")
registration_value = self.form_fields.get("extMfaRegistrationValue")
return {
key: value
for key, value in [
("mfa_context", mfa_context),
(
"transaction_ids",
transaction_ids.split(",") if transaction_ids else None,
),
(
"audit_action_id",
audit_action_id if not audit_action_id == "0" else None,
),
("audit_action_name", audit_action_name),
("sso_identifier", sso_identifier),
("registration_value", registration_value),
]
if value is not None
}