# Source code for q2_cores.Symitar.mappers.demographic_info

from typing import List

from lxml import objectify
from q2_cores.Symitar.models import parser
from q2_cores.Symitar.queries import DemographicInfoQuery
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
    Address,
    AddressType,
    DemographicInfo,
    Phone,
    PhoneType,
)

# Lookup table from the text of a Symitar profile's ``profiletype`` element
# (a string code such as "00") to the SDK ``AddressType`` assigned to the
# ``Address`` built from that profile. The trailing comment on each entry
# names the kind of profile the code stands for.
#
# The mapper reads this table with ``TYPE_MAP.get(code, AddressType.HOME)``,
# so any code not listed here is also treated as ``AddressType.HOME``.
#
# NOTE(review): only "02" maps to POSTAL; "03" and "05" are labelled as
# mailing addresses yet map to HOME -- confirm this is intentional.
TYPE_MAP = {
    "00": AddressType.HOME,  # Primary Holder Physical Address
    "01": AddressType.HOME,  # Joint Holder Physical Address
    "02": AddressType.POSTAL,  # Primary Holder Mailing Address
    "03": AddressType.HOME,  # Alternate Mailing Address
    "04": AddressType.HOME,  # Beneficiary Physical Address
    "05": AddressType.HOME,  # Custodian Mailing Address
    "06": AddressType.HOME,  # Trustee Physical Address
    "08": AddressType.HOME,  # POA Physical Address
    "09": AddressType.HOME,  # Auth Signer Physical Address
    "13": AddressType.HOME,  # Next of Kin Physical Address
    "14": AddressType.HOME,  # Representative Payee Physical Address
    "22": AddressType.HOME,  # CTR Owner Physical Address
    "23": AddressType.HOME,  # CTR Transactor Physical Address
    "24": AddressType.HOME,  # DBA Physical Address
    "27": AddressType.HOME,  # SDB Deputy Physical Address
    "28": AddressType.HOME,  # CTR Courier Physical Address
    "29": AddressType.HOME,  # Property Address
    "30": AddressType.HOME,  # Successor in Interest
}


class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Turn a Symitar demographic query response into a ``DemographicInfo``."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
        """Parse the single demographic query and assemble its ``DemographicInfo``.

        Name, birth date, SSN and mother's maiden name are read from the
        first ``profile`` element only; every ``profile`` element contributes
        one address, one email entry, and up to three phone numbers.

        :param list_of_queries: Must hold exactly one ``DemographicInfoQuery``
            whose ``raw_core_response`` has already been populated.
        :return: The ``DemographicInfo`` built from the response payload.
        :raises AssertionError: If the list does not contain exactly one
            query, or that query is not a ``DemographicInfoQuery``.
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of Symitar.queries.DemographicInfoQuery"
        )

        query = list_of_queries[0]
        parsed_response = parser.parse(query.raw_core_response)
        root = objectify.fromstring(parsed_response.payload)

        # Attribute access on a repeated child yields the first occurrence,
        # so the identity fields below all come from the first profile.
        primary_profile = root.profiles.profile
        birth_date = primary_profile.BIRTHDATE.text
        given_name = primary_profile.firstname.text
        second_name = primary_profile.middlename.text
        family_name = primary_profile.lastname.text
        maiden_name = primary_profile.MMN.text
        social = primary_profile.ssn.text

        # The member number is only used as the CIF when the query opts in.
        member_number = None
        if query.set_member_number_as_demo_cif:
            member_number = root.profiles.membernumber.text
        if member_number and query.strip_zeroes_from_cif:
            member_number = member_number.lstrip("0")

        # XML element name -> phone type, in the order phones are emitted.
        phone_fields = (
            ("homephone", PhoneType.PERSONAL),
            ("workphone", PhoneType.BUSINESS),
            ("mobilephone", PhoneType.CELL),
        )

        address_list = []
        phone_list = []
        email_list = []
        for entry in root.profiles.profile:
            street = entry.STREET.text
            extra_line = entry.EXTRAADDRESS.text or ""
            town = entry.CITY.text
            region = entry.STATE.text
            postal_code = entry.ZIPCODE.text

            # NOTE(review): the email text is appended unfiltered, so an
            # empty element presumably adds ``None`` -- confirm callers cope.
            email_list.append(entry.email.text)

            profile_code = entry.profiletype.text
            address_list.append(
                Address(
                    street,
                    extra_line,
                    town,
                    region,
                    postal_code,
                    # Unknown profile codes are treated as HOME addresses.
                    address_type=TYPE_MAP.get(profile_code, AddressType.HOME),
                )
            )

            # Only phone elements with non-empty text produce a Phone.
            for field_name, phone_type in phone_fields:
                number = getattr(entry, field_name).text
                if number:
                    phone_list.append(Phone.build_from_str(number, phone_type))

        return DemographicInfo(
            birth_date,
            email_list,
            phone_list,
            address_list,
            given_name,
            family_name,
            social,
            mothers_maiden_name=maiden_name,
            middle_name=second_name,
            primary_cif=member_number,
        )