from typing import List
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
Address,
DemographicInfo,
DriverLicense,
Phone,
PhoneType,
)
from q2_cores.XP2.queries.demographic_info_query import DemographicInfoQuery
from q2_cores.exceptions import CoreException
from .xp2_mapper import XP2Mapper
# Translates the phone-type text found in a core response (the value read from
# each Phone node's PhoneKey/PhoneType element) into the SDK's PhoneType enum.
# Note that "Home" maps to PhoneType.PERSONAL. Types missing from this table
# are handled at the lookup site, which falls back to PhoneType.PERSONAL.
PHONE_TYPE_MAPPER = {
"Cellular": PhoneType.CELL,
"Home": PhoneType.PERSONAL,
"Business": PhoneType.BUSINESS,
}
[docs]
class DemographicInfoMapper(XP2Mapper):
    """Builds a ``DemographicInfo`` from a completed ``DemographicInfoQuery``."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
        """Convert the core response of a demographic query into a ``DemographicInfo``.

        :param list_of_queries: Exactly one ``DemographicInfoQuery`` whose
            ``raw_core_response`` has already been populated.
        :return: The member's name, tax id, birth date, driver's license,
            primary address, e-mail addresses and current phone numbers.
        :raises AssertionError: If the list does not hold exactly one
            ``DemographicInfoQuery``.
        :raises CoreException: If the response carries an ``ErrorMessage`` or
            no address matches ``PrimaryAddressId``.
        """
        # NOTE(review): assert statements are stripped under ``python -O``.
        # They are kept here so callers still see AssertionError.
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of XP2.queries.DemographicInfoQuery"
        )

        response = list_of_queries[0].raw_core_response
        # Two-argument super() because a staticmethod has no implicit class.
        root = super(DemographicInfoMapper, DemographicInfoMapper).unpack_return_mesage(
            response
        )
        if hasattr(root, "ErrorMessage"):
            raise CoreException(
                f"Core returned unsuccessful status: {root.ErrorMessage.Message.text}"
            )

        member = root.IndividualDetail
        first_name = member.findtext("FirstName")
        middle_name = member.findtext("MiddleName")
        last_name = member.findtext("LastName")
        title = member.findtext("Title")
        ssn = member.findtext("TaxId")
        dob = member.findtext("BirthDate")
        drivers_lic = DriverLicense(
            member.findtext("DriversLicense"), member.findtext("DriversLicenseState")
        )
        mothers_maiden_name = member.findtext("MothersMaidenName")

        # Only the address whose AddressId equals PrimaryAddressId is reported.
        primary_address_id = member.PrimaryAddressId.text
        for candidate in member.Address:
            if candidate.findtext("AddressId") == primary_address_id:
                address_node = candidate
                break
        else:
            raise CoreException("Could Not identify primary address")

        # Keep only the part of the ZIP code before the first "-". A missing
        # ZipCode is passed through unchanged, like the other address fields,
        # instead of failing with an AttributeError on None.
        raw_zip = address_node.findtext("ZipCode")
        zip_code = raw_zip.split("-")[0] if raw_zip else raw_zip
        address = Address(
            address_node.findtext("Line1"),
            address_node.findtext("Line2") or "",
            address_node.findtext("City"),
            address_node.findtext("State"),
            zip_code,
            country=address_node.findtext("Country"),
        )

        emails = [
            email
            for email in (member.findtext("Email1"), member.findtext("Email2"))
            if email
        ]

        phones = []
        for phone in member.findall("Phone"):
            if str(phone.findtext("IsPrevious")).lower() == "true":
                continue
            phone_type = phone.PhoneKey.PhoneType.text
            # Missing parts become "" rather than the text "None", so the
            # emptiness check below can actually reject a blank number.
            area_code = phone.findtext("AreaCode") or ""
            local_number = phone.findtext("PhoneNumber") or ""
            number = f"{area_code}{local_number}"
            if number and number != "0000000000":
                phones.append(
                    Phone.build_from_str(
                        number, PHONE_TYPE_MAPPER.get(phone_type, PhoneType.PERSONAL)
                    )
                )

        primary_cif = member.findtext("IndividualId")
        return DemographicInfo(
            dob,
            emails,
            phones,
            [address],
            first_name,
            last_name,
            ssn,
            mothers_maiden_name=mothers_maiden_name,
            middle_name=middle_name,
            title=title,
            driver_license=drivers_lic,
            primary_cif=primary_cif,
        )