Source code for q2_cores.FISIBSOpenAPI.queries.demographic_info_query

import json
import logging
from typing import Union
import uuid

from q2_sdk.models.cores.queries.base_query import BaseQuery

from ..utils import SearchMethod
from .mock_responses import (
    mock_demographic_info_response_acc,
    mock_demographic_info_response_cif,
    mock_demographic_info_response_dynamic_profile,
    mock_demographic_info_response_tax_id,
)


[docs] class DemographicInfoQuery(BaseQuery): """ Builds the payload for the demographic call to the FIS IBS OpenAPI core """ def __init__( self, logger: logging.Logger, req_val: Union[str, tuple, None], search_method: SearchMethod = None, signature: str = "", ): """ :param req_dict: Request data :param search_method: Method used to get demographic data :param signature: DigitalSignature for dynamic profile requests """ self.req_val = req_val self.search_method = search_method self.signature = signature super().__init__(logger)
[docs] def build(self) -> str: """ Creates an OpenAPI query to get the customer's demographic info :return: OpenAPI request query as a JSON string Example query to search by SSN (prior to JSON string conversion): .. code-block:: json { "Endpoint": "rest/IBSCI/v2/customers/search/taxnbr?CIKeySSN=_CIKeySSN", "HttpVerb": "GET", "Entity": null, "RequestGuid": "00000000-0000-0000-0000-000000000000", "ApplicationID": "CI", "UrlEncryptionParams": { "_CIKeySSN": "123456789" } } Example query to search by CIF (prior to JSON string conversion): .. code-block:: json { "Endpoint": "rest/IBSCI/v2/customers/_CIApplNbr", "HttpVerb": "GET", "Entity": null, "RequestGuid": "00000000-0000-0000-0000-000000000000", "ApplicationID": "CI", "UrlEncryptionParams": { "_CIApplNbr": "123456789" } } Example query to search by AccountNumber and AccountType (prior to JSON string conversion): .. code-block:: json { "Endpoint": "rest/IBSCI/v2/accounts/DP/_AppNbr/dynamic-profile", "HttpVerb": "GET", "Entity": null, "RequestGuid": "00000000-0000-0000-0000-000000000000", "ApplicationID": "CI", "UrlEncryptionParams": { "_AppNbr": "000112233" } } """ query_data = self._get_search_method_request_details() query = { "Endpoint": query_data["endpoint"], "HttpVerb": "GET", "Entity": None, "RequestGuid": str(uuid.uuid4()), "ApplicationID": "CI", "UrlEncryptionParams": query_data["params"], } if self.search_method.value == "DynamicProfile": query["DigitalSignature"] = self.signature json_string = json.dumps(query) return json_string
def _get_search_method_request_details(self): match self.search_method: case SearchMethod.SSN: search_info = { "endpoint": "rest/IBSCI/v2/customers/search/taxnbr?CIKeySSN=_CIKeySSN", "params": {"_CIKeySSN": self.req_val}, } case SearchMethod.CIF: search_info = { "endpoint": "rest/IBSCI/v2/customers/_CIApplNbr", "params": {"_CIApplNbr": self.req_val}, } case SearchMethod.ACCOUNT: acct_num, acct_type = self.req_val search_info = { "endpoint": f"rest/IBSCI/v2/accounts/{acct_type}/_AppNbr/dynamic-profile", "params": {"_AppNbr": acct_num}, } case SearchMethod.DYNAMICPROFILE: search_info = { "endpoint": "rest/IBSCI/v2/customers/_CustNbr/dynamic-profile", "params": {"_CustNbr": self.req_val}, } return search_info
[docs] def mock_response(self): match self.search_method: case SearchMethod.CIF: return mock_demographic_info_response_cif() case SearchMethod.SSN: return mock_demographic_info_response_tax_id() case SearchMethod.ACCOUNT: return mock_demographic_info_response_acc() case SearchMethod.DYNAMICPROFILE: return mock_demographic_info_response_dynamic_profile()