# Source code for q2_cores.FiservCleartouch.mappers.demographic_info_mapper

from typing import List, Optional

from lxml import objectify

from q2_cores.exceptions import CoreValidationException
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import Address, DemographicInfo, PhoneType, Phone

# Translates the text of a core ``PhoneType`` element into the SDK's PhoneType
# enum. Every core type without a dedicated SDK counterpart is grouped under
# PhoneType.OTHER.
PHONE_TYPE_MAPPING = {
    "HomePhone": PhoneType.PERSONAL,
    "BusinessPhone": PhoneType.BUSINESS,
    "Mobile": PhoneType.CELL,
    **dict.fromkeys(("Other", "Fax", "Modem", "Pager"), PhoneType.OTHER),
}


class DemographicInfoMapper(BaseMapper):
    """Build a ``DemographicInfo`` from a single core customer-lookup response.

    The response XML is read from ``raw_core_response`` of the one query
    handed to :meth:`parse_returned_queries`.
    """

    # Number of ``AdditionalPhoneN`` slots (1..N) probed under ``CustomerContact``.
    _ADDITIONAL_PHONE_SLOTS = 5

    def __init__(
        self,
        list_of_queries: List[BaseQuery],
        hq_credentials: Optional[HqCredentials] = None,
        zone_context: Optional[CoreUser] = None,
        populate_mobile_phone_from_main_phone: Optional[bool] = False,
    ):
        """Store the mapper configuration.

        :param list_of_queries: Queries forwarded to ``BaseMapper``.
        :param hq_credentials: Forwarded to ``BaseMapper``.
        :param zone_context: Forwarded to ``BaseMapper``.
        :param populate_mobile_phone_from_main_phone: When truthy, the parsed
            result also gets a ``PhoneType.CELL`` copy of the main phone
            (unless an equal phone is already present).
        """
        super().__init__(
            list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context
        )
        self.populate_mobile_phone_from_main_phone = (
            populate_mobile_phone_from_main_phone
        )

    @staticmethod
    def _build_phone(record) -> Phone:
        """Convert one phone element (main or additional) into a ``Phone``.

        :raises AttributeError: If the element lacks one of the expected
            children (``AreaCode``, ``PhoneNumber``, ``PhoneType``,
            ``PhoneExtension``).
        :raises KeyError: If the phone type is not in ``PHONE_TYPE_MAPPING``.
        """
        return Phone(
            record.AreaCode.text,
            record.PhoneNumber.text,
            PHONE_TYPE_MAPPING[record.PhoneType.text],
            # An extension of "0" is treated as "no extension".
            extension=(
                record.PhoneExtension.text
                if record.PhoneExtension.text != "0"
                else None
            ),
        )

    def parse_returned_queries(
        self, list_of_queries: List[BaseQuery]
    ) -> DemographicInfo:
        """Parse the core's XML response into a ``DemographicInfo``.

        :param list_of_queries: Exactly one executed query whose
            ``raw_core_response`` holds the response XML.
        :return: The populated ``DemographicInfo``.
        :raises AssertionError: If not exactly one query is supplied or it is
            not a ``BaseQuery``.
        :raises CoreValidationException: If the response status code is not
            ``"0"``.
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], BaseQuery), (
            "Query must be an instance of FiservClearTouch.queries.InitialSearchQuery"
        )

        root = objectify.fromstring(list_of_queries[0].raw_core_response)
        if root.Response.Status.StatusCode.text != "0":
            raise CoreValidationException(
                f"Core returned unsuccessful status: {root.Response.Status.StatusDesc.text}"
            )

        customer_info = root.Response.CustomerGeneralInformation
        ssn = customer_info.TINId.TaxId.text
        # The tax id doubles as the primary CIF.
        primary_cif = ssn
        dob = customer_info.findtext("{*}BirthDate", "")

        # Same element as ``keyword`` below; the namespace wildcard keeps the
        # lookup consistent with every other ``findtext`` call so it also
        # matches when the response is namespaced. Stays None when absent.
        secret_word = customer_info.findtext("{*}Keyword")
        # Some FIs use this field for mother's maiden name.
        keyword = customer_info.findtext("{*}Keyword", "")

        if customer_info.Name.CustomerType.text == "Personal":
            personal_name = customer_info.Name.PersonalName
            first_name = personal_name.findtext("{*}FirstName", "")
            middle_name = personal_name.findtext("{*}MiddleName", "")
            last_name = personal_name.findtext("{*}LastName", "")
            title = personal_name.findtext("{*}TitlePrefix", "")
        else:
            # Non-personal (commercial) record: Name1 contains the full name.
            first_name = ""
            middle_name = ""
            title = ""
            last_name = customer_info.Name.NonPersonalName.findtext("{*}Name1", "")

        drivers_lic = None

        address_node = customer_info.Address
        postal_code = address_node.findtext("{*}PostalCode")
        address = Address(
            address_node.findtext("{*}Address1"),
            address_node.findtext("{*}Address2") or "",
            address_node.findtext("{*}City", "N/A"),
            address_node.findtext("{*}State", ""),
            # Keep only the first five characters; a missing postal code is
            # passed through unchanged instead of failing on the slice.
            postal_code[:5] if postal_code else postal_code,
            country="USA",
        )

        contact_info = root.Response.CustomerContact
        emails = [
            email
            for email in (
                contact_info.Email1.EmailAddress.text,
                contact_info.Email2.EmailAddress.text,
            )
            if email
        ]

        main_phone = self._build_phone(contact_info.MainPhone)
        phones = [main_phone]

        # Every slot is examined so that a missing or unused slot never hides
        # a populated one that follows it.
        for slot in range(1, self._ADDITIONAL_PHONE_SLOTS + 1):
            # ``getattr`` uses the same objectify lookup that decides whether
            # the slot exists, so detection and retrieval cannot disagree.
            phone_record = getattr(contact_info, f"AdditionalPhone{slot}", None)
            if phone_record is None:
                continue
            try:
                if phone_record.PhoneType.text != "AdditionalPhoneNotUsed":
                    phones.append(self._build_phone(phone_record))
            except AttributeError:
                # Slot is present but lacks an expected child; skip it.
                continue

        if self.populate_mobile_phone_from_main_phone:
            mobile_phone = Phone(
                main_phone.area_code,
                main_phone.phone_number,
                PhoneType.CELL,
                extension=main_phone.extension,
            )
            if mobile_phone not in phones:
                phones.append(mobile_phone)

        return DemographicInfo(
            dob,
            emails,
            phones,
            [address],
            first_name,
            last_name,
            ssn,
            middle_name=middle_name,
            title=title,
            driver_license=drivers_lic,
            primary_cif=primary_cif,
            mothers_maiden_name=keyword,
            additional_details={"security_word": secret_word},
        )