Source code for q2_cores.FiservCommunicatorOpenPremier.mappers.demographic_info_mapper

import json
from typing import List, Optional
import phonenumbers

from q2_cores.exceptions import CoreException
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery

from q2_sdk.models.demographic import (
    DemographicInfo,
    Address,
    AddressType,
    Phone,
    PhoneType,
)

# Translates the phone-type label found in the core response (after
# ``str.capitalize()`` normalisation by the mapper) into the SDK ``PhoneType``.
# Labels with no dedicated SDK equivalent are all grouped under
# ``PhoneType.OTHER``.
PHONE_TYPE_MAPPING = {
    "Home": PhoneType.PERSONAL,
    "Work": PhoneType.BUSINESS,
    "Mobile": PhoneType.CELL,
    **dict.fromkeys(("Other", "Fax", "Modem", "Pager"), PhoneType.OTHER),
}


class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Build a ``DemographicInfo`` from a core party-list response.

    The mapper reads a single query's ``raw_core_response`` (a JSON string or
    an already-decoded dict), validates the ``Status`` section, and then walks
    ``PartyListRec`` collecting names, addresses, phones and e-mails.

    :param list_of_queries: queries forwarded to the base mapper.
    :param hq_credentials: optional HQ credentials forwarded to the base mapper.
    :param get_name_ids: when truthy, the party id of every processed record
        is collected under ``user_info["name_ids"]``.
    :param is_business: when truthy, records are read from
        ``OrgPartyListInfo``; otherwise from ``PersonPartyListInfo``.
    """

    def __init__(
        self,
        list_of_queries: Optional[List[BaseQuery]] = None,
        hq_credentials: Optional[HqCredentials] = None,
        get_name_ids=False,
        is_business=None,
    ):
        self.get_name_ids = get_name_ids
        self.is_business = is_business
        super().__init__(list_of_queries, hq_credentials=hq_credentials)

    def parse_returned_queries(
        self, list_of_queries: List[BaseQuery]
    ) -> DemographicInfo:
        """Parse the single core query into demographic information.

        Scalar fields (names, date of birth, SSN, primary CIF) are overwritten
        by each record, so the values of the last record win; addresses,
        phones and e-mails accumulate across all processed records.

        :param list_of_queries: must contain exactly one query.
        :return: the populated ``DemographicInfo``.
        :raises CoreException: when the response ``StatusCode`` is not ``"0"``.
        """
        # Kept as an assert so the exception type seen by callers is unchanged.
        assert len(list_of_queries) == 1
        raw_core_resp = list_of_queries[0].raw_core_response
        response_obj = (
            raw_core_resp
            if isinstance(raw_core_resp, dict)
            else json.loads(raw_core_resp)
        )

        logger = self.list_of_queries[0].logger
        # NOTE(review): the raw response carries the TaxIdent / BirthDt values
        # read below, so this line writes them to the log — confirm that is
        # acceptable for this logger.
        logger.info(self.list_of_queries[0].raw_core_response)

        status = response_obj.get("Status", {})
        error_code = status.get("StatusCode")
        if error_code != "0":
            error_message = status.get("StatusDesc")
            error_details = status.get("ServerStatusDesc")
            raise CoreException(
                f"Core Demographic Call failed: {error_code} - {error_message} - {error_details}"
            )

        # A successful response without "PartyListRec" yields an empty result
        # instead of a TypeError from iterating None.
        party_list = response_obj.get("PartyListRec") or []

        dob = ""
        first_name = ""
        middle_name = ""
        last_name = ""
        ssn = ""
        party_id = ""
        full_name = ""
        address_list = []
        phone_list = []
        email_list = []
        user_info = {"name_ids": []}

        for record in party_list:
            # Assigned before the skip check below, so a skipped record still
            # updates the value that ends up in ``primary_cif``.
            party_id = record.get("PartyKeys", {}).get("PartyId")

            if self.is_business and record.get("OrgPartyListInfo"):
                cust_info = record["OrgPartyListInfo"]
                name_details = cust_info.get("OrgName") or {}
                # Businesses only have one name; it doubles as the last name.
                full_name = name_details.get("Name", "")
                last_name = full_name
            elif not self.is_business and record.get("PersonPartyListInfo"):
                cust_info = record["PersonPartyListInfo"]
                name_details = cust_info.get("PersonName") or {}
                first_name = name_details.get("GivenName", "")
                last_name = name_details.get("FamilyName", "")
                middle_name = name_details.get("MiddleName", "")
            else:
                logger.info(
                    "Skip record - because we may get Business info (OrgPartyListInfo) and BUSINESS config "
                    "is not set to True or we didn't get PersonPartyListInfo in the "
                    "core response"
                )
                continue

            dob = cust_info.get("BirthDt", "")
            ssn = cust_info.get("TaxIdent", "")

            # A party without a "Contact" list simply contributes no contacts.
            for entry in cust_info.get("Contact") or []:
                if entry.get("PostAddr"):
                    address_list.append(self._build_address(entry["PostAddr"]))

                if entry.get("PhoneNum"):
                    phone = self._build_phone(entry["PhoneNum"], logger)
                    if phone is None:
                        # Historical behaviour: an unparseable phone skips the
                        # remainder of this contact entry.
                        continue
                    phone_list.append(phone)

                if entry.get("Email"):
                    email_list.append(entry["Email"].get("EmailAddr"))

            if self.get_name_ids:
                user_info["name_ids"].append(party_id)

        return DemographicInfo(
            date_of_birth=dob,
            list_of_emails=email_list,
            list_of_phones=phone_list,
            list_of_addresses=address_list,
            first_name=first_name,
            middle_name=middle_name,
            last_name=last_name,
            ssn=ssn,
            primary_cif=party_id,
            additional_details={"full_name": full_name},
            user_info=user_info,
        )

    @staticmethod
    def _build_address(core_address: dict) -> Address:
        """Convert one ``PostAddr`` entry into an SDK ``Address``."""
        return Address(
            core_address["Addr1"],
            core_address.get("Addr2", ""),
            core_address.get("City", ""),
            core_address.get("StateProv", ""),
            core_address.get("PostalCode", ""),
            # we are getting ForeignFlag but don't see any country code in the response
            address_type=AddressType.HOME,
        )

    @staticmethod
    def _build_phone(phone_info: dict, logger) -> Optional[Phone]:
        """Convert one ``PhoneNum`` entry into an SDK ``Phone``.

        Returns ``None`` when the number cannot be parsed.
        """
        number = f"{phone_info.get('Phone')}"
        phone_type = phone_info.get("PhoneType")
        logger.info(f"parsing phone number: {number}")
        try:
            # NOTE(review): no default region is passed, so numbers that are
            # not in international ("+...") form are presumably rejected here
            # — confirm against the core's phone format.
            parsed_number = phonenumbers.parse(number)
        except phonenumbers.NumberParseException:
            logger.info("Skipping phone number that could not be parsed")
            return None
        logger.info(f"Parsed phone number: {parsed_number}")
        return Phone.build_from_str(
            str(parsed_number.national_number),
            PHONE_TYPE_MAPPING.get(
                str(phone_type).capitalize(), PhoneType.PERSONAL
            ),
        )