Source code for q2_cores.SOA.mappers.demographic_info_mapper

from datetime import datetime
from typing import List, Optional
from lxml import objectify

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.demographic import (
    Address,
    DemographicInfo,
    DriverLicense,
    Phone,
    PhoneType,
)
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_cores.SOA.queries.demographic_info_query import DemographicInfoQuery
from q2_cores.SOA.queries.address_query import AddressQuery


PHONE_TYPE_MAPPING = {
    "10": PhoneType.PERSONAL,
    "11": PhoneType.BUSINESS,
    "13": PhoneType.CELL,
}

CONTACT_DATA_MAPPING = {"1": "Email", "2": "Email"}


[docs] class DemographicInfoMapper(BaseMapper): def __init__( self, list_of_queries: List[BaseQuery], ssn: str, ident_info_mapping: dict, flex_field_mapping: dict, address_name_mapping: dict, primary_cif: str, hq_credentials: Optional[HqCredentials] = None, zone_context: Optional[CoreUser] = None, ): self.ssn_used_in_initial_search = ssn self.ident_info_mapping = ident_info_mapping self.flex_field_mapping = flex_field_mapping self.cleaned_queries = None self.address_name_mapping = address_name_mapping self.primary_cif = primary_cif super().__init__( list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context )
[docs] def parse_returned_queries( self, list_of_queries: List[BaseQuery] ) -> DemographicInfo: assert len(list_of_queries) >= 2 demo_queries = [ x for x in list_of_queries if isinstance(x, DemographicInfoQuery) ] address_queries = [x for x in list_of_queries if isinstance(x, AddressQuery)] assert len(demo_queries) >= 1, ( "At least one query needs to be an instance of SOA.queries.DemographicInfoQuery" ) assert len(address_queries) >= 1, ( "At least one query needs to be an instance of SOA.queries.AddressQuery" ) self.cleaned_queries = self.check_ssn_from_queries(demo_queries) address_ids = self.sort_through_address_ids() first_name, middle_initial, last_name = self.get_names_from_demos() dob = self.get_dob_from_demos() phones = self.gather_phones_from_demo() emails = self.gather_emails_from_demo() drivers_license = self.get_drivers_license() mothers_maiden_name = self.get_mothers_maiden_name() addresses = self.get_addresses(address_queries, address_ids) demo_object = DemographicInfo( dob, emails, phones, addresses, first_name, last_name, self.ssn_used_in_initial_search, mothers_maiden_name=mothers_maiden_name, middle_name=middle_initial, driver_license=drivers_license, primary_cif=self.primary_cif, ) return demo_object
[docs] def check_ssn_from_queries(self, list_of_queries) -> list: """ Searches a list of demo queries and checks that the ssn matches the given ssn :param list_of_queries: a list of demographic queries :return: list of queries already objectified """ verified_objects = [] for query in list_of_queries: root = objectify.fromstring(query.raw_core_response) cifn = root.CIFN this_ssn = cifn.findtext("TaxIdNumber") if this_ssn.zfill(9) != self.ssn_used_in_initial_search: continue verified_objects.append(root) return verified_objects
[docs] def sort_through_address_ids(self) -> list: address_ids = [] for demo in self.cleaned_queries: this_name = demo.CIFN.findtext("NameId") address_id = self.address_name_mapping.get(this_name) if address_id: address_ids.append(address_id) return address_ids
[docs] def get_names_from_demos(self) -> tuple: """ Searches a list of demo queries to find the names in the object :return: first name, middle initial, last name """ first_name = None last_name = None middle_initial = None for demo in self.cleaned_queries: if first_name and middle_initial and last_name: break if not first_name: first_name = demo.CIFN.findtext("ShortFirstName") if not last_name: last_name = demo.CIFN.findtext("ShortLastName") if not middle_initial: middle_initial = demo.CIFN.findtext("MiddleInitial") return first_name, middle_initial, last_name
[docs] def get_dob_from_demos(self) -> str: """ Searches a list of demo queries to find the dob and format it correctly :return: dob """ dob = None for demo in self.cleaned_queries: if not dob: found_dob = ( str(demo.CIFN.findtext("DateOfBirth")).split(".")[0].zfill(8) ) if not found_dob or found_dob == "00000000": continue initial_dob = datetime.strptime(found_dob, "%m%d%Y") dob = initial_dob.strftime("%m-%d-%Y") break return dob
[docs] def gather_phones_from_demo(self) -> list: """ Searches a list of demo queries to all phone numbers and builds phone objects from them :return: A list of phone objects """ phones = [] for query in self.cleaned_queries: try: for node in query.CIFN.PhoneData.CifnPhoneData: phone_type = PHONE_TYPE_MAPPING.get(node.findtext("PhoneCode")) if not phone_type: continue area_code = node.findtext("PhoneArea") if area_code and area_code != "000": phones.append( Phone.build_from_str( f"{area_code}{node.findtext('PhoneNumber')}", phone_type ) ) except AttributeError: continue return phones
[docs] def gather_emails_from_demo(self) -> list: """ Searches a list of demo queries to all emails :param list_of_queries: A list of already cleared demo objects :return: A list of emails """ emails = [] for query in self.cleaned_queries: try: for node in query.CIFN.ContactData.CifnContactData: contact_code = node.findtext("ContactCode") if contact_code: mapped_value = CONTACT_DATA_MAPPING.get(contact_code) if not mapped_value: continue emails.append(node.findtext("ContactInfo")) except AttributeError: continue return emails
[docs] @staticmethod def search_ident_fields(xml_object, type_code) -> tuple: """ Searches the ident fields in the demo object for various data :param xml_object: The xml object to search in :param type_code: The type code to look for :return: The value for the ident field if found and issuing value """ value = None issued_by = None try: nodes = xml_object.CIFN.IdentInfo.CifnIdentInfo except AttributeError: nodes = [] for node in nodes: if issued_by and value: break id_type = node.findtext("IdentType") if id_type: if id_type == type_code: value = node.findtext("IdentValue") issued_by = node.findtext("IdentIssuedBy") return value, issued_by
[docs] @staticmethod def search_flex_fields(xml_object, type_code) -> str: """ Searches the flex fields in the demo object for various data :param xml_object: The xml object to search in :param type_code: The type code to look for :return: The value for the ident field if found and issuing value """ value = None try: nodes = xml_object.BFDF except AttributeError: nodes = [] for node in nodes: if value: break field_code = node.findtext("FieldCode") if field_code: if field_code == type_code: value = node.findtext("Value") return value
[docs] def get_drivers_license(self) -> DriverLicense: """ Searches a list of demo queries to find the drivers license wherever it might be :return: drivers license object """ dl_object = None ident_fields_to_search = [ key for key, value in self.ident_info_mapping.items() if value == "DriversLicense" ] flex_fields_to_search = [ key for key, value in self.flex_field_mapping.items() if value == "DriversLicense" ] for query in self.cleaned_queries: if dl_object: break if ident_fields_to_search: for code in ident_fields_to_search: dl_number, dl_state = self.search_ident_fields(query, code) if dl_number: dl_object = DriverLicense(dl_number, dl_state) break if flex_fields_to_search: for code in flex_fields_to_search: dl_number = self.search_flex_fields(query, code) if dl_number: dl_object = DriverLicense(dl_number, "") break return dl_object
[docs] def get_mothers_maiden_name(self) -> str: """ Searches a list of demo queries to find the Mothers Maiden Name wherever it might be :return: drivers license object """ mothers_maiden_name = None ident_fields_to_search = [ key for key, value in self.ident_info_mapping.items() if value == "MothersMaidenName" ] flex_fields_to_search = [ key for key, value in self.flex_field_mapping.items() if value == "MothersMaidenName" ] for query in self.cleaned_queries: if mothers_maiden_name: break if ident_fields_to_search: for code in ident_fields_to_search: mmn = self.search_ident_fields(query, code) if mmn: mothers_maiden_name = mmn[0] break if flex_fields_to_search: for code in flex_fields_to_search: mmn = self.search_flex_fields(query, code) if mmn: mothers_maiden_name = mmn break return mothers_maiden_name
[docs] def get_addresses(self, list_of_address_queries, address_ids) -> List[Address]: """ Creates Address objects for each address query given :param list_of_address_queries: A list of address queries :param address_ids: A list of address IDs to look for specifically :return: A list of Address objects """ addresses = [] if not address_ids: # if no address ids, get all addresses for query in list_of_address_queries: root = objectify.fromstring(query.raw_core_response) for node in root.CIFA: address = self.unpack_address(node) if address: addresses.append(address) else: for query in list_of_address_queries: root = objectify.fromstring(query.raw_core_response) for node in root.CIFA: this_address_id = node.AddressId.text if this_address_id in address_ids: address = self.unpack_address(node) if address: addresses.append(address) return addresses
[docs] @staticmethod def unpack_address(address_object) -> Optional[Address]: """ Unpacks the address from an xml object and returns it as an Address object :param address_object: xml object of address response :return: Address object """ address_1 = address_object.findtext("Address") address_2 = address_object.findtext("Address2") csz = address_object.findtext("CityStZip") if not csz: return None city_state_zip = csz.split(" ") cleaned_csz = [x for x in city_state_zip if x] if len(cleaned_csz) < 3: return None zip_code = cleaned_csz[-1].split("-")[0] state = cleaned_csz[-2] city = cleaned_csz[:-2] city = " ".join(city) return Address(address_1, address_2, city, state, zip_code)