Source code for q2_cores.Corelation.mappers.demographic_info_mapper

from typing import List, Optional, Union

from lxml import objectify
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
    Address,
    DemographicInfo,
    DriverLicense,
    Phone,
    PhoneType,
)

from ...exceptions import (
    CoreException,
    CoreValidationException,
    DeceasedCustomerException,
)
from ..queries.demographic_info_query import DemographicInfoQuery
from ..utils import SearchType


# Maps the ``option`` code of a contact's ``category`` element to the SDK phone
# type. The parsers in this module treat any contact whose code is not listed
# here as an email address.
PHONE_TYPE_MAPPING: dict[str, PhoneType] = {
    "HP": PhoneType.PERSONAL,
    "PC": PhoneType.CELL,
    "BP": PhoneType.BUSINESS,
}


[docs] class DemographicInfoMapper(BaseMapper): def __init__( self, list_of_queries: List[BaseQuery], ssn: str, cif: str, hq_credentials: Optional[HqCredentials] = None, search_type: SearchType = SearchType.LOGIN_VERIFY, configs=None, ): self.ssn = ssn self.cif = cif self.search_type = search_type if self.search_type == SearchType.PERSON_VERIFICATION: self.addr_typ_priority_list = getattr( configs, "ADDR_TYP_PRIORITY_ORDER_FOR_DEMO_INFO", ["R"] ) alert_descriptions = ["deceased", "new address needed"] self.alert_descriptions = getattr( configs, "ALERT_DESCRIPTIONS", alert_descriptions ) super().__init__(list_of_queries, hq_credentials=hq_credentials)
[docs] def parse_returned_queries( self, list_of_queries: List[BaseQuery] ) -> Union[DemographicInfo, list[DemographicInfo]]: assert len(list_of_queries) == 1 assert isinstance(list_of_queries[0], DemographicInfoQuery), ( "Query must be an instance of Corelation.queries.demographic_info_query.DemographicInfoQuery" ) response = list_of_queries[0].raw_core_response root = objectify.fromstring(response) match self.search_type: case SearchType.LOGIN_VERIFY: demo_obj = self._build_from_login_verify_response(root) case SearchType.PERSON_VERIFICATION: demo_obj = self._build_from_person_verification_response(root) return demo_obj
def _build_from_login_verify_response(self, root) -> DemographicInfo: person_info = root.sequence.transaction.step.search.resultRow.person first_name = person_info.findtext("firstName", "") middle_name = person_info.findtext("middleName") last_name = person_info.lastName.text dob = person_info.birthDate.text mother_maiden = "" drivers_lic = "" if hasattr(person_info, "identification"): for identity in person_info.identification: id_type = identity.category.get("option") if id_type == "MM": mother_maiden = identity.value.text elif id_type == "DL": drivers_lic = DriverLicense( identity.value.text, identity.issuer.text ) emails = [] phones = [] address = [] contact_records = {} for tac in person_info.contact: tac_type = tac.category.get("option") if PHONE_TYPE_MAPPING.get(tac_type): number = tac.value.text found_phone = Phone.build_from_str( number, PHONE_TYPE_MAPPING.get(tac_type, PhoneType.PERSONAL) ) contact_records[tac.serial.text] = found_phone phones.append(found_phone) else: emails.append(tac.value.text) contact_records[tac.serial.text] = tac.value.text for addr in person_info.address: if addr.category.get("option") == "R": address.append( Address( addr.street.text, "", addr.city.text, addr.state.text, addr.postalCode.text, ) ) break return DemographicInfo( dob, emails, phones, address, first_name, last_name, self.ssn, mother_maiden, middle_name, driver_license=drivers_lic, primary_cif=self.cif, additional_details={ "contact_records": contact_records, }, ) def _build_from_person_verification_response(self, root) -> DemographicInfo: try: core_err_msg = root.sequence.transaction.step.search.exception.message.text if core_err_msg == "Invalid Account Number": raise CoreValidationException( f"Core returned exception message: {core_err_msg}" ) except AttributeError as ae_inst: # noqa: F841 self.list_of_queries[0].logger.debug("No core exception message found.") user_info = {} try: user_info["serial"] = ( root.sequence.transaction.step.search.resultRow.serial.text ) 
except AttributeError as ae_msg: self.list_of_queries[0].logger.error( "No records were found using the submitted data" ) raise CoreException(ae_msg) # demo_info = None demo_info_objs = [] for result in root.sequence.transaction.step.search.resultRow: if hasattr(result.person, "alert"): for alert in result.person.alert: self._verify_alert(root, result, alert) is_non_person_entity = hasattr(result.person, "category") first_name = ( result.person.firstName.text if not is_non_person_entity else "" ) middle_name = ( result.person.findtext("middleName") if not is_non_person_entity else "" ) last_name = result.person.lastName.text dob = result.person.findtext("birthDate") mother_maiden = "" drivers_lic = "" if hasattr(result.person, "identification"): for identity in result.person.identification: id_type = identity.category.get("option") if id_type == "MM": mother_maiden = identity.value.text elif id_type == "DL": drivers_lic = DriverLicense( identity.value.text, identity.issuer.text ) if not hasattr(result.person, "contact") or not hasattr( result.person, "address" ): continue emails, phones = _get_contact_methods_by_person_record(result.person) addresses = _get_addresses_by_person_record( result.person, self.addr_typ_priority_list ) demo_info = DemographicInfo( dob, emails, phones, addresses, first_name, last_name, self.ssn, mother_maiden, middle_name, driver_license=drivers_lic, user_info=user_info, primary_cif=self.cif, ) demo_info_objs.append(demo_info) demo_return = demo_info_objs[0] if demo_info_objs else None return demo_return def _verify_alert(self, root, result, alert) -> None: description = alert.typeDescription.text if description.lower() in (alert.lower() for alert in self.alert_descriptions): if description.lower() == "deceased": if len(root.sequence.transaction.step.search.resultRow) == 1: raise DeceasedCustomerException("Deceased Customer") else: self.list_of_queries[0].logger.warning( f"The user with SSN {result.person.tinSuffix.text} is deceased so 
skip" ) else: raise CoreException(description)
def _get_contact_methods_by_person_record(person_record) -> tuple: """Returns email addresses and phones associated with a Person record :param person_record: Person record :return: Tuple containing list of emails and a list of phones """ emails = [] phones = [] for tac in person_record.contact: tac_type = tac.category.get("option") if PHONE_TYPE_MAPPING.get(tac_type): number = tac.value.text found_phone = Phone.build_from_str( number, PHONE_TYPE_MAPPING.get(tac_type, PhoneType.PERSONAL) ) phones.append(found_phone) else: emails.append(tac.value.text) return emails, phones def _get_addresses_by_person_record( person_record, addr_typ_priority_list: list ) -> list: """Returns addresses associated with a Person record :param person_record: Person record :param addr_typ_priority_list: List of strings indicating the priority order of the addresses :return: List of addresses """ addresses = [] address_serial = None for addr_type in addr_typ_priority_list: for addr in person_record.address: if ( not address_serial and addr.category.get("option") == addr_type ) or addr.addressSerial.text == str(address_serial): addresses.append( Address( addr.street.text, "", addr.city.text, addr.state.text if hasattr(addr, "state") else "", addr.postalCode.text, ) ) break if addresses: break return addresses