Source code for q2_sdk.core.http_handlers.sso_handler

import base64
import json

from lxml import etree
from q2_sdk.core.configuration import settings
from q2_sdk.core.http_handlers.hq_handler import Q2HqRequestHandler
from q2_sdk.hq.models.account_list import AccountList
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.hq.models.sso_account import SSOAccount
from q2_sdk.hq.models.sso_parameter_bitflags import ParameterBitFlags
from q2_sdk.hq.models.sso_response import Q2SSOResponse, ResponseType
from q2_sdk.hq.models.sso_session import SSOSession
from q2_sdk.hq.models.sso_user import SSOUser
from q2_sdk.hq.models.sso_vendor_config import VendorConfig


class Q2SSORequestHandler(Q2HqRequestHandler):
    """
    RequestHandler meant to be used for authenticating to a third party vendor

    The handler parses an incoming SSO request (JSON via
    ``_build_models_from_json`` or XML via ``_build_models_from_hq``) into
    ``online_user``, ``online_session``, ``account_list``, ``vendor_id`` and
    the vendor configuration exposed through ``db_config``.
    """

    VENDOR_CONFIGS = {}

    # The SEND_* flags below are handed, in this order, to ParameterBitFlags
    # by gather_parameter_bit_flags().
    SEND_USER_INFO = True
    SEND_ACCOUNT_INFO = False
    SEND_DEP_ONLY_ACCOUNTS = False
    SEND_VIEW_ONLY_ACCOUNTS = False
    SEND_WDL_ONLY_ACCOUNTS = False
    SEND_DEP_VIEW_ACCOUNTS = False
    SEND_DEP_WDL_ACCOUNTS = False
    SEND_VIEW_WDL_ACCOUNTS = False
    SEND_DEP_VIEW_WDL_ACCOUNTS = False

    # NOTE(review): the three settings below are not read anywhere in this
    # class body; presumably they are consumed elsewhere in the SDK -- confirm.
    RENDER_IN_NEW_WINDOW = True
    PURGE_ACCOUNT_LIST_ON_SUCCESS = False
    BALANCE_HADE_SYSTEM_PROPERTY = None

    def __init__(self, application, request, **kwargs):
        """Initialise per-request state; models are filled in once a request is parsed."""
        super().__init__(application, request, **kwargs)
        self._account_model_proxy = SSOAccount
        self.online_user: SSOUser = None
        self.online_session: SSOSession = None
        self.account_list = AccountList[SSOAccount]()
        self.return_as_html = False
        self.vendor_id: int = None
        # Kept for backward compatibility with anything reading the old name.
        self._db_configs = {}
        # ``db_config`` reads ``_db_config`` (singular). Make sure it exists
        # before any request has been parsed, without clobbering a value a
        # parent class may already have set.
        if not hasattr(self, "_db_config"):
            self._db_config = {}
        self._parameter_bit_flags = None
        self._request_as_json = None
        self.sso_query_params = None

    @property
    def default_summary_log(self):
        """
        Parent summary log extended with ``HQ_ID`` and ``login_name``.

        Either value is ``None`` when the session or user has not been (or
        could not be) built from the request.
        """
        summary_log = super().default_summary_log
        session = self.online_session
        user = self.online_user
        summary_log["HQ_ID"] = (
            session.hq_session_id if session is not None else None
        )
        summary_log["login_name"] = user.login_name if user is not None else None
        return summary_log

    async def default(self):
        """Return an unsuccessful ``EXCEPTION`` response with an empty payload."""
        return Q2SSOResponse(ResponseType.EXCEPTION, "", success=False)

    @property
    def request_as_json(self):
        """Request body decoded and parsed as JSON; parsed once and cached."""
        # Compare against None so that a falsy payload (e.g. ``{}``) is still
        # cached instead of being re-parsed on every access.
        if self._request_as_json is None:
            self._request_as_json = json.loads(self.request.body.decode())
        return self._request_as_json

    @property
    def vendor_configs(self):
        """Alias to self.db_config"""
        return self.db_config

    @property
    def db_config(self):
        """Vendor configuration mapping populated while building the models."""
        return self._db_config

    @classmethod
    def gather_parameter_bit_flags(cls):
        """Build a ``ParameterBitFlags`` from the class-level ``SEND_*`` settings."""
        return ParameterBitFlags(
            cls.SEND_USER_INFO,
            cls.SEND_ACCOUNT_INFO,
            cls.SEND_DEP_ONLY_ACCOUNTS,
            cls.SEND_VIEW_ONLY_ACCOUNTS,
            cls.SEND_WDL_ONLY_ACCOUNTS,
            cls.SEND_DEP_VIEW_ACCOUNTS,
            cls.SEND_DEP_WDL_ACCOUNTS,
            cls.SEND_VIEW_WDL_ACCOUNTS,
            cls.SEND_DEP_VIEW_WDL_ACCOUNTS,
        )

    async def _handle_form_request(self, *args, **kwargs):
        """Build the models from the JSON body, then defer to the parent handler."""
        await self._build_models_from_json()
        return await super()._handle_form_request(*args, **kwargs)

    @property
    def _hq_credentials_from_db_config(self) -> HqCredentials:
        """
        HQ credentials with values from the vendor config layered on top.

        Starts from ``self.hq_credentials`` (mutated in place unless replaced
        by a key lookup) and overrides individual fields with the matching
        ``_db_config`` entries when they are present.
        """
        hq_creds = self.hq_credentials
        hq_creds.customer_key = self._db_config.get(
            "InsightCustomerKey", hq_creds.customer_key
        )
        if not hq_creds.customer_key and settings.LOCAL_DEV:
            hq_creds.customer_key = settings.VAULT_KEY

        # An explicit override key wins; otherwise look credentials up by
        # customer key only when no ABA is set on the current credentials.
        override_dict = self._db_config.get("_overrides", {})
        override_customer_key = override_dict.get("customer_key")
        if override_customer_key:
            hq_creds = self._get_hq_from_key(override_customer_key)
        elif not hq_creds.aba and hq_creds.customer_key:
            hq_creds = self._get_hq_from_key(hq_creds.customer_key)

        hq_creds.auth_token = self._db_config.get("HqAuthToken", hq_creds.auth_token)
        hq_creds.database_name = self._db_config.get("DbName", hq_creds.database_name)
        hq_creds.db_schema_name = self._db_config.get(
            "DbSchemaName", hq_creds.db_schema_name
        )
        hq_creds.env_stack = self._db_config.get("EnvStack", hq_creds.env_stack)

        hq_creds.reported_hq_url = hq_creds.hq_url
        if settings.USE_INCOMING_HQ_URL:
            hq_creds.reported_hq_url = self._db_config.get(
                "HqBaseUrl", hq_creds.reported_hq_url
            )
        return hq_creds

    async def _build_models_from_json(self):
        """
        Populate the handler's models from the JSON request body.

        Reads ``AdaptorConfig`` (required), the optional base64-encoded
        ``Session.RequestQueryStringParameters`` vendor config, the optional
        ``UserInfo.Q2_UserInfoView[0]`` entry and the optional
        ``AccountInformations.Q2_AccountInformation`` list.
        """
        adapter_config = self.request_as_json["AdaptorConfig"]
        new_vendor_config = VendorConfig(adapter_config)
        self._db_config = new_vendor_config.vendor_configs
        self.online_session = new_vendor_config.sso_session
        self.vendor_id = new_vendor_config.vendor_id
        self.hq_credentials = self._hq_credentials_from_db_config

        base64_query_str = new_vendor_config.vendor_configs.get(
            "Session.RequestQueryStringParameters"
        )
        if base64_query_str:
            decoded_params = base64.b64decode(base64_query_str).decode(encoding="UTF-8")
            self.sso_query_params = self._convert_xml_to_dict(decoded_params)

        try:
            user_info = self.request_as_json["UserInfo"]["Q2_UserInfoView"][0]
        except (KeyError, IndexError, TypeError):
            # Missing key, empty list, or a null container all mean the same
            # thing here: the request carries no user.
            self.logger.debug("No online user could be found in request")
        else:
            try:
                self.online_user = SSOUser(
                    request_json=user_info,
                    customer_key=self.hq_credentials.customer_key,
                )
                self.online_user.customer_primary_cif = (
                    self.online_session.primary_cif if self.online_session else None
                )
            except KeyError:
                self.logger.debug("No online user could be found in request")

        if self.request_as_json.get("AccountInformations") is not None:
            account_list = self.request_as_json["AccountInformations"][
                "Q2_AccountInformation"
            ]
            for item in account_list:
                account = self._account_model_proxy(item)
                self.account_list.append(account)

    async def _build_models_from_hq(self, element: etree.Element):
        """
        Populate the handler's models from an XML request element.

        Runs the parent implementation first, then reads the ``AdaptorConfig``
        node from ``self.request_as_obj`` and the user and
        ``Q2_AccountInformation`` nodes from ``element``.
        """
        await super()._build_models_from_hq(element)
        adapter_config = self.request_as_obj.find(".//{*}AdaptorConfig")
        new_vendor_config = VendorConfig(adapter_config)
        self._db_config = new_vendor_config.vendor_configs
        self.online_session = new_vendor_config.sso_session
        self.vendor_id = new_vendor_config.vendor_id
        self.hq_credentials = self._hq_credentials_from_db_config

        try:
            self.online_user = SSOUser(
                element, customer_key=self.hq_credentials.customer_key
            )
        except AttributeError:
            self.logger.debug("No online user could be found in request")

        if self.online_session and self.online_user:
            self.online_user.customer_primary_cif = self.online_session.primary_cif

        for account_node in element.findall(".//{*}Q2_AccountInformation"):
            account = self._account_model_proxy(account_node)
            self.account_list.append(account)

    def wrap_soap_response(self, custom_response: Q2SSOResponse):
        """
        Wrap ``custom_response`` in a SOAP envelope and return it as a string.

        The serialized response is placed inside a ``LoginToAdapterResponse``
        element within the SOAP ``Body``; the result includes an XML
        declaration.
        """
        nsmap = {
            "soap": "http://schemas.xmlsoap.org/soap/envelope/",
            "xsd": "http://www.w3.org/2001/XMLSchema",
            "xsi": "http://www.w3.org/2001/XMLSchema-instance",
        }
        soap_ns = nsmap["soap"]
        envelope = etree.Element(f"{{{soap_ns}}}Envelope", nsmap=nsmap)
        body = etree.SubElement(envelope, f"{{{soap_ns}}}Body")
        adapter_response = etree.SubElement(
            body,
            "LoginToAdapterResponse",
            xmlns="http://Q2Software.com/webservices/3rdPartyAdaptors",
        )
        adapter_response.append(custom_response.serialize_as_xml())
        return etree.tostring(envelope, encoding="utf-8", xml_declaration=True).decode()

    @staticmethod
    def _convert_xml_to_dict(decoded_params):
        """
        Convert an XML document of key/value entries into a ``dict``.

        Each child of the root contributes the text of its ``key`` element
        mapped to the text of its ``value`` element. Children without a
        ``key`` element are skipped.
        """
        # The XML originates from the incoming request, so do not let the
        # parser expand entity references.
        parser = etree.XMLParser(resolve_entities=False)
        root = etree.fromstring(decoded_params, parser)
        result = {}
        for child in root:
            key = child.findtext("key")
            if key is None:
                continue
            result[key] = child.findtext("value")
        return result