import functools
import json
from typing import List, Optional, Callable, Union
from q2_sdk.core.exceptions import TectonError
from q2_sdk.core.http_handlers.base_handler import ConfigurationCheck
from q2_sdk.core.http_handlers.tecton_base_handler import (
Q2TectonBaseRequestHandler,
MFAMessage,
)
from q2_sdk.models.tecton import InternalServerError
from q2_sdk.tools import utils, jinja
from q2_sdk.core.cli import colored, textui
from q2_sdk.ui import forms
def inner_mfa_function(
    func,
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    """
    Decorator for checking if session has been validated by the MFA workflow

    The returned coroutine function takes the handler as its only argument and:

    1. Awaits ``func(handler)`` when ``handler.validate_mfa_token()`` is truthy.
    2. Otherwise, if the ``mfa_exited`` form field is set, removes that field
       and hands ``MFAMessage.EXITED.value`` to ``error_handler``.
    3. Otherwise, if ``audit_action_name`` is given, awaits
       ``handler._create_audit_record`` and branches on the response flags:
       ``is_edv_code`` returns ``handler._generate_mfa_html(response)``,
       ``is_edv_denied`` hands ``MFAMessage.DENIED.value`` to
       ``error_handler``, anything else awaits ``func(handler)``.
    4. Otherwise returns ``handler._generate_mfa_html`` of the awaited
       ``handler._get_mfa_type()`` response.

    :param func: Coroutine function to protect; called as ``func(handler)``
    :param error_handler: Optional callable invoked as
        ``error_handler(handler, message)``. It may be a plain function or a
        coroutine function; an awaitable result is awaited before being
        returned. When omitted, the condition is only logged and the wrapper
        returns ``None``.
    :param audit_action_name: Optional name passed to
        ``handler._create_audit_record``
    :param audit_details: Details passed to ``handler._create_audit_record``.
        If callable, it is called with the handler and its return value is used
    :return: The wrapping coroutine function
    """
    # Function-scope import keeps this block self-contained without touching
    # the module's import section.
    import inspect

    async def run_error_handler(handler, message):
        # Support both sync and async error handlers: without this an
        # ``async def`` handler would hand back an un-awaited coroutine.
        outcome = error_handler(handler, message)
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    # Preserve the decorated function's name/docstring and expose it through
    # ``__wrapped__`` for introspection.
    @functools.wraps(func)
    async def wrapper(handler):
        if handler.validate_mfa_token():
            handler.logger.debug(MFAMessage.VALIDATED.value)
            return await func(handler)

        handler.logger.debug(MFAMessage.NOT_FOUND.value)

        if handler.form_fields.get("mfa_exited"):
            # Drop the flag so it is not seen again on this handler's form fields.
            del handler.form_fields["mfa_exited"]
            handler.logger.error(MFAMessage.EXITED.value)
            if error_handler:
                return await run_error_handler(handler, MFAMessage.EXITED.value)
            handler.logger.warning(MFAMessage.NO_ERROR_HANDLER.value)
            return None

        if not audit_action_name:
            mfa_type_response = await handler._get_mfa_type()
            return handler._generate_mfa_html(mfa_type_response)

        details = audit_details(handler) if callable(audit_details) else audit_details
        audit_response = await handler._create_audit_record(
            audit_action_name=audit_action_name,
            audit_details=details,
        )

        if audit_response.get("is_edv_code"):
            handler.logger.debug(MFAMessage.REQUIRED.value)
            return handler._generate_mfa_html(audit_response)

        if audit_response.get("is_edv_denied"):
            handler.logger.error(MFAMessage.DENIED.value)
            if error_handler:
                return await run_error_handler(handler, MFAMessage.DENIED.value)
            handler.logger.debug(MFAMessage.NO_ERROR_HANDLER.value)
            return None

        # Neither a code prompt nor a denial: the original function runs.
        handler.logger.debug(MFAMessage.VALIDATED.value)
        return await func(handler)

    return wrapper
def mfa_validation_required(
    error_handler: Optional[Callable] = None,
    audit_action_name: Optional[str] = None,
    audit_details: Optional[Union[dict, str, Callable]] = None,
):
    """
    Build a decorator that applies ``inner_mfa_function`` with preset options.

    :param error_handler: Forwarded to ``inner_mfa_function`` as ``error_handler``
    :param audit_action_name: Forwarded as ``audit_action_name``
    :param audit_details: Forwarded as ``audit_details``
    :return: A ``functools.partial`` of ``inner_mfa_function`` that still
        expects the function to wrap as its positional argument
    """
    preset_options = {
        "error_handler": error_handler,
        "audit_action_name": audit_action_name,
        "audit_details": audit_details,
    }
    return functools.partial(inner_mfa_function, **preset_options)
# [docs] -- Sphinx source-link marker left over from extraction; not part of the code
class Q2TectonServerRequestHandler(Q2TectonBaseRequestHandler):
    """
    RequestHandler meant to be used for requests incoming for Tecton Server Side Rendered Content.
    """

    def __init__(self, application, request, **kwargs):
        # ``auth_tokens`` is assigned before the parent constructor runs,
        # so it already exists during ``super().__init__``.
        self.auth_tokens = []
        super().__init__(application, request, **kwargs)

    async def mfa_validate(self):
        """
        Run the parent ``mfa_validate`` with ``return_as_html`` set to False.

        :return: Whatever the parent implementation returns, or, when it
            raises ``TectonError``, the ``json.dumps`` of an
            ``InternalServerError(...).to_json()`` built from the exception's
            arguments
        """
        try:
            self.return_as_html = False
            return await super().mfa_validate()
        except TectonError as exc:
            # A TectonError raised without arguments has empty ``args``;
            # indexing it blindly would raise IndexError here and hide the
            # original failure, so fall back to the exception's string form.
            message = exc.args[0] if exc.args else str(exc)
            extra = exc.args[1] if len(exc.args) > 1 else None
            return json.dumps(InternalServerError(message, extra).to_json())

    def _generate_mfa_html(self, response):
        """
        Render the ``q2_mfa_handler.html.jinja2`` template for an MFA response.

        :param response: Mapping read with ``.get`` for ``return_code``,
            ``is_external_mfa`` and ``external_mfa_short_name``
        :return: The output of ``jinja.jinja_generate``
        """
        # Carry over every submitted form field except those prefixed "tct-".
        replay_fields = {
            key: value
            for key, value in self.form_fields.items()
            if not key.startswith("tct-")
        }
        replay_fields["q2token"] = self.online_session.session_id

        prompt_options = {"code": response.get("return_code")}
        if response.get("is_external_mfa"):
            prompt_options["externalMfaShortName"] = response.get(
                "external_mfa_short_name"
            )

        html = jinja.jinja_generate(
            "q2_mfa_handler.html.jinja2",
            {
                "extension_name": self.extension_name,
                "form_fields": replay_fields,
                "prompt_options": prompt_options,
                "tecton_url": self.TECTON_URL,
            },
        )
        return html

    @classmethod
    async def _fix_manifest_url_paths(cls):
        """
        Rewrite the URL paths of every module in the Tecton config file.

        Loads the config via ``tecton.import_config_file_as_dict``, calls
        ``tecton.set_url_paths`` for each entry in ``FEATURE["modules"]`` using
        this class's ``IS_UNAUTHENTICATED`` and ``IS_MFA`` flags, then writes
        the config back and prints a confirmation.
        """
        # Imported at call time rather than at module level, as in the original.
        from q2_sdk.entrypoints import tecton

        conf_file_name = cls.CONFIG_FILE_NAME or cls.extension_name
        conf_file = tecton.import_config_file_as_dict(conf_file_name)
        manifest = conf_file["FEATURE"]
        for mod in manifest["modules"]:
            tecton.set_url_paths(
                config=manifest,
                entrypoint_name=mod,
                is_server_side=True,
                set_unauth=cls.IS_UNAUTHENTICATED,
                set_mfa=cls.IS_MFA,
            )
        # NOTE(review): the file is read using ``conf_file_name`` but written
        # using ``cls.extension_name`` -- confirm this is intended when
        # CONFIG_FILE_NAME is set.
        tecton.write_config_file(cls.extension_name, conf_file, log=False)
        textui.puts(colored.green(f"Fixes applied to {conf_file_name} config file"))

    @classmethod
    async def validate_configuration(cls) -> List[ConfigurationCheck]:
        """
        Extend the parent's configuration checks with the feature manifest check.

        :return: The parent's list with the result of
            ``_validate_feature_manifest`` appended
        """
        config_checks = await super().validate_configuration()
        config_checks.append(cls._validate_feature_manifest())
        return config_checks

    @classmethod
    def _validate_feature_manifest(cls) -> ConfigurationCheck:
        """
        Compare each manifest module URL prefix with this class's auth flags.

        A failing check is returned for the first module where
        ``IS_UNAUTHENTICATED`` is set but the URL does not start with
        ``./sdk/{featureName}``, or where ``IS_MFA`` is set but the URL does
        not start with ``./sdk/mfa/{featureName}``. A flag that is not set is
        never reported as a discrepancy.

        :return: ``ConfigurationCheck(True)`` when no discrepancy is found,
            otherwise a failing ``ConfigurationCheck`` with a message and the
            fix coroutine
        """
        manifest = utils.get_feature_manifest(cls.extension_name)
        configuration_check = ConfigurationCheck(True)
        for module in manifest["modules"].values():
            url_is_unauth = module["url"].startswith("./sdk/{featureName}")
            url_is_mfa = module["url"].startswith("./sdk/mfa/{featureName}")
            class_is_unauth = cls.IS_UNAUTHENTICATED
            class_is_mfa = cls.IS_MFA

            unauth_url_discrepancy = (
                class_is_unauth and class_is_unauth != url_is_unauth
            )
            mfa_url_discrepancy = class_is_mfa and class_is_mfa != url_is_mfa

            # NOTE(review): ``cls._fix_manifest_url_paths()`` is called here,
            # so a coroutine object (not the function) is handed to
            # ConfigurationCheck -- presumably awaited by whatever applies
            # the fix; confirm.
            if unauth_url_discrepancy:
                return ConfigurationCheck(
                    False,
                    (
                        "Authentication choice in Class must match Tecton config\n"
                        f"Class is unauth: {class_is_unauth}\n"
                        f"Tecton config is unauth: {url_is_unauth}"
                    ),
                    cls._fix_manifest_url_paths(),
                )
            if mfa_url_discrepancy:
                return ConfigurationCheck(
                    False,
                    (
                        "MFA choice in Class must match Tecton config\n"
                        f"Class is MFA: {class_is_mfa}\n"
                        f"Tecton config is MFA: {url_is_mfa}"
                    ),
                    cls._fix_manifest_url_paths(),
                )
        return configuration_check