import re
from lxml import objectify
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.demographic import (
Address,
AddressType,
Phone,
PhoneType,
DemographicInfo,
)
from q2_cores import exceptions
from ..queries.demographic_info_query import DemographicInfoQuery
# Lookup from the RimType text found on a core record to the AddressType
# used when building the Address for that record.
ADDRESS_TYPE_MAP: dict[str, AddressType] = {
    "Personal": AddressType.HOME,
    "Business": AddressType.BUSINESS,
}
class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Builds a :py:class:`DemographicInfo` from the response of a ``DemographicInfoQuery``."""

    @staticmethod
    def parse_returned_queries(list_of_queries: list[BaseQuery]) -> DemographicInfo:
        """
        Parses a list of queries into an object containing member demographic information.

        :param list_of_queries: a :py:class:`List` of :py:class:`BaseQuery` objects to operate against.
            It must hold exactly one ``DemographicInfoQuery``.
        :return: a :py:class:`DemographicInfo` object
        :raises AssertionError: if the list does not hold exactly one ``DemographicInfoQuery``
        :raises exceptions.CoreException: if the core's ``ReturnCode`` is not ``"0"`` or the
            response contains no ``RESPONSE/RECORD`` element
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of Phoenix.queries.DemographicInfoQuery"
        )

        # Strip the XML declaration before handing the payload to lxml. The match
        # is non-greedy so that only the declaration itself is removed, even when
        # another "?>" appears later on the same line.
        raw_response = re.sub(
            r"<\?xml.+?\?>", "", list_of_queries[0].raw_core_response
        )
        response_obj = objectify.fromstring(raw_response)

        error_return_code = response_obj.RESPONSE.findtext("ReturnCode")
        if error_return_code != "0":
            raise exceptions.CoreException(
                "Could not retrieve demographic info from the core"
            )

        record = response_obj.find("./RESPONSE/RECORD")
        if record is None:
            # find() returns None when the element is absent; report that as a
            # core failure instead of an AttributeError on the first field access.
            raise exceptions.CoreException(
                "Core response did not contain a demographic record"
            )

        rim_type = record.RimType.text.strip()

        # Each phone is passed to Phone.build_from_str as "<number> x<extension>".
        list_of_phones = [
            Phone.build_from_str(
                f"{record.Phone1.text} x{record.Phone1Ext.text}", PhoneType.PERSONAL
            ),
            Phone.build_from_str(
                f"{record.Phone2.text} x{record.Phone2Ext.text}", PhoneType.BUSINESS
            ),
            Phone.build_from_str(
                f"{record.Phone3.text} x{record.Phone3Ext.text}", PhoneType.CELL
            ),
        ]

        address = Address(
            address_1=record.AddrLine1.text,
            address_2=record.AddrLine2.text,
            city=record.City.text.strip(),
            state=record.State.text.strip(),
            zipcode=record.PostalCode.text.strip(),
            # Unknown RimType values fall back to a home address. The fallback
            # must be an AddressType member like the map's values, not a map key.
            address_type=ADDRESS_TYPE_MAP.get(rim_type, AddressType.HOME),
            # Province is looked up with a default, so it may be absent.
            province=record.findtext("Province", "").strip(),
            country=record.CountryCode.text.strip(),
        )

        return DemographicInfo(
            date_of_birth=record.BirthDt.text,
            list_of_emails=[record.EmailAddr1.text, record.EmailAddr2.text],
            list_of_phones=list_of_phones,
            list_of_addresses=[address],
            first_name=record.FirstName.text,
            last_name=record.LastName.text,
            ssn=record.Tin.text.strip(),
            mothers_maiden_name=record.MotherMaidenName.text,
            middle_name=record.MiddleInitial.text,
            title=record.Title.text,
            primary_cif=record.RimNo.text,
        )