from typing import List, Optional
from lxml import objectify
from q2_cores.exceptions import CoreValidationException
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import Address, DemographicInfo, PhoneType, Phone
# Lookup table from the phone-type label found in the core response
# (the text of a ``PhoneType`` element) to the SDK ``PhoneType`` member.
# Four of the labels share the catch-all ``PhoneType.OTHER``.
PHONE_TYPE_MAPPING = {
    "HomePhone": PhoneType.PERSONAL,
    "BusinessPhone": PhoneType.BUSINESS,
    "Mobile": PhoneType.CELL,
    **dict.fromkeys(("Other", "Fax", "Modem", "Pager"), PhoneType.OTHER),
}
class DemographicInfoMapper(BaseMapper):
    """Map a core customer XML response onto a ``DemographicInfo`` object.

    The response is parsed with ``lxml.objectify`` and is expected to contain
    ``Response/Status``, ``Response/CustomerGeneralInformation`` and
    ``Response/CustomerContact`` sections.
    """

    def __init__(
        self,
        list_of_queries: List[BaseQuery],
        hq_credentials: Optional[HqCredentials] = None,
        zone_context: Optional[CoreUser] = None,
        populate_mobile_phone_from_main_phone: Optional[bool] = False,
    ):
        """Store mapper options and forward the rest to ``BaseMapper``.

        :param list_of_queries: Queries handed to ``BaseMapper``.
        :param hq_credentials: Forwarded unchanged to ``BaseMapper``.
        :param zone_context: Forwarded unchanged to ``BaseMapper``.
        :param populate_mobile_phone_from_main_phone: When truthy, the parsed
            result also gets a ``PhoneType.CELL`` copy of the main phone
            (unless an equal phone is already present).
        """
        super().__init__(
            list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context
        )
        self.populate_mobile_phone_from_main_phone = (
            populate_mobile_phone_from_main_phone
        )

    @staticmethod
    def _build_phone(phone_record) -> Phone:
        """Build a ``Phone`` from one objectified phone element.

        A ``PhoneExtension`` of ``"0"`` is treated as "no extension".

        :raises AttributeError: If a required child element is missing.
        :raises KeyError: If the phone type is not in ``PHONE_TYPE_MAPPING``.
        """
        return Phone(
            phone_record.AreaCode.text,
            phone_record.PhoneNumber.text,
            PHONE_TYPE_MAPPING[phone_record.PhoneType.text],
            extension=(
                None
                if phone_record.PhoneExtension.text == "0"
                else phone_record.PhoneExtension.text
            ),
        )

    @staticmethod
    def _collect_emails(contact_info) -> List[str]:
        """Return the non-empty addresses from the ``Email1``/``Email2`` slots.

        A slot that is absent from the response is skipped rather than
        aborting the whole parse.
        """
        emails = []
        for slot in ("Email1", "Email2"):
            entry = getattr(contact_info, slot, None)
            address_node = getattr(entry, "EmailAddress", None)
            address = getattr(address_node, "text", None)
            if address:
                emails.append(address)
        return emails

    def parse_returned_queries(
        self, list_of_queries: List[BaseQuery]
    ) -> DemographicInfo:
        """Parse the single query's raw core response into ``DemographicInfo``.

        :param list_of_queries: Must contain exactly one ``BaseQuery`` whose
            ``raw_core_response`` holds the XML to parse.
        :returns: The populated ``DemographicInfo``.
        :raises AssertionError: If the query list is not a single ``BaseQuery``.
        :raises CoreValidationException: If the response status code is not
            ``"0"``.
        :raises KeyError: If a phone type is not in ``PHONE_TYPE_MAPPING``.
        :raises AttributeError: If a required element (e.g. ``MainPhone``) is
            missing from the response.
        """
        # Explicit raises instead of ``assert`` so the checks survive ``python -O``;
        # the exception type (and message) seen by callers is unchanged.
        if len(list_of_queries) != 1:
            raise AssertionError("Expected exactly one query")
        if not isinstance(list_of_queries[0], BaseQuery):
            raise AssertionError(
                "Query must be an instance of FiservClearTouch.queries.InitialSearchQuery"
            )

        root = objectify.fromstring(list_of_queries[0].raw_core_response)
        if root.Response.Status.StatusCode.text != "0":
            raise CoreValidationException(
                f"Core returned unsuccessful status: {root.Response.Status.StatusDesc.text}"
            )

        customer_info = root.Response.CustomerGeneralInformation
        ssn = customer_info.TINId.TaxId.text
        # Namespace-wildcard lookup, matching every other ``findtext`` call here;
        # a bare "Keyword" would never match when the response is namespaced.
        secret_word = customer_info.findtext("{*}Keyword")
        primary_cif = ssn  # the tax id doubles as the primary CIF
        dob = customer_info.findtext("{*}BirthDate", "")

        if customer_info.Name.CustomerType.text == "Personal":
            personal_name = customer_info.Name.PersonalName
            first_name = personal_name.findtext("{*}FirstName", "")
            middle_name = personal_name.findtext("{*}MiddleName", "")
            last_name = personal_name.findtext("{*}LastName", "")
            title = personal_name.findtext("{*}TitlePrefix", "")
        else:  # Type is Non-personal (Commercial)
            first_name = ""
            middle_name = ""
            title = ""
            # Name1 contains the full name of a commercial/non-personal record
            last_name = customer_info.Name.NonPersonalName.findtext("{*}Name1", "")

        # Some FIs use this field for mother's maiden name
        keyword = customer_info.findtext("{*}Keyword", "")

        address_node = customer_info.Address
        address = Address(
            address_node.findtext("{*}Address1"),
            address_node.findtext("{*}Address2") or "",
            address_node.findtext("{*}City", "N/A"),
            address_node.findtext("{*}State", ""),
            # Default to "" so a missing PostalCode does not raise TypeError
            # when sliced; only the first five characters are kept.
            address_node.findtext("{*}PostalCode", "")[:5],
            country="USA",
        )

        contact_info = root.Response.CustomerContact
        emails = self._collect_emails(contact_info)

        main_phone = self._build_phone(contact_info.MainPhone)
        phones = [main_phone]

        # AdditionalPhone1 .. AdditionalPhone5; stop at the first missing slot.
        for slot_number in range(1, 6):
            # A single attribute lookup keeps the existence check and the fetch
            # in the same namespace (objectify resolves children in the parent's
            # namespace), so namespaced responses are not silently skipped.
            phone_record = getattr(contact_info, f"AdditionalPhone{slot_number}", None)
            if phone_record is None:
                break
            try:
                if phone_record.PhoneType.text != "AdditionalPhoneNotUsed":
                    phones.append(self._build_phone(phone_record))
            except AttributeError:
                # Best effort: an incomplete additional-phone record is skipped.
                continue

        if self.populate_mobile_phone_from_main_phone:
            mobile_phone = Phone(
                main_phone.area_code,
                main_phone.phone_number,
                PhoneType.CELL,
                extension=main_phone.extension,
            )
            if mobile_phone not in phones:
                phones.append(mobile_phone)

        return DemographicInfo(
            dob,
            emails,
            phones,
            [address],
            first_name,
            last_name,
            ssn,
            middle_name=middle_name,
            title=title,
            driver_license=None,
            primary_cif=primary_cif,
            mothers_maiden_name=keyword,
            additional_details={"security_word": secret_word},
        )