Source code for q2_cores.XP2.mappers.demographic_info_mapper

from typing import List

from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
    Address,
    DemographicInfo,
    DriverLicense,
    Phone,
    PhoneType,
)

from q2_cores.XP2.queries.demographic_info_query import DemographicInfoQuery
from q2_cores.exceptions import CoreException
from .xp2_mapper import XP2Mapper

# Lookup from the phone-type text read out of the core response to the SDK
# ``PhoneType`` enum. The mapper below consults it with ``.get`` and falls back
# to ``PhoneType.PERSONAL`` for any value that is not listed here.
PHONE_TYPE_MAPPER = {
    "Cellular": PhoneType.CELL,
    "Home": PhoneType.PERSONAL,
    "Business": PhoneType.BUSINESS,
}


class DemographicInfoMapper(XP2Mapper):
    """Translate an XP2 ``DemographicInfoQuery`` response into ``DemographicInfo``."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
        """Build a ``DemographicInfo`` from a single executed demographic query.

        :param list_of_queries: exactly one ``DemographicInfoQuery`` whose
            ``raw_core_response`` has already been populated.
        :return: the demographic record read from the ``IndividualDetail``
            node of the unpacked response.
        :raises AssertionError: if the list does not hold exactly one
            ``DemographicInfoQuery``.
        :raises CoreException: if the response carries an ``ErrorMessage``
            node, or no address matches ``PrimaryAddressId``.
        """
        assert len(list_of_queries) == 1, "Exactly one query is expected"
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of XP2.queries.DemographicInfoQuery"
        )

        response = list_of_queries[0].raw_core_response
        # The two-argument ``super`` form is required here: a static method has
        # no ``self``/``cls`` for the zero-argument form to bind to.
        root = super(DemographicInfoMapper, DemographicInfoMapper).unpack_return_mesage(
            response
        )
        if hasattr(root, "ErrorMessage"):
            raise CoreException(
                f"Core returned unsuccessful status: {root.ErrorMessage.Message.text}"
            )

        member = root.IndividualDetail

        first_name = member.findtext("FirstName")
        middle_name = member.findtext("MiddleName")
        last_name = member.findtext("LastName")
        title = member.findtext("Title")
        ssn = member.findtext("TaxId")
        dob = member.findtext("BirthDate")
        drivers_lic = DriverLicense(
            member.findtext("DriversLicense"), member.findtext("DriversLicenseState")
        )
        mothers_maiden_name = member.findtext("MothersMaidenName")

        address_node = DemographicInfoMapper._find_primary_address(member)
        address = DemographicInfoMapper._build_address(address_node)
        emails = DemographicInfoMapper._collect_emails(member)
        phones = DemographicInfoMapper._collect_phones(member)
        primary_cif = member.findtext("IndividualId")

        return DemographicInfo(
            dob,
            emails,
            phones,
            [address],
            first_name,
            last_name,
            ssn,
            mothers_maiden_name=mothers_maiden_name,
            middle_name=middle_name,
            title=title,
            driver_license=drivers_lic,
            primary_cif=primary_cif,
        )

    @staticmethod
    def _find_primary_address(member):
        """Return the ``Address`` node whose ``AddressId`` equals ``PrimaryAddressId``.

        :raises CoreException: if none of the member's addresses match.
        """
        primary_id = member.PrimaryAddressId.text
        for candidate in member.Address:
            if candidate.findtext("AddressId") == primary_id:
                return candidate
        raise CoreException("Could Not identify primary address")

    @staticmethod
    def _build_address(address_node) -> Address:
        """Convert one address node into an SDK ``Address``."""
        # Keep only the part before the first "-" (e.g. drop a ZIP+4 suffix).
        # A missing ZipCode becomes "" instead of raising AttributeError on
        # ``None.split``, mirroring how a missing Line2 is defaulted below.
        zip_code = (address_node.findtext("ZipCode") or "").split("-")[0]
        return Address(
            address_node.findtext("Line1"),
            address_node.findtext("Line2") or "",
            address_node.findtext("City"),
            address_node.findtext("State"),
            zip_code,
            country=address_node.findtext("Country"),
        )

    @staticmethod
    def _collect_emails(member) -> List[str]:
        """Return the non-empty ``Email1``/``Email2`` values, in that order."""
        candidates = (member.findtext(tag) for tag in ("Email1", "Email2"))
        return [email for email in candidates if email]

    @staticmethod
    def _collect_phones(member) -> List[Phone]:
        """Return a ``Phone`` for every current, non-placeholder ``Phone`` node."""
        phones = []
        for phone in member.findall("Phone"):
            # Entries flagged as previous numbers are not reported.
            if str(phone.findtext("IsPrevious")).lower() == "true":
                continue

            phone_type = phone.PhoneKey.PhoneType.text
            # Default missing parts to "" so an absent element cannot leak the
            # literal text "None" into the number and defeat the guard below.
            area_code = phone.findtext("AreaCode") or ""
            local_number = phone.findtext("PhoneNumber") or ""
            number = f"{area_code}{local_number}"
            # Skip empty numbers and the all-zero placeholder value.
            if not number or number == "0000000000":
                continue

            phones.append(
                Phone.build_from_str(
                    number, PHONE_TYPE_MAPPER.get(phone_type, PhoneType.PERSONAL)
                )
            )
        return phones