import json
import functools
from packaging import version
from typing import Optional, Callable, Union
from q2_sdk.core.exceptions import TectonError
from q2_sdk.core.http_handlers.tecton_base_handler import (
Q2TectonBaseRequestHandler,
MFAMessage,
)
from q2_sdk.models.tecton import InternalServerError, Success, BadRequest
def inner_mfa_function(
func,
error_handler: Optional[Callable] = None,
audit_action_name: Optional[str] = None,
audit_details: Optional[Union[dict, str, Callable]] = None,
):
"""
Decorator for checking if session has been validated by the MFA workflow
"""
async def wrapper(handler):
validated = handler.validate_mfa_token()
if validated:
handler.logger.debug(MFAMessage.VALIDATED.value)
return await func(handler)
else:
handler.logger.debug(MFAMessage.NOT_FOUND.value)
if handler.platform_info.get("version"):
platform_version = version.parse(handler.platform_info.get("version"))
# requestExtensionData MFA support added in UUX 4.6.1.4
if platform_version >= version.parse("4.6.1.4"):
if audit_action_name:
details = (
audit_details(handler)
if callable(audit_details)
else audit_details
)
audit_response = await handler._create_audit_record(
audit_action_name=audit_action_name,
audit_details=details,
)
if audit_response.get("is_edv_code"):
handler.logger.debug(MFAMessage.REQUIRED.value)
response = {
"errorReturnCode": audit_response.get("return_code"),
}
if audit_response.get("is_external_mfa"):
response["externalMfaShortName"] = str(
audit_response.get("external_mfa_short_name")
)
return Success(response)
elif audit_response.get("is_edv_denied"):
handler.logger.error(MFAMessage.DENIED.value)
if error_handler:
return error_handler(handler, MFAMessage.DENIED.value)
else:
handler.logger.debug(MFAMessage.NO_ERROR_HANDLER.value)
return BadRequest(MFAMessage.DENIED.value)
else:
handler.logger.debug(MFAMessage.VALIDATED.value)
return await func(handler)
else:
mfa_type_response = await handler._get_mfa_type()
response = {
"errorReturnCode": mfa_type_response.get("return_code"),
}
if mfa_type_response.get("is_external_mfa"):
response["externalMfaShortName"] = str(
mfa_type_response.get("external_mfa_short_name")
)
return Success(response)
if error_handler:
return error_handler(handler, MFAMessage.NOT_FOUND.value)
else:
handler.logger.debug(MFAMessage.NO_ERROR_HANDLER.value)
return BadRequest(MFAMessage.NOT_FOUND.value)
return wrapper
def mfa_validation_required(
error_handler: Optional[Callable] = None,
audit_action_name: Optional[str] = None,
audit_details: Optional[Union[dict, str, Callable]] = None,
):
return functools.partial(
inner_mfa_function,
error_handler=error_handler,
audit_action_name=audit_action_name,
audit_details=audit_details,
)
[docs]
class Q2TectonClientRequestHandler(Q2TectonBaseRequestHandler):
"""
RequestHandler meant to be used for requests incoming from Online using the Tecton Client Side rendered content
"""
OPTIONAL_CONFIGURATIONS = {"FEATURE": None}
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.return_as_html = False
[docs]
async def q2_post(self, *args, **kwargs):
self.form_fields = self._parse_tecton_payload()
try:
route_response = await self.route_request()
except TectonError as exc:
self.logger.error(exc)
return json.dumps(
InternalServerError(
exc.args[0], exc.args[1] if len(exc.args) > 1 else None
).to_json()
)
if isinstance(route_response, dict):
json_dump = json.dumps(Success(route_response).to_json())
elif isinstance(route_response, str):
json_dump = route_response
else:
json_dump = json.dumps(vars(route_response))
self.logger.debug("Tecton response: %s", json_dump)
return json_dump if json_dump else ""
async def mfa_validate(self):
return json.loads(await super().mfa_validate())["data"]