from copy import deepcopy
import base64
from dataclasses import dataclass
from lxml import objectify
from q2_sdk.core.http_handlers.adapter_handler import Q2AdapterRequestHandler
from q2_sdk.hq.models.auth_token_incoming_request import AuthTokenRequest
from q2_sdk.core.configuration import settings
@dataclass
class PassthruParams:
secret: str
otp: str
otp2: str = ""
[docs]
class Q2AuthTokenRequestHandler(Q2AdapterRequestHandler):
"""Adapter type for Multi Factor Authentication (MFA) after logging in via username/password in UUX"""
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.incoming_request: AuthTokenRequest = None
self._search_overrides_first = False
async def handle_adapter(self, message: dict, *args, **kwargs) -> dict:
configs = getattr(self, "DB_CONFIGS", self.WEDGE_ADDRESS_CONFIGS)
if configs.db_configs and self.hq_credentials:
if settings.MULTITENANT:
config_suffix = self.hq_credentials.customer_key
else:
config_suffix = settings.VAULT_KEY
db_config_cache_key = f"AuthTokenDbConfigs_{config_suffix}"
vendor_name_cache_key = f"AuthTokenVendorName_{config_suffix}"
self._db_config = self.cache.get(db_config_cache_key)
if not self._db_config:
vendor_name = self.cache.get(vendor_name_cache_key)
if not vendor_name:
interface = await self.db.adapter_interface.get_by_name(
"Authentication Token"
)
vendor_id = interface.AdditionalConfigVendorID
vendor = await self.db.vendor.get(
vendor_id=int(vendor_id), show_all=True
)
vendor_name = vendor[0].VendorName
self.cache.set(vendor_name_cache_key, vendor_name, expire=60 * 10)
vendor_configs = await self.db.vendor_config.get(vendor_name)
self._db_config = {
x.ConfigName.text: x.ConfigValue.text for x in vendor_configs
}
self.cache.set(db_config_cache_key, self._db_config, expire=60)
response_as_dict = deepcopy(message)
self.incoming_request = AuthTokenRequest(message)
passthru = base64.b64decode(
response_as_dict["HostAccount_Req"][0]["PassThruData"]
)
element = objectify.fromstring(passthru)
passthru_params = self.parse_passthru(element)
validated = await self.handle_token(
passthru_params.secret, passthru_params.otp, passthru_params.otp2
)
response_as_dict["HostAccount_Req"][0]["HostErrorCode"] = 0 if validated else 4
return response_as_dict
@staticmethod
def parse_passthru(passthru_element) -> PassthruParams:
secret = passthru_element.TokenID.pyval
otp = passthru_element.OTP1.pyval
if hasattr(passthru_element, "OTP2"):
otp2 = passthru_element.OTP2.pyval
else:
otp2 = ""
return PassthruParams(secret, otp, otp2)
[docs]
async def handle_token(self, secret: str, otp: str, otp2: str) -> bool:
"""
Evaluate token here, and return true to validate this token request, or false if it is not valid.
The parameters will be passed in from UUX.
:param secret: The token secret configured in Q2 Central
:param otp: The token submitted by the user
:param otp2: The second submitted user token, if applicable
:return: True if token is valid. False if not.
"""
raise NotImplementedError