import base64
import json
from lxml import etree
from q2_sdk.core.configuration import settings
from q2_sdk.core.http_handlers.hq_handler import Q2HqRequestHandler
from q2_sdk.hq.models.account_list import AccountList
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.hq.models.sso_account import SSOAccount
from q2_sdk.hq.models.sso_parameter_bitflags import ParameterBitFlags
from q2_sdk.hq.models.sso_response import Q2SSOResponse, ResponseType
from q2_sdk.hq.models.sso_session import SSOSession
from q2_sdk.hq.models.sso_user import SSOUser
from q2_sdk.hq.models.sso_vendor_config import VendorConfig
[docs]
class Q2SSORequestHandler(Q2HqRequestHandler):
"""
RequestHandler meant to be used for authenticating
to a third party vendor
"""
VENDOR_CONFIGS = {}
SEND_USER_INFO = True
SEND_ACCOUNT_INFO = False
SEND_DEP_ONLY_ACCOUNTS = False
SEND_VIEW_ONLY_ACCOUNTS = False
SEND_WDL_ONLY_ACCOUNTS = False
SEND_DEP_VIEW_ACCOUNTS = False
SEND_DEP_WDL_ACCOUNTS = False
SEND_VIEW_WDL_ACCOUNTS = False
SEND_DEP_VIEW_WDL_ACCOUNTS = False
RENDER_IN_NEW_WINDOW = True
PURGE_ACCOUNT_LIST_ON_SUCCESS = False
BALANCE_HADE_SYSTEM_PROPERTY = None
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self._account_model_proxy = SSOAccount
self.online_user: SSOUser = None
self.online_session: SSOSession = None
self.account_list = AccountList[SSOAccount]()
self.return_as_html = False
self.vendor_id: int = None
self._db_configs = {}
self._parameter_bit_flags = None
self._request_as_json = None
self.sso_query_params = None
@property
def default_summary_log(self):
summary_log = super().default_summary_log
summary_log["HQ_ID"] = self.online_session.hq_session_id
summary_log["login_name"] = self.online_user.login_name
return summary_log
async def default(self):
return Q2SSOResponse(ResponseType.EXCEPTION, "", success=False)
@property
def request_as_json(self):
if not self._request_as_json:
self._request_as_json = json.loads(self.request.body.decode())
return self._request_as_json
@property
def vendor_configs(self):
"""Alias to self.db_config"""
return self.db_config
@property
def db_config(self):
return self._db_config
@classmethod
def gather_parameter_bit_flags(cls):
return ParameterBitFlags(
cls.SEND_USER_INFO,
cls.SEND_ACCOUNT_INFO,
cls.SEND_DEP_ONLY_ACCOUNTS,
cls.SEND_VIEW_ONLY_ACCOUNTS,
cls.SEND_WDL_ONLY_ACCOUNTS,
cls.SEND_DEP_VIEW_ACCOUNTS,
cls.SEND_DEP_WDL_ACCOUNTS,
cls.SEND_VIEW_WDL_ACCOUNTS,
cls.SEND_DEP_VIEW_WDL_ACCOUNTS,
)
async def _handle_form_request(self, *args, **kwargs):
await self._build_models_from_json()
return await super()._handle_form_request(*args, **kwargs)
@property
def _hq_credentials_from_db_config(self) -> HqCredentials:
hq_creds = self.hq_credentials
hq_creds.customer_key = self._db_config.get(
"InsightCustomerKey", hq_creds.customer_key
)
if not hq_creds.customer_key and settings.LOCAL_DEV:
hq_creds.customer_key = settings.VAULT_KEY
override_dict = self._db_config.get("_overrides", {})
override_customer_key = override_dict.get("customer_key")
if override_customer_key:
hq_creds = self._get_hq_from_key(override_customer_key)
elif not hq_creds.aba and hq_creds.customer_key:
hq_creds = self._get_hq_from_key(hq_creds.customer_key)
hq_creds.auth_token = self._db_config.get("HqAuthToken", hq_creds.auth_token)
hq_creds.database_name = self._db_config.get("DbName", hq_creds.database_name)
hq_creds.db_schema_name = self._db_config.get(
"DbSchemaName", hq_creds.db_schema_name
)
hq_creds.env_stack = self._db_config.get("EnvStack", hq_creds.env_stack)
hq_creds.reported_hq_url = hq_creds.hq_url
if settings.USE_INCOMING_HQ_URL:
hq_creds.reported_hq_url = self._db_config.get(
"HqBaseUrl", hq_creds.reported_hq_url
)
return hq_creds
async def _build_models_from_json(self):
adapter_config = self.request_as_json["AdaptorConfig"]
new_vendor_config = VendorConfig(adapter_config)
self._db_config = new_vendor_config.vendor_configs
self.online_session = new_vendor_config.sso_session
self.vendor_id = new_vendor_config.vendor_id
self.hq_credentials = self._hq_credentials_from_db_config
base64_query_str = new_vendor_config.vendor_configs.get(
"Session.RequestQueryStringParameters"
)
if base64_query_str:
decoded_params = base64.b64decode(base64_query_str).decode(encoding="UTF-8")
self.sso_query_params = self._convert_xml_to_dict(decoded_params)
try:
user_info = self.request_as_json["UserInfo"]["Q2_UserInfoView"][0]
self.online_user = SSOUser(
request_json=user_info, customer_key=self.hq_credentials.customer_key
)
self.online_user.customer_primary_cif = (
self.online_session.primary_cif if self.online_session else None
)
except KeyError:
self.logger.debug("No online user could be found in request")
if self.request_as_json.get("AccountInformations") is not None:
account_list = self.request_as_json["AccountInformations"][
"Q2_AccountInformation"
]
for item in account_list:
account = self._account_model_proxy(item)
self.account_list.append(account)
async def _build_models_from_hq(self, element: etree.Element):
await super()._build_models_from_hq(element)
adapter_config = self.request_as_obj.find(".//{*}AdaptorConfig")
new_vendor_config = VendorConfig(adapter_config)
self._db_config = new_vendor_config.vendor_configs
self.online_session = new_vendor_config.sso_session
self.vendor_id = new_vendor_config.vendor_id
self.hq_credentials = self._hq_credentials_from_db_config
try:
self.online_user = SSOUser(
element, customer_key=self.hq_credentials.customer_key
)
except AttributeError:
self.logger.debug("No online user could be found in request")
if self.online_session and self.online_user:
self.online_user.customer_primary_cif = self.online_session.primary_cif
for account_node in element.findall(".//{*}Q2_AccountInformation"):
account = self._account_model_proxy(account_node)
self.account_list.append(account)
[docs]
def wrap_soap_response(self, custom_response: Q2SSOResponse):
nsmap = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"xsd": "http://www.w3.org/2001/XMLSchema",
"xsi": "http://www.w3.org/2001/XMLSchema-instance",
}
envelope = etree.Element("{%s}Envelope" % nsmap["soap"], nsmap=nsmap)
body = etree.SubElement(envelope, "{%s}Body" % nsmap["soap"])
adapter_response = etree.SubElement(
body,
"LoginToAdapterResponse",
xmlns="http://Q2Software.com/webservices/3rdPartyAdaptors",
)
adapter_response.append(custom_response.serialize_as_xml())
return etree.tostring(envelope, encoding="utf-8", xml_declaration=True).decode()
@staticmethod
def _convert_xml_to_dict(decoded_params):
root = etree.fromstring(decoded_params)
result = {}
for child in root:
result[child.findtext("key")] = child.findtext("value")
return result