import json
import logging
from typing import Union
import uuid
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ..utils import SearchMethod
from .mock_responses import (
mock_demographic_info_response_acc,
mock_demographic_info_response_cif,
mock_demographic_info_response_dynamic_profile,
mock_demographic_info_response_tax_id,
)
[docs]
class DemographicInfoQuery(BaseQuery):
"""
Builds the payload for the demographic call to the FIS IBS OpenAPI core
"""
def __init__(
self,
logger: logging.Logger,
req_val: Union[str, tuple, None],
search_method: SearchMethod = None,
signature: str = "",
):
"""
:param req_dict: Request data
:param search_method: Method used to get demographic data
:param signature: DigitalSignature for dynamic profile requests
"""
self.req_val = req_val
self.search_method = search_method
self.signature = signature
super().__init__(logger)
[docs]
def build(self) -> str:
"""
Creates an OpenAPI query to get the customer's demographic info
:return: OpenAPI request query as a JSON string
Example query to search by SSN (prior to JSON string conversion):
.. code-block:: json
{
"Endpoint": "rest/IBSCI/v2/customers/search/taxnbr?CIKeySSN=_CIKeySSN",
"HttpVerb": "GET",
"Entity": null,
"RequestGuid": "00000000-0000-0000-0000-000000000000",
"ApplicationID": "CI",
"UrlEncryptionParams": {
"_CIKeySSN": "123456789"
}
}
Example query to search by CIF (prior to JSON string conversion):
.. code-block:: json
{
"Endpoint": "rest/IBSCI/v2/customers/_CIApplNbr",
"HttpVerb": "GET",
"Entity": null,
"RequestGuid": "00000000-0000-0000-0000-000000000000",
"ApplicationID": "CI",
"UrlEncryptionParams": {
"_CIApplNbr": "123456789"
}
}
Example query to search by AccountNumber and AccountType (prior to JSON string conversion):
.. code-block:: json
{
"Endpoint": "rest/IBSCI/v2/accounts/DP/_AppNbr/dynamic-profile",
"HttpVerb": "GET",
"Entity": null,
"RequestGuid": "00000000-0000-0000-0000-000000000000",
"ApplicationID": "CI",
"UrlEncryptionParams": {
"_AppNbr": "000112233"
}
}
"""
query_data = self._get_search_method_request_details()
query = {
"Endpoint": query_data["endpoint"],
"HttpVerb": "GET",
"Entity": None,
"RequestGuid": str(uuid.uuid4()),
"ApplicationID": "CI",
"UrlEncryptionParams": query_data["params"],
}
if self.search_method.value == "DynamicProfile":
query["DigitalSignature"] = self.signature
json_string = json.dumps(query)
return json_string
def _get_search_method_request_details(self):
match self.search_method:
case SearchMethod.SSN:
search_info = {
"endpoint": "rest/IBSCI/v2/customers/search/taxnbr?CIKeySSN=_CIKeySSN",
"params": {"_CIKeySSN": self.req_val},
}
case SearchMethod.CIF:
search_info = {
"endpoint": "rest/IBSCI/v2/customers/_CIApplNbr",
"params": {"_CIApplNbr": self.req_val},
}
case SearchMethod.ACCOUNT:
acct_num, acct_type = self.req_val
search_info = {
"endpoint": f"rest/IBSCI/v2/accounts/{acct_type}/_AppNbr/dynamic-profile",
"params": {"_AppNbr": acct_num},
}
case SearchMethod.DYNAMICPROFILE:
search_info = {
"endpoint": "rest/IBSCI/v2/customers/_CustNbr/dynamic-profile",
"params": {"_CustNbr": self.req_val},
}
return search_info
[docs]
def mock_response(self):
match self.search_method:
case SearchMethod.CIF:
return mock_demographic_info_response_cif()
case SearchMethod.SSN:
return mock_demographic_info_response_tax_id()
case SearchMethod.ACCOUNT:
return mock_demographic_info_response_acc()
case SearchMethod.DYNAMICPROFILE:
return mock_demographic_info_response_dynamic_profile()