from datetime import datetime
from typing import List, Optional
from lxml import objectify
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.demographic import (
Address,
DemographicInfo,
DriverLicense,
Phone,
PhoneType,
)
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_cores.SOA.queries.demographic_info_query import DemographicInfoQuery
from q2_cores.SOA.queries.address_query import AddressQuery
PHONE_TYPE_MAPPING = {
"10": PhoneType.PERSONAL,
"11": PhoneType.BUSINESS,
"13": PhoneType.CELL,
}
CONTACT_DATA_MAPPING = {"1": "Email", "2": "Email"}
[docs]
class DemographicInfoMapper(BaseMapper):
def __init__(
self,
list_of_queries: List[BaseQuery],
ssn: str,
ident_info_mapping: dict,
flex_field_mapping: dict,
address_name_mapping: dict,
primary_cif: str,
hq_credentials: Optional[HqCredentials] = None,
zone_context: Optional[CoreUser] = None,
):
self.ssn_used_in_initial_search = ssn
self.ident_info_mapping = ident_info_mapping
self.flex_field_mapping = flex_field_mapping
self.cleaned_queries = None
self.address_name_mapping = address_name_mapping
self.primary_cif = primary_cif
super().__init__(
list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context
)
[docs]
def parse_returned_queries(
self, list_of_queries: List[BaseQuery]
) -> DemographicInfo:
assert len(list_of_queries) >= 2
demo_queries = [
x for x in list_of_queries if isinstance(x, DemographicInfoQuery)
]
address_queries = [x for x in list_of_queries if isinstance(x, AddressQuery)]
assert len(demo_queries) >= 1, (
"At least one query needs to be an instance of SOA.queries.DemographicInfoQuery"
)
assert len(address_queries) >= 1, (
"At least one query needs to be an instance of SOA.queries.AddressQuery"
)
self.cleaned_queries = self.check_ssn_from_queries(demo_queries)
address_ids = self.sort_through_address_ids()
first_name, middle_initial, last_name = self.get_names_from_demos()
dob = self.get_dob_from_demos()
phones = self.gather_phones_from_demo()
emails = self.gather_emails_from_demo()
drivers_license = self.get_drivers_license()
mothers_maiden_name = self.get_mothers_maiden_name()
addresses = self.get_addresses(address_queries, address_ids)
demo_object = DemographicInfo(
dob,
emails,
phones,
addresses,
first_name,
last_name,
self.ssn_used_in_initial_search,
mothers_maiden_name=mothers_maiden_name,
middle_name=middle_initial,
driver_license=drivers_license,
primary_cif=self.primary_cif,
)
return demo_object
[docs]
def check_ssn_from_queries(self, list_of_queries) -> list:
"""
Searches a list of demo queries and checks that the ssn matches the given ssn
:param list_of_queries: a list of demographic queries
:return: list of queries already objectified
"""
verified_objects = []
for query in list_of_queries:
root = objectify.fromstring(query.raw_core_response)
cifn = root.CIFN
this_ssn = cifn.findtext("TaxIdNumber")
if this_ssn.zfill(9) != self.ssn_used_in_initial_search:
continue
verified_objects.append(root)
return verified_objects
[docs]
def sort_through_address_ids(self) -> list:
address_ids = []
for demo in self.cleaned_queries:
this_name = demo.CIFN.findtext("NameId")
address_id = self.address_name_mapping.get(this_name)
if address_id:
address_ids.append(address_id)
return address_ids
[docs]
def get_names_from_demos(self) -> tuple:
"""
Searches a list of demo queries to find the names in the object
:return: first name, middle initial, last name
"""
first_name = None
last_name = None
middle_initial = None
for demo in self.cleaned_queries:
if first_name and middle_initial and last_name:
break
if not first_name:
first_name = demo.CIFN.findtext("ShortFirstName")
if not last_name:
last_name = demo.CIFN.findtext("ShortLastName")
if not middle_initial:
middle_initial = demo.CIFN.findtext("MiddleInitial")
return first_name, middle_initial, last_name
[docs]
def get_dob_from_demos(self) -> str:
"""
Searches a list of demo queries to find the dob and format it correctly
:return: dob
"""
dob = None
for demo in self.cleaned_queries:
if not dob:
found_dob = (
str(demo.CIFN.findtext("DateOfBirth")).split(".")[0].zfill(8)
)
if not found_dob or found_dob == "00000000":
continue
initial_dob = datetime.strptime(found_dob, "%m%d%Y")
dob = initial_dob.strftime("%m-%d-%Y")
break
return dob
[docs]
def gather_phones_from_demo(self) -> list:
"""
Searches a list of demo queries to all phone numbers and builds phone objects from them
:return: A list of phone objects
"""
phones = []
for query in self.cleaned_queries:
try:
for node in query.CIFN.PhoneData.CifnPhoneData:
phone_type = PHONE_TYPE_MAPPING.get(node.findtext("PhoneCode"))
if not phone_type:
continue
area_code = node.findtext("PhoneArea")
if area_code and area_code != "000":
phones.append(
Phone.build_from_str(
f"{area_code}{node.findtext('PhoneNumber')}", phone_type
)
)
except AttributeError:
continue
return phones
[docs]
def gather_emails_from_demo(self) -> list:
"""
Searches a list of demo queries to all emails
:param list_of_queries: A list of already cleared demo objects
:return: A list of emails
"""
emails = []
for query in self.cleaned_queries:
try:
for node in query.CIFN.ContactData.CifnContactData:
contact_code = node.findtext("ContactCode")
if contact_code:
mapped_value = CONTACT_DATA_MAPPING.get(contact_code)
if not mapped_value:
continue
emails.append(node.findtext("ContactInfo"))
except AttributeError:
continue
return emails
[docs]
@staticmethod
def search_ident_fields(xml_object, type_code) -> tuple:
"""
Searches the ident fields in the demo object for various data
:param xml_object: The xml object to search in
:param type_code: The type code to look for
:return: The value for the ident field if found and issuing value
"""
value = None
issued_by = None
try:
nodes = xml_object.CIFN.IdentInfo.CifnIdentInfo
except AttributeError:
nodes = []
for node in nodes:
if issued_by and value:
break
id_type = node.findtext("IdentType")
if id_type:
if id_type == type_code:
value = node.findtext("IdentValue")
issued_by = node.findtext("IdentIssuedBy")
return value, issued_by
[docs]
@staticmethod
def search_flex_fields(xml_object, type_code) -> str:
"""
Searches the flex fields in the demo object for various data
:param xml_object: The xml object to search in
:param type_code: The type code to look for
:return: The value for the ident field if found and issuing value
"""
value = None
try:
nodes = xml_object.BFDF
except AttributeError:
nodes = []
for node in nodes:
if value:
break
field_code = node.findtext("FieldCode")
if field_code:
if field_code == type_code:
value = node.findtext("Value")
return value
[docs]
def get_drivers_license(self) -> DriverLicense:
"""
Searches a list of demo queries to find the drivers license wherever it might be
:return: drivers license object
"""
dl_object = None
ident_fields_to_search = [
key
for key, value in self.ident_info_mapping.items()
if value == "DriversLicense"
]
flex_fields_to_search = [
key
for key, value in self.flex_field_mapping.items()
if value == "DriversLicense"
]
for query in self.cleaned_queries:
if dl_object:
break
if ident_fields_to_search:
for code in ident_fields_to_search:
dl_number, dl_state = self.search_ident_fields(query, code)
if dl_number:
dl_object = DriverLicense(dl_number, dl_state)
break
if flex_fields_to_search:
for code in flex_fields_to_search:
dl_number = self.search_flex_fields(query, code)
if dl_number:
dl_object = DriverLicense(dl_number, "")
break
return dl_object
[docs]
def get_mothers_maiden_name(self) -> str:
"""
Searches a list of demo queries to find the Mothers Maiden Name wherever it might be
:return: drivers license object
"""
mothers_maiden_name = None
ident_fields_to_search = [
key
for key, value in self.ident_info_mapping.items()
if value == "MothersMaidenName"
]
flex_fields_to_search = [
key
for key, value in self.flex_field_mapping.items()
if value == "MothersMaidenName"
]
for query in self.cleaned_queries:
if mothers_maiden_name:
break
if ident_fields_to_search:
for code in ident_fields_to_search:
mmn = self.search_ident_fields(query, code)
if mmn:
mothers_maiden_name = mmn[0]
break
if flex_fields_to_search:
for code in flex_fields_to_search:
mmn = self.search_flex_fields(query, code)
if mmn:
mothers_maiden_name = mmn
break
return mothers_maiden_name
[docs]
def get_addresses(self, list_of_address_queries, address_ids) -> List[Address]:
"""
Creates Address objects for each address query given
:param list_of_address_queries: A list of address queries
:param address_ids: A list of address IDs to look for specifically
:return: A list of Address objects
"""
addresses = []
if not address_ids: # if no address ids, get all addresses
for query in list_of_address_queries:
root = objectify.fromstring(query.raw_core_response)
for node in root.CIFA:
address = self.unpack_address(node)
if address:
addresses.append(address)
else:
for query in list_of_address_queries:
root = objectify.fromstring(query.raw_core_response)
for node in root.CIFA:
this_address_id = node.AddressId.text
if this_address_id in address_ids:
address = self.unpack_address(node)
if address:
addresses.append(address)
return addresses
[docs]
@staticmethod
def unpack_address(address_object) -> Optional[Address]:
"""
Unpacks the address from an xml object and returns it as an Address object
:param address_object: xml object of address response
:return: Address object
"""
address_1 = address_object.findtext("Address")
address_2 = address_object.findtext("Address2")
csz = address_object.findtext("CityStZip")
if not csz:
return None
city_state_zip = csz.split(" ")
cleaned_csz = [x for x in city_state_zip if x]
if len(cleaned_csz) < 3:
return None
zip_code = cleaned_csz[-1].split("-")[0]
state = cleaned_csz[-2]
city = cleaned_csz[:-2]
city = " ".join(city)
return Address(address_1, address_2, city, state, zip_code)