Source code for q2_sdk.core.http_handlers.tecton_client_handler

import json
import functools
from packaging import version
from typing import Optional, Callable, Union

from q2_sdk.core.exceptions import TectonError
from q2_sdk.core.http_handlers.tecton_base_handler import (
    Q2TectonBaseRequestHandler,
    MFAMessage,
)
from q2_sdk.models.tecton import InternalServerError, Success, BadRequest


def _mfa_handling_supported(handler):
    """
    Check if the platform version supports requestExtensionData MFA (added in UUX 4.6.1.4).
    """
    if handler.platform_info.get("version"):
        platform_version = version.parse(handler.platform_info.get("version"))
        # requestExtensionData MFA support added in UUX 4.6.1.4
        return platform_version >= version.parse("4.6.1.4")
    return False


async def _handle_audit_and_response(
    handler, func, error_handler, audit_action_name, audit_details
):
    """
    Handle audit record creation and processing the response.
    """
    details = audit_details(handler) if callable(audit_details) else audit_details
    audit_response = await handler._create_audit_record(
        audit_action_name=audit_action_name,
        audit_details=details,
    )

    if audit_response.get("is_edv_code"):
        handler.logger.debug(MFAMessage.REQUIRED.value)
        response = {
            "errorReturnCode": audit_response.get("return_code"),
        }
        if audit_response.get("is_external_mfa"):
            response["externalMfaShortName"] = str(
                audit_response.get("external_mfa_short_name")
            )
        return Success(response)
    elif audit_response.get("is_edv_denied"):
        handler.logger.error(MFAMessage.DENIED.value)
        if error_handler:
            return error_handler(handler, MFAMessage.DENIED.value)
        else:
            handler.logger.debug(MFAMessage.NO_ERROR_HANDLER.value)
            return BadRequest(MFAMessage.DENIED.value)
    else:
        handler.logger.debug(MFAMessage.VALIDATED.value)
        return await func(handler)


def inner_mfa_function(
    func,
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    """
    Decorator for checking if session has been validated by the MFA workflow
    """

    @functools.wraps(func)
    async def wrapper(handler):
        validated = handler.validate_mfa_token()
        if validated:
            handler.logger.debug(MFAMessage.VALIDATED.value)
            return await func(handler)
        else:
            handler.logger.debug(MFAMessage.NOT_FOUND.value)

        if _mfa_handling_supported(handler):
            if audit_action_name:
                return await _handle_audit_and_response(
                    handler, func, error_handler, audit_action_name, audit_details
                )
            else:
                mfa_type_response = await handler._get_mfa_type()
                response = {
                    "errorReturnCode": mfa_type_response.get("return_code"),
                }
                if mfa_type_response.get("is_external_mfa"):
                    response["externalMfaShortName"] = str(
                        mfa_type_response.get("external_mfa_short_name")
                    )

                return Success(response)

        if error_handler:
            return error_handler(handler, MFAMessage.NOT_FOUND.value)
        else:
            handler.logger.debug(MFAMessage.NO_ERROR_HANDLER.value)
            return BadRequest(MFAMessage.NOT_FOUND.value)

    return wrapper


def mfa_validation_required(
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    return functools.partial(
        inner_mfa_function,
        error_handler=error_handler,
        audit_action_name=audit_action_name,
        audit_details=audit_details,
    )


def inner_audit_function(
    func,
    audit_action_name: str,
    audit_details: Optional[Union[dict, str, Callable]] = None,
    error_handler: Optional[Callable] = None,
):
    """
    Decorator for creating an audit record and handling edv response as needed
    """

    @functools.wraps(func)
    async def wrapper(handler):
        if _mfa_handling_supported(handler):
            return await _handle_audit_and_response(
                handler, func, error_handler, audit_action_name, audit_details
            )

        if error_handler:
            return error_handler(handler, MFAMessage.NOT_FOUND.value)
        else:
            handler.logger.debug(MFAMessage.NO_ERROR_HANDLER.value)
            return BadRequest(MFAMessage.NOT_FOUND.value)

    return wrapper


def create_audit(
    audit_action_name: str,
    audit_details: Optional[Union[dict, str, Callable]] = None,
    error_handler: Optional[Callable] = None,
):
    return functools.partial(
        inner_audit_function,
        error_handler=error_handler,
        audit_action_name=audit_action_name,
        audit_details=audit_details,
    )


[docs] class Q2TectonClientRequestHandler(Q2TectonBaseRequestHandler): """ RequestHandler meant to be used for requests incoming from Online using the Tecton Client Side rendered content """ OPTIONAL_CONFIGURATIONS = {"FEATURE": None} def __init__(self, application, request, **kwargs): super().__init__(application, request, **kwargs) self.return_as_html = False
[docs] async def q2_post(self, *args, **kwargs): self.form_fields = self._parse_tecton_payload() try: route_response = await self.route_request() except TectonError as exc: self.logger.error(exc) return json.dumps( InternalServerError( exc.args[0], exc.args[1] if len(exc.args) > 1 else None ).to_json() ) if isinstance(route_response, dict): json_dump = json.dumps(Success(route_response).to_json()) elif isinstance(route_response, str): json_dump = route_response else: json_dump = json.dumps(vars(route_response)) self.logger.debug("Tecton response: %s", json_dump) return json_dump if json_dump else ""
async def mfa_validate(self): return json.loads(await super().mfa_validate())["data"]