Source code for q2_sdk.core.http_handlers.auth_token_handler

from copy import deepcopy
import base64
from dataclasses import dataclass
from lxml import objectify
from q2_sdk.core.http_handlers.adapter_handler import Q2AdapterRequestHandler
from q2_sdk.hq.models.auth_token_incoming_request import AuthTokenRequest
from q2_sdk.core.configuration import settings


@dataclass
class PassthruParams:
    """Values extracted from the decoded pass-through XML of an auth token request.

    Instances are built by ``Q2AuthTokenRequestHandler.parse_passthru`` and
    their fields are forwarded, in order, to ``handle_token``.
    """

    # Read from the ``TokenID`` element of the pass-through XML.
    secret: str
    # Read from the ``OTP1`` element of the pass-through XML.
    otp: str
    # Read from the ``OTP2`` element when present; stays "" otherwise.
    otp2: str = ""


class Q2AuthTokenRequestHandler(Q2AdapterRequestHandler):
    """
    Adapter handler for the Multi Factor Authentication (MFA) step that
    follows a username/password login in UUX.

    Subclasses implement :meth:`handle_token`; this class decodes the
    incoming request, hands the token values over, and writes the result
    back into the response.
    """

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        self.incoming_request: AuthTokenRequest = None
        self._search_overrides_first = False

    async def handle_adapter(self, message: dict, *args, **kwargs) -> dict:
        """
        Validate the token carried by ``message`` and return the response dict.

        :param message: The incoming adapter request as a dict. It is not
            modified; the response is built from a deep copy.
        :return: A copy of ``message`` whose first ``HostAccount_Req`` entry
            has ``HostErrorCode`` set to 0 when the token validated, else 4.
        """
        config_source = getattr(self, "DB_CONFIGS", self.WEDGE_ADDRESS_CONFIGS)
        if config_source.db_configs and self.hq_credentials:
            # Cache entries are kept separate per customer (multitenant) or
            # per vault key (single tenant).
            tenant_key = (
                self.hq_credentials.customer_key
                if settings.MULTITENANT
                else settings.VAULT_KEY
            )
            configs_key = f"AuthTokenDbConfigs_{tenant_key}"
            vendor_key = f"AuthTokenVendorName_{tenant_key}"

            self._db_config = self.cache.get(configs_key)
            if not self._db_config:
                vendor_name = self.cache.get(vendor_key)
                if not vendor_name:
                    # Resolve the vendor behind the "Authentication Token"
                    # adapter interface, then remember its name.
                    interface = await self.db.adapter_interface.get_by_name(
                        "Authentication Token"
                    )
                    vendor_id = interface.AdditionalConfigVendorID
                    vendors = await self.db.vendor.get(
                        vendor_id=int(vendor_id), show_all=True
                    )
                    vendor_name = vendors[0].VendorName
                    # The vendor name is cached ten times longer than the
                    # config values below (expire is presumably in seconds).
                    self.cache.set(vendor_key, vendor_name, expire=60 * 10)

                config_rows = await self.db.vendor_config.get(vendor_name)
                self._db_config = {
                    row.ConfigName.text: row.ConfigValue.text for row in config_rows
                }
                self.cache.set(configs_key, self._db_config, expire=60)

        # Work on a copy so the caller's message stays untouched.
        response = deepcopy(message)
        self.incoming_request = AuthTokenRequest(message)

        host_account = response["HostAccount_Req"][0]
        # PassThruData is base64-encoded XML.
        decoded_xml = base64.b64decode(host_account["PassThruData"])
        params = self.parse_passthru(objectify.fromstring(decoded_xml))

        is_valid = await self.handle_token(params.secret, params.otp, params.otp2)
        if is_valid:
            host_account["HostErrorCode"] = 0
        else:
            host_account["HostErrorCode"] = 4
        return response

    @staticmethod
    def parse_passthru(passthru_element) -> PassthruParams:
        """
        Pull the token secret and one-time passwords out of the pass-through XML.

        :param passthru_element: Parsed pass-through element exposing
            ``TokenID``, ``OTP1`` and optionally ``OTP2`` children.
        :return: The extracted values; ``otp2`` is "" when ``OTP2`` is absent.
        """
        # NOTE(review): ``pyval`` yields lxml's inferred Python value, so an
        # all-digit token may arrive as an int rather than the annotated
        # str -- confirm against the lxml objectify documentation.
        token_secret = passthru_element.TokenID.pyval
        first_otp = passthru_element.OTP1.pyval
        second_otp = (
            passthru_element.OTP2.pyval if hasattr(passthru_element, "OTP2") else ""
        )
        return PassthruParams(secret=token_secret, otp=first_otp, otp2=second_otp)

    async def handle_token(self, secret: str, otp: str, otp2: str) -> bool:
        """
        Decide whether the submitted token is acceptable.

        Override this in a subclass. The arguments originate from UUX.

        :param secret: The token secret configured in Q2 Central
        :param otp: The token submitted by the user
        :param otp2: The second submitted user token, if applicable
        :return: True if token is valid. False if not.
        """
        raise NotImplementedError