Source code for q2_cores.FiservCommunicatorSignature.mappers.demographic_info_mapper

from typing import List, Optional

from lxml import objectify
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
    Address,
    DemographicInfo,
    DriverLicense,
    Phone,
    PhoneType,
)

from q2_cores.exceptions import CoreException, DeceasedCustomerException

from ..queries.demographic_info_query import DemographicInfoQuery

PHONE_TYPE_MAPPING = {
    "CellPhone": PhoneType.CELL,
    "EvePhone": PhoneType.PERSONAL,
    "DayPhone": PhoneType.BUSINESS,
}


[docs] class DemographicInfoMapper(BaseMapper): def __init__( self, list_of_queries: List[BaseQuery], hq_credentials: Optional[HqCredentials] = None, ): super().__init__(list_of_queries, hq_credentials)
[docs] @staticmethod def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo: """ Handles the demographic information response from the core. if ``SECRET_OVERRIDE_DEMO_ATTR`` configuration is set, demographic object return will be overriden with the value returned from the core "Secret" field """ assert len(list_of_queries) == 1, ( "Assertion Error: The length of the query list should be one" ) assert isinstance(list_of_queries[0], DemographicInfoQuery), ( "Query must be an instance of q2_cores.FiservCommunicatorSignature.queries.demographic_info_query" ) root = objectify.fromstring(list_of_queries[0].raw_core_response) if hasattr(root.CIFSvcRs.CustInqRs.Status, "Error"): raise CoreException( f"Core returned Error: {root.CIFSvcRs.CustInqRs.Status.Error.ErrDesc.text}" ) cust_info = root.CIFSvcRs.CustInqRs.Customer.CustProf if hasattr(cust_info, "DeathDt"): dod = "-".join(x.text for x in cust_info.DeathDt.getchildren()) raise DeceasedCustomerException(f"Customer Deceased on : {dod}") person = cust_info.PersonName ssn = cust_info.TaxId.text first_name = person.FirstName.text if person.findtext("FirstName") else "" middle_name = person.MiddleName.text if person.findtext("FirstName") else "" # Taking business name as last_name for BusinessEnrollment , as no user info will be returned is the SSN belongs to Business # noqa: E501 last_name = ( person.LastName.text if person.findtext("FirstName") else person.BusinessName.text ) title = person.TitlePrefix.text if person.findtext("FirstName") else "" primary_cif = root.CIFSvcRs.CustInqRs.Customer.CustId.CustPermId.text dob = DemographicInfoMapper.get_users_dob(cust_info) drivers_license = DemographicInfoMapper.get_users_drivers_license(cust_info) address = DemographicInfoMapper.get_users_address(cust_info) # some FIs store Mothers maiden name or any other security word in the Secret SF-03518127 additional_details = {} secret_word = cust_info.findtext("Secret", "") additional_details["security_word"] = secret_word email = cust_info.findtext("EmailAddr") phones = DemographicInfoMapper.get_users_phones(cust_info) fi_info = root.CIFSvcRs.CustInqRs.Customer.FIInfo additional_details["branch_id"] = fi_info.findtext("BranchId") # add OpenDt and CustStatusCode to additional details of demographic info if hasattr(root.CIFSvcRs.CustInqRs.Customer, "OpenDt"): open_dt_info = root.CIFSvcRs.CustInqRs.Customer.OpenDt open_dt = f"{open_dt_info.Year.text}-{open_dt_info.Month.text}-{open_dt_info.Day.text}" additional_details["open_dt"] = open_dt if hasattr(root.CIFSvcRs.CustInqRs.Customer, "CustStatusCode"): cust_status_code = root.CIFSvcRs.CustInqRs.Customer.CustStatusCode.text additional_details["cust_status_code"] = cust_status_code demo_object = DemographicInfo( dob, [email], phones, [address], first_name, last_name, ssn, mothers_maiden_name=secret_word, middle_name=middle_name, title=title, driver_license=drivers_license, primary_cif=primary_cif, additional_details=additional_details, ) configs = list_of_queries[0].context.get("configs") attr_override = getattr(configs, "SECRET_OVERRIDE_DEMO_ATTR", "") if attr_override: secret = cust_info.Secret.text if hasattr(cust_info, "Secret") else "" if secret and attr_override: setattr(demo_object, attr_override, secret) return demo_object
[docs] def get_users_dob(cust_info): dob = None if cust_info.find("BirthDt") is not None: dob = "-".join(x.text for x in cust_info.BirthDt.getchildren()) return dob
[docs] def get_users_drivers_license(cust_info): drivers_lic = None if hasattr(cust_info, "CustIdType"): cust_id_info = cust_info.findall("CustIdType") for cust_id in cust_id_info: if cust_id.findtext("IdType").strip().lower() == "dl": drivers_lic = DriverLicense( cust_id.findtext("IdNum"), cust_id.findtext("IdIssuer", "") ) break return drivers_lic
[docs] def get_users_address(cust_info): addr = cust_info.CustAddr return Address( address_1=f"{addr.findtext('HouseNum')} {addr.findtext('Street')}", address_2=addr.findtext("ApartmentNum"), city=addr.findtext("City"), state=addr.findtext("StateProv"), zipcode=addr.findtext("ZipCode"), )
[docs] def get_users_phones(cust_info): phones = [] if hasattr(cust_info, "PhoneNum"): for phone in cust_info.PhoneNum: try: phone_type = phone.PhoneType.text number = f"{phone.findtext('Phone')}" if number and number != "0000000000" and len(number) >= 10: found_phone = Phone.build_from_str( number, PHONE_TYPE_MAPPING.get(phone_type, PhoneType.PERSONAL), ) phones.append(found_phone) except AttributeError: continue return phones