from typing import List
from lxml import objectify
from q2_cores.Symitar.models import parser
from q2_cores.Symitar.queries import DemographicInfoQuery
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
Address,
AddressType,
DemographicInfo,
Phone,
PhoneType,
)
TYPE_MAP = {
"00": AddressType.HOME, # Primary Holder Physical Address
"01": AddressType.HOME, # Joint Holder Physical Address
"02": AddressType.POSTAL, # Primary Holder Mailing Address
"03": AddressType.HOME, # Alternate Mailing Address
"04": AddressType.HOME, # Beneficiary Physical Address
"05": AddressType.HOME, # Custodian Mailing Address
"06": AddressType.HOME, # Trustee Physical Address
"08": AddressType.HOME, # POA Physical Address
"09": AddressType.HOME, # Auth Signer Physical Address
"13": AddressType.HOME, # Next of Kin Physical Address
"14": AddressType.HOME, # Representative Payee Physical Address
"22": AddressType.HOME, # CTR Owner Physical Address
"23": AddressType.HOME, # CTR Transactor Physical Address
"24": AddressType.HOME, # DBA Physical Address
"27": AddressType.HOME, # SDB Deputy Physical Address
"28": AddressType.HOME, # CTR Courier Physical Address
"29": AddressType.HOME, # Property Address
"30": AddressType.HOME, # Successor in Interest
}
[docs]
class DemographicInfoMapper(BaseDemographicInfoMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], DemographicInfoQuery), (
"Query must be an instance of Symitar.queries.DemographicInfoQuery"
)
query = list_of_queries[0]
raw_response = query.raw_core_response
symitar_response = parser.parse(raw_response)
root = objectify.fromstring(symitar_response.payload)
profile_1 = root.profiles.profile
dob = profile_1.BIRTHDATE.text
first_name = profile_1.firstname.text
middle_name = profile_1.middlename.text
last_name = profile_1.lastname.text
mothers_maiden_name = profile_1.MMN.text
ssn = profile_1.ssn.text
cif = (
root.profiles.membernumber.text
if query.set_member_number_as_demo_cif
else None
)
if cif and query.strip_zeroes_from_cif:
cif = cif.lstrip("0")
addresses = []
phones = []
emails = []
for profile in root.profiles.profile:
address_1 = profile.STREET.text
address_2 = profile.EXTRAADDRESS.text or ""
city = profile.CITY.text
state = profile.STATE.text
zipcode = profile.ZIPCODE.text
emails.append(profile.email.text)
address_type = profile.profiletype.text
addresses.append(
Address(
address_1,
address_2,
city,
state,
zipcode,
address_type=TYPE_MAP.get(address_type, AddressType.HOME),
)
)
if profile.homephone.text:
phones.append(
Phone.build_from_str(profile.homephone.text, PhoneType.PERSONAL)
)
if profile.workphone.text:
phones.append(
Phone.build_from_str(profile.workphone.text, PhoneType.BUSINESS)
)
if profile.mobilephone.text:
phones.append(
Phone.build_from_str(profile.mobilephone.text, PhoneType.CELL)
)
return DemographicInfo(
dob,
emails,
phones,
addresses,
first_name,
last_name,
ssn,
mothers_maiden_name=mothers_maiden_name,
middle_name=middle_name,
primary_cif=cif,
)