Source code for q2_sdk.core.http_handlers.tecton_client_handler

import json
import functools
from packaging import version
from typing import Optional, Callable, Union

from q2_sdk.core.exceptions import TectonError
from q2_sdk.core.http_handlers.tecton_base_handler import (
    Q2TectonBaseRequestHandler,
    MFAMessage,
)
from q2_sdk.models.tecton import InternalServerError, Success, BadRequest


def _mfa_challenge_response(mfa_info):
    """
    Wrap an MFA lookup result in a ``Success`` response.

    The payload always carries ``errorReturnCode`` (read from ``return_code``);
    ``externalMfaShortName`` is added only when ``is_external_mfa`` is truthy.
    """
    payload = {"errorReturnCode": mfa_info.get("return_code")}
    if mfa_info.get("is_external_mfa"):
        payload["externalMfaShortName"] = str(
            mfa_info.get("external_mfa_short_name")
        )
    return Success(payload)


def _mfa_failure_response(handler, error_handler, message):
    """
    Report an MFA failure through ``error_handler`` when one was supplied;
    otherwise log that none exists and fall back to ``BadRequest(message)``.
    """
    if error_handler:
        return error_handler(handler, message)
    handler.logger.debug(MFAMessage.NO_ERROR_HANDLER.value)
    return BadRequest(message)


def inner_mfa_function(
    func,
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    """
    Decorator for checking if session has been validated by the MFA workflow

    :param func: Coroutine function to protect; it is awaited with the
        handler as its only argument.
    :param error_handler: Optional callable invoked as
        ``error_handler(handler, message)`` when MFA was denied or no
        validated token was found. Its return value is returned as-is.
        Without it a ``BadRequest`` carrying the message is returned.
    :param audit_action_name: When set (and the platform version is at least
        4.6.1.4), an audit record is created and its result decides whether
        to challenge, deny, or run ``func``.
    :param audit_details: Details for the audit record. A callable is invoked
        with the handler to produce them; any other value is passed through.
    :return: Coroutine function taking the handler, carrying ``func``'s
        metadata (name, docstring) via ``functools.wraps``.
    """

    @functools.wraps(func)
    async def wrapper(handler):
        if handler.validate_mfa_token():
            handler.logger.debug(MFAMessage.VALIDATED.value)
            return await func(handler)
        handler.logger.debug(MFAMessage.NOT_FOUND.value)

        raw_version = handler.platform_info.get("version")
        # requestExtensionData MFA support added in UUX 4.6.1.4
        if raw_version and version.parse(raw_version) >= version.parse("4.6.1.4"):
            if not audit_action_name:
                # No audit configured: just report which MFA type applies.
                return _mfa_challenge_response(await handler._get_mfa_type())

            details = (
                audit_details(handler) if callable(audit_details) else audit_details
            )
            audit_response = await handler._create_audit_record(
                audit_action_name=audit_action_name,
                audit_details=details,
            )

            if audit_response.get("is_edv_code"):
                handler.logger.debug(MFAMessage.REQUIRED.value)
                return _mfa_challenge_response(audit_response)

            if audit_response.get("is_edv_denied"):
                handler.logger.error(MFAMessage.DENIED.value)
                return _mfa_failure_response(
                    handler, error_handler, MFAMessage.DENIED.value
                )

            # Audit result was neither a code request nor a denial.
            handler.logger.debug(MFAMessage.VALIDATED.value)
            return await func(handler)

        # Version missing or older than the threshold: no validated token.
        return _mfa_failure_response(
            handler, error_handler, MFAMessage.NOT_FOUND.value
        )

    return wrapper


def mfa_validation_required(
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    """
    Build a decorator that applies ``inner_mfa_function`` with preset options.

    :param error_handler: Forwarded to ``inner_mfa_function`` unchanged.
    :param audit_action_name: Forwarded to ``inner_mfa_function`` unchanged.
    :param audit_details: Forwarded to ``inner_mfa_function`` unchanged.
    :return: A ``functools.partial`` of ``inner_mfa_function`` that only
        needs the function to decorate.
    """
    preset_options = {
        "error_handler": error_handler,
        "audit_action_name": audit_action_name,
        "audit_details": audit_details,
    }
    return functools.partial(inner_mfa_function, **preset_options)


class Q2TectonClientRequestHandler(Q2TectonBaseRequestHandler):
    """
    RequestHandler meant to be used for requests incoming from Online using
    the Tecton Client Side rendered content
    """

    OPTIONAL_CONFIGURATIONS = {"FEATURE": None}

    def __init__(self, application, request, **kwargs):
        """Initialize the base handler and turn off ``return_as_html``."""
        super().__init__(application, request, **kwargs)
        self.return_as_html = False

    async def q2_post(self, *args, **kwargs):
        """
        Parse the Tecton payload, route the request, and return a string.

        - A ``TectonError`` raised while routing is logged and returned as a
          JSON-dumped ``InternalServerError`` built from the exception's
          first argument and, if present, its second.
        - A ``dict`` result is wrapped in ``Success`` and JSON-dumped.
        - A ``str`` result is returned unchanged.
        - Any other result is JSON-dumped from ``vars(result)``.

        An empty result is normalized to ``""``.
        """
        self.form_fields = self._parse_tecton_payload()
        try:
            route_response = await self.route_request()
        except TectonError as exc:
            self.logger.error(exc)
            return json.dumps(
                InternalServerError(
                    exc.args[0], exc.args[1] if len(exc.args) > 1 else None
                ).to_json()
            )

        if isinstance(route_response, dict):
            json_dump = json.dumps(Success(route_response).to_json())
        elif isinstance(route_response, str):
            json_dump = route_response
        else:
            json_dump = json.dumps(vars(route_response))

        self.logger.debug("Tecton response: %s", json_dump)
        return json_dump if json_dump else ""

    async def mfa_validate(self):
        """
        Return the ``"data"`` entry of the parent's JSON ``mfa_validate``
        response.
        """
        return json.loads(await super().mfa_validate())["data"]