from typing import List, Optional
from lxml import objectify
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.demographic import Address, DemographicInfo, Phone, PhoneType
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_cores.exceptions import CoreException
from ..queries import CoreAPIBaseQuery
[docs]
class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Build a ``DemographicInfo`` from a single core XML response.

    The response is parsed with ``lxml.objectify`` and read from
    ``SubmitRequestResult.Output.Responses.ResponseBase``.
    """

    # Core ``PhoneUseCode`` values that are mapped; phones carrying any other
    # code are ignored.
    _PHONE_TYPES_BY_USE_CODE = {
        "PER": PhoneType.PERSONAL,
        "BUS": PhoneType.BUSINESS,
        "CELL": PhoneType.CELL,
    }

    def __init__(
        self,
        list_of_queries: List[CoreAPIBaseQuery],
        get_cif_from_user_fields: Optional[dict] = None,
        hq_credentials: Optional[HqCredentials] = None,
        zone_context: Optional[CoreUser] = None,
    ):
        """
        :param list_of_queries: Queries forwarded to the base mapper.
        :param get_cif_from_user_fields: Optional ``{element name: expected text}``
            mapping. When a ``PersOrgUserField`` entry contains a descendant
            element with that name and text, the entry's ``Value`` replaces
            the primary CIF taken from ``MemberNumber``.
        :param hq_credentials: Forwarded to the base mapper.
        :param zone_context: Forwarded to the base mapper.
        """
        super().__init__(
            list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context
        )
        self.get_cif_from_user_fields = get_cif_from_user_fields

    def parse_returned_queries(
        self, list_of_queries: List[CoreAPIBaseQuery]
    ) -> DemographicInfo:
        """Convert the raw core response of the single query into a model.

        :param list_of_queries: Exactly one ``CoreAPIBaseQuery`` whose
            ``raw_core_response`` holds the XML returned by the core.
        :return: The populated ``DemographicInfo``. ``additional_details``
            carries ``email_details``, ``phone_sequence_data`` and
            ``person_number``.
        :raises CoreException: If the response reports an authentication
            error or a response-level error.
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], CoreAPIBaseQuery)

        root = objectify.fromstring(list_of_queries[0].raw_core_response)
        output = root.SubmitRequestResult.Output

        auth_errors = output.UserAuthentication.Errors
        if hasattr(auth_errors, "Error"):
            raise CoreException(
                f"Core returned Error: {auth_errors.Error.ErrorMessage.text}"
            )

        response_base = output.Responses.ResponseBase
        if hasattr(response_base.Errors, "Error"):
            raise CoreException(
                f"Core returned Error: {response_base.Errors.Error.ErrorMessage.text}"
            )

        member_number = response_base.findtext("MemberNumber", "")

        name_node = response_base.Name
        first_name = name_node.findtext("FirstName", "")
        last_name = name_node.findtext("LastName", "")
        # Middle name and suffix fall back to a single space, not "".
        middle_name = name_node.findtext("MiddleName", " ")
        title = name_node.findtext("Suffix", " ")

        demographics = response_base.Demographics
        ssn = demographics.findtext("TaxId", "")
        # Keep only the part before "T"; a missing DateOfBirth yields ""
        # instead of failing on None.
        dob = demographics.findtext("DateOfBirth", "").split("T")[0]

        addresses = self._parse_primary_address(response_base)
        emails, email_details = self._parse_emails(response_base)
        phone_list, phone_sequence_data = self._parse_phones(response_base)

        additional_details = {
            "email_details": email_details,
            "phone_sequence_data": phone_sequence_data,
        }
        additional_details["person_number"] = response_base.findtext(
            "PersonNumber", ""
        )

        primary_cif = self._resolve_primary_cif(response_base, member_number)

        return DemographicInfo(
            dob,
            emails,
            phone_list,
            addresses,
            first_name,
            last_name,
            ssn,
            middle_name=middle_name,
            primary_cif=primary_cif,
            title=title,
            additional_details=additional_details,
        )

    @staticmethod
    def _parse_primary_address(
        response_base: objectify.ObjectifiedElement,
    ) -> List[Address]:
        """Return the address flagged ``PRI`` as a one-item list, else ``[]``."""
        primary = None
        if hasattr(response_base.Addresses, "PersOrgAddress"):
            for address in response_base.Addresses.PersOrgAddress:
                if address.AddressUseCode.text != "PRI":
                    continue

                address_line1 = ""
                address_line2 = ""
                for address_line in address.AddressLines.PersOrgAddressLine:
                    line_number = address_line.AddressLineNumber.text
                    if line_number == "1":
                        address_line1 = address_line.AddressLineText.text
                    elif line_number == "2":
                        address_line2 = address_line.AddressLineText.text

                zipcode = address.findtext("ZipCode")
                zip_suffix = address.findtext("ZipSuffix")
                # Only append the suffix to an actual ZIP code, so a missing
                # ZipCode can never be rendered as "None-<suffix>".
                if zipcode and zip_suffix:
                    zipcode = f"{zipcode}-{zip_suffix}"

                # When several entries are flagged PRI, the last one wins.
                primary = Address(
                    address_line1,
                    address_line2,
                    address.findtext("CityName", ""),
                    address.findtext("StateCode", ""),
                    zipcode,
                )
        return [] if primary is None else [primary]

    @staticmethod
    def _parse_emails(response_base: objectify.ObjectifiedElement):
        """Return ``(emails, email_details)`` for the electronic addresses.

        ``emails`` lists every non-empty ``AddressLineText``;
        ``email_details`` maps each of them to its ``AddressUseCode``
        (``""`` when absent).
        """
        emails = []
        email_details = {}
        electronic_addresses = response_base.ElectronicAddresses
        if hasattr(electronic_addresses, "PersOrgElectronicAddress"):
            for email in electronic_addresses.PersOrgElectronicAddress:
                address_line = email.ElectronicAddressLines.PersOrgElectronicAddressLine
                email_addr = address_line.findtext("AddressLineText")
                if email_addr:
                    emails.append(email_addr)
                    email_details[email_addr] = email.findtext("AddressUseCode", "")
        return emails, email_details

    @classmethod
    def _parse_phones(cls, response_base: objectify.ObjectifiedElement):
        """Return ``(phones, phone_sequence_data)`` for the mapped phone types.

        ``phone_sequence_data`` maps each use code to its
        ``PhoneSequenceNumber`` text. Phone entries that are missing an
        expected element are skipped rather than failing the whole parse.
        """
        phone_list = []
        phone_sequence_data = {}
        if hasattr(response_base.Phones, "PersOrgPhone"):
            for phone in response_base.Phones.PersOrgPhone:
                try:
                    use_code = phone.PhoneUseCode.text
                    phone_type = cls._PHONE_TYPES_BY_USE_CODE.get(use_code)
                    if phone_type is None:
                        continue

                    phone_kwargs = {
                        "area_code": phone.AreaCode.text,
                        "phone_number": phone.Exchange.text + phone.PhoneNumber.text,
                        "phone_type": phone_type,
                    }
                    # Only business phones carry an extension.
                    if use_code == "BUS":
                        phone_kwargs["extension"] = phone.PhoneExtension.text

                    phone_list.append(Phone(**phone_kwargs))
                    phone_sequence_data[use_code] = phone.PhoneSequenceNumber.text
                except (AttributeError, TypeError):
                    # AttributeError: an expected child element is absent.
                    # TypeError: Exchange/PhoneNumber is empty, so its text is
                    # None and cannot be concatenated.
                    continue
        return phone_list, phone_sequence_data

    def _resolve_primary_cif(
        self, response_base: objectify.ObjectifiedElement, default_cif: str
    ) -> str:
        """Return the CIF from a matching user field, else ``default_cif``."""
        primary_cif = default_cif
        if not self.get_cif_from_user_fields:
            return primary_cif
        if not hasattr(response_base, "UserFields"):
            return primary_cif
        if not hasattr(response_base.UserFields, "PersOrgUserField"):
            return primary_cif

        for user_field in response_base.UserFields.PersOrgUserField:
            for name, code in self.get_cif_from_user_fields.items():
                if user_field.findtext(".//{*}%s" % name) == code:
                    primary_cif = user_field.Value.text
                    # Ends the scan of this user field only; a later matching
                    # user field still overrides the value.
                    break
        return primary_cif