from typing import List
from lxml import objectify
from q2_sdk.models.demographic import (
DemographicInfo,
Phone,
PhoneType,
Address,
AddressType,
DriverLicense,
)
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_cores.JXchange.queries import DemographicInfoQuery
# Translates the phone-type label text carried by a core PhoneInfo node into
# the SDK's PhoneType enum. DemographicInfoMapper looks labels up here when
# building Phone objects.
PHONE_TYPE_MAPPING = {
    "Home Phone": PhoneType.PERSONAL,
    "Business Phone": PhoneType.BUSINESS,
    # Every cell-phone label collapses to the single CELL type.
    "Home Cell Phone": PhoneType.CELL,
    "Personal Cell Phone": PhoneType.CELL,
    "Business Cell Phone": PhoneType.CELL,
    # Fax numbers (home and business alike) are reported as BUSINESS.
    # NOTE(review): presumably because PhoneType has no fax member - confirm.
    "Home Fax Number": PhoneType.BUSINESS,
    "Business Fax Number": PhoneType.BUSINESS,
}
class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Map a JXchange ``DemographicInfoQuery`` response onto ``DemographicInfo``."""

    @staticmethod
    def _optional_nodes(parent, *path):
        """Return the repeating nodes at ``parent.<path...>``, or ``()`` if absent.

        Each name in ``path`` is resolved with objectify-style attribute
        access. When every step exists, the final element is returned; it is
        meant to be iterated, which visits it and its same-tag siblings. When
        any step is missing, an empty tuple is returned so callers can loop
        unconditionally.
        """
        node = parent
        for tag in path:
            node = getattr(node, tag, None)
            if node is None:
                return ()
        return node

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
        """Parse the raw core response of a single demographic query.

        :param list_of_queries: Exactly one ``DemographicInfoQuery`` whose
            ``raw_core_response`` holds the XML returned by the core.
        :return: A ``DemographicInfo`` populated with name, birth date, tax
            id, customer id, phones, emails, one home address and, when
            present, the driver's license.
        :raises AssertionError: If the list does not contain exactly one
            ``DemographicInfoQuery``.
        :raises AttributeError: If a required node (``CustRec/CustDetail``,
            ``PersonName``, ``Addr`` or the ``x_TaxDetail`` tax id path) is
            missing from the response.

        The phone, email and id-verification arrays are optional: a response
        without them yields empty ``phones`` / ``emails`` and no driver's
        license rather than an error.
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of JXchange.queries.DemographicInfoQuery"
        )
        optional_nodes = DemographicInfoMapper._optional_nodes

        response = list_of_queries[0].raw_core_response
        root = objectify.fromstring(response)
        cust_detail_node = root.CustRec.CustDetail
        person_name = cust_detail_node.PersonName

        social_security = root.x_TaxDetail.TaxDetail.TINInfo.TaxId.text
        first_name = person_name.findtext("{*}FirstName", "")
        middle_name = person_name.findtext("{*}MiddleName", "")
        last_name = person_name.findtext("{*}LastName", "")
        date_of_birth = cust_detail_node.findtext("{*}BirthDt", "")
        primary_cif = root.CustRec.findtext("{*}CustId", "")

        phones = []
        for node in optional_nodes(cust_detail_node, "PhoneArray", "PhoneInfo"):
            phone_number = node.findtext("{*}PhoneNum")
            # Skip blank entries and anything too short to be a full number.
            if not phone_number or len(phone_number) < 10:
                continue
            # An entry whose type is missing or not in PHONE_TYPE_MAPPING
            # cannot be classified; skip it like other unusable entries
            # instead of failing the whole parse.
            phone_type = PHONE_TYPE_MAPPING.get(node.findtext("{*}PhoneType"))
            if phone_type is None:
                continue
            phones.append(Phone.build_from_str(phone_number, phone_type))

        emails = []
        for entry in optional_nodes(cust_detail_node, "EmailArray", "EmailInfo"):
            email = entry.findtext("{*}EmailAddr")
            if email:
                emails.append(email)

        address_node = cust_detail_node.Addr
        addresses = [
            Address(
                address_node.findtext("{*}StreetAddr1"),
                "",  # second address line is always passed as empty
                address_node.findtext("{*}City"),
                address_node.findtext("{*}StateCode"),
                address_node.findtext("{*}PostalCode"),
                AddressType.HOME,
            )
        ]

        drivers_license = None
        id_nodes = optional_nodes(root, "x_IdVerify", "IdVerifyArray", "IdVerify")
        for node in id_nodes:
            id_code = node.findtext("{*}IdVerifyCode")
            # Only the first id entry coded "DL" (any case) is used.
            if id_code and id_code.upper() == "DL":
                dl_num = node.findtext("{*}IdVerifyVal", "")
                dl_state = node.findtext("{*}IdIssueBy", "")
                drivers_license = DriverLicense(dl_num, dl_state)
                break

        return DemographicInfo(
            date_of_birth,
            emails,
            phones,
            addresses,
            first_name,
            last_name,
            social_security,
            middle_name=middle_name,
            primary_cif=primary_cif,
            driver_license=drivers_license,
        )