Source code for q2_sdk.core.http_handlers.tecton_server_handler

import functools
import json
from typing import List, Optional, Callable, Union

from q2_sdk.core.exceptions import TectonError
from q2_sdk.core.http_handlers.base_handler import ConfigurationCheck
from q2_sdk.core.http_handlers.tecton_base_handler import (
    Q2TectonBaseRequestHandler,
    MFAMessage,
)
from q2_sdk.models.tecton import InternalServerError
from q2_sdk.tools import utils, jinja
from q2_sdk.core.cli import colored, textui
from q2_sdk.ui import forms


def inner_mfa_function(
    func,
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    """
    Decorator for checking if session has been validated by the MFA workflow

    Wraps ``func`` in a coroutine that only calls it once
    ``handler.validate_mfa_token()`` is truthy, or once the audit record
    created for ``audit_action_name`` reports neither ``is_edv_code`` nor
    ``is_edv_denied``. Otherwise the wrapper returns MFA prompt html, the
    result of ``error_handler``, or ``None``.

    :param func: Coroutine function taking the handler as its only argument
    :param error_handler: Called as ``error_handler(handler, message)`` when the
        form fields carry ``mfa_exited`` or the audit response is denied. Its
        return value is returned as-is (it is not awaited here)
    :param audit_action_name: If set, an audit record is created with this name
        and its response decides whether an MFA prompt is rendered
    :param audit_details: Details for the audit record. A callable is invoked
        with the handler to produce them
    :return: The wrapping coroutine function, carrying ``func``'s metadata
    """

    # Preserve func's __name__/__doc__ so decorated handler methods stay
    # identifiable in logs, docs and introspection.
    @functools.wraps(func)
    async def wrapper(handler):
        if handler.validate_mfa_token():
            handler.logger.debug(MFAMessage.VALIDATED.value)
            return await func(handler)
        handler.logger.debug(MFAMessage.NOT_FOUND.value)

        if handler.form_fields.get("mfa_exited"):
            # Drop the flag so it is not seen again on this handler's fields.
            del handler.form_fields["mfa_exited"]
            handler.logger.error(MFAMessage.EXITED.value)
            if error_handler:
                return error_handler(handler, MFAMessage.EXITED.value)
            handler.logger.warning(MFAMessage.NO_ERROR_HANDLER.value)
            return None

        if not audit_action_name:
            # No audit action configured: always render the MFA prompt.
            mfa_type_response = await handler._get_mfa_type()
            return handler._generate_mfa_html(mfa_type_response)

        details = audit_details(handler) if callable(audit_details) else audit_details
        audit_response = await handler._create_audit_record(
            audit_action_name=audit_action_name,
            audit_details=details,
        )
        if audit_response.get("is_edv_code"):
            handler.logger.debug(MFAMessage.REQUIRED.value)
            return handler._generate_mfa_html(audit_response)
        if audit_response.get("is_edv_denied"):
            handler.logger.error(MFAMessage.DENIED.value)
            if error_handler:
                return error_handler(handler, MFAMessage.DENIED.value)
            # Same level as the "exited" path above: a missing error handler
            # is reported as a warning in both cases.
            handler.logger.warning(MFAMessage.NO_ERROR_HANDLER.value)
            return None

        handler.logger.debug(MFAMessage.VALIDATED.value)
        return await func(handler)

    return wrapper


def mfa_validation_required(
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    """
    Build a decorator that applies ``inner_mfa_function`` with the given options

    :param error_handler: Forwarded to ``inner_mfa_function`` as ``error_handler``
    :param audit_action_name: Forwarded to ``inner_mfa_function`` as ``audit_action_name``
    :param audit_details: Forwarded to ``inner_mfa_function`` as ``audit_details``
    :return: ``functools.partial`` of ``inner_mfa_function`` that still expects
        the function to wrap as its first positional argument
    """
    bound_options = {
        "error_handler": error_handler,
        "audit_action_name": audit_action_name,
        "audit_details": audit_details,
    }
    return functools.partial(inner_mfa_function, **bound_options)


class Q2TectonServerRequestHandler(Q2TectonBaseRequestHandler):
    """
    RequestHandler meant to be used for requests incoming for Tecton Server Side Rendered Content.
    """

    def __init__(self, application, request, **kwargs):
        # Set before the base initializer runs, so the attribute already
        # exists while the base class is initializing.
        self.auth_tokens = []
        super().__init__(application, request, **kwargs)

    def get_tecton_form(
        self,
        header,
        head_content: str = "",
        foot_content: str = "",
        page_padding: bool = True,
        load_platform_css: bool = True,
        load_utilities: bool = True,
        load_elements: bool = True,
        load_default_theme: bool = True,
        routing_key: str = "",
        hide_submit_button: bool = False,
        **kwargs,
    ) -> forms.Q2TectonForm:
        """
        Returns Q2TectonForm object that is automatically serialized into html when returned from a handler

        As a side effect, makes sure ``self.auth_tokens`` holds a ``q2token``
        entry for the current online session id (an empty string when there
        is no online session).

        :param header: If populated, is rendered as an <h1> at the top of the page
        :param head_content: HTML injected into the <head> tag
        :param foot_content: HTML injected at the bottom of the <html> tag
        :param page_padding: If True, adds a pad around the Form Element
        :param load_platform_css: If True, add css variables for default tecton props and the current theme.
        :param load_utilities: If True, add utility css properties from q2-tecton-utilities.css
        :param load_elements: If True, add script tags that loads Tecton UI components.
        :param load_default_theme: If True, add default theme css properties from q2-tecton-theme.css
        :param routing_key: Creates a hidden input named routing_key which can be used by the extension
            to determine what step to run next of a multi-step flow.
        :param hide_submit_button: If True, this form will not have a submit button
        :param kwargs: Passed through unchanged to ``forms.Q2TectonForm``
        """
        # Resolve the session id once, with the same None guard everywhere.
        # The duplicate check used to read self.online_session.session_id
        # unguarded and raised AttributeError when tokens were already
        # present and there was no online session.
        session_id = self.online_session.session_id if self.online_session else ""

        if self.auth_tokens is None:
            self.auth_tokens = []
        already_present = any(
            token.get("value", None) == session_id for token in self.auth_tokens
        )
        if not already_present:
            self.auth_tokens.append({"key": "q2token", "value": session_id})

        return forms.Q2TectonForm(
            header,
            auth_tokens=self.auth_tokens,
            tecton_version=self.TECTON_URL,
            tecton_beta=self.TECTON_BETA or False,
            page_padding=page_padding,
            load_platform_css=load_platform_css,
            load_utilities=load_utilities,
            load_elements=load_elements,
            load_default_theme=load_default_theme,
            head_content=head_content,
            foot_content=foot_content,
            routing_key=routing_key,
            hide_submit_button=hide_submit_button,
            base_assets_url=self.base_assets_url,
            extension_name=self.form_info.form_short_name
            if self.form_info
            else self.extension_name,
            **kwargs,
        )

    async def mfa_validate(self):
        """
        Run the base class MFA validation with ``return_as_html`` set to False

        :return: Whatever the base ``mfa_validate`` returns, or, when it raises
            ``TectonError``, a JSON string built from ``InternalServerError``
            using the exception's first two args (second defaults to None)
        """
        try:
            self.return_as_html = False
            return await super().mfa_validate()
        except TectonError as exc:
            return json.dumps(
                InternalServerError(
                    exc.args[0], exc.args[1] if len(exc.args) > 1 else None
                ).to_json()
            )

    def _generate_mfa_html(self, response):
        """
        Render the ``q2_mfa_handler.html.jinja2`` template for an MFA prompt

        :param response: Mapping read with ``.get`` for ``return_code``,
            ``is_external_mfa`` and ``external_mfa_short_name``
        :return: Output of ``jinja.jinja_generate`` for the template
        """
        # Form fields whose key starts with "tct-" are left out of the replay.
        replay_fields = {
            key: value
            for key, value in self.form_fields.items()
            if not key.startswith("tct-")
        }
        replay_fields["q2token"] = self.online_session.session_id

        prompt_options = {"code": response.get("return_code")}
        if response.get("is_external_mfa"):
            prompt_options["externalMfaShortName"] = response.get(
                "external_mfa_short_name"
            )

        html = jinja.jinja_generate(
            "q2_mfa_handler.html.jinja2",
            {
                "extension_name": self.extension_name,
                "form_fields": replay_fields,
                "prompt_options": prompt_options,
                "tecton_url": self.TECTON_URL,
            },
        )
        return html

    @classmethod
    async def _fix_manifest_url_paths(cls):
        """
        Rewrite the url paths of every module in the Tecton config's FEATURE
        manifest to match this class's IS_UNAUTHENTICATED / IS_MFA flags, then
        write the config file back and print a confirmation.
        """
        # Imported here rather than at module level, as in the original.
        from q2_sdk.entrypoints import tecton

        conf_file_name = cls.CONFIG_FILE_NAME or cls.extension_name
        conf_file = tecton.import_config_file_as_dict(conf_file_name)
        manifest = conf_file["FEATURE"]
        for mod in manifest["modules"]:
            tecton.set_url_paths(
                config=manifest,
                entrypoint_name=mod,
                is_server_side=True,
                set_unauth=cls.IS_UNAUTHENTICATED,
                set_mfa=cls.IS_MFA,
            )
        # NOTE(review): the config is loaded via conf_file_name but written via
        # cls.extension_name -- confirm both resolve to the same file when
        # CONFIG_FILE_NAME is set.
        tecton.write_config_file(cls.extension_name, conf_file, log=False)
        textui.puts(colored.green(f"Fixes applied to {conf_file_name} config file"))

    @classmethod
    async def validate_configuration(cls) -> List[ConfigurationCheck]:
        """
        Extend the base class configuration checks with the feature manifest check

        :return: The base class checks followed by the result of
            ``_validate_feature_manifest``
        """
        config_checks = await super().validate_configuration()
        config_checks.append(cls._validate_feature_manifest())
        return config_checks

    @classmethod
    def _validate_feature_manifest(cls) -> ConfigurationCheck:
        """
        Check that module urls in the feature manifest agree with the class flags

        A failing check is returned for the first module whose url prefix does
        not match when ``IS_UNAUTHENTICATED`` or ``IS_MFA`` is set on the class.
        A class with neither flag set always passes.

        :return: ``ConfigurationCheck(True)`` when no discrepancy is found,
            otherwise a failing check with a message and the fix coroutine
        """
        manifest = utils.get_feature_manifest(cls.extension_name)
        configuration_check = ConfigurationCheck(True)
        for module in manifest["modules"].values():
            # "{featureName}" is matched literally; these are plain strings,
            # not f-strings.
            url_is_unauth = module["url"].startswith("./sdk/{featureName}")
            url_is_mfa = module["url"].startswith("./sdk/mfa/{featureName}")
            class_is_unauth = cls.IS_UNAUTHENTICATED
            class_is_mfa = cls.IS_MFA

            unauth_url_discrepancy = (
                class_is_unauth and class_is_unauth != url_is_unauth
            )
            mfa_url_discrepancy = class_is_mfa and class_is_mfa != url_is_mfa

            if unauth_url_discrepancy:
                # NOTE(review): _fix_manifest_url_paths is async, so this call
                # only creates a coroutine object; presumably whoever applies
                # the fix awaits it -- confirm against ConfigurationCheck.
                return ConfigurationCheck(
                    False,
                    (
                        "Authentication choice in Class must match Tecton config\n"
                        f"Class is unauth: {class_is_unauth}\n"
                        f"Tecton config is unauth: {url_is_unauth}"
                    ),
                    cls._fix_manifest_url_paths(),
                )
            if mfa_url_discrepancy:
                return ConfigurationCheck(
                    False,
                    (
                        "MFA choice in Class must match Tecton config\n"
                        f"Class is MFA: {class_is_mfa}\n"
                        f"Tecton config is MFA: {url_is_mfa}"
                    ),
                    cls._fix_manifest_url_paths(),
                )
        return configuration_check