Source code for q2_cores.EPL.mappers.demographic_info_mapper

from typing import List
from lxml import objectify

from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.demographic import (
    Address,
    AddressType,
    DemographicInfo,
    Phone,
    PhoneType,
)

from q2_cores.EPL.queries.demographic_info_query import DemographicInfoQuery
from q2_cores.exceptions import CoreException

PHONE_TYPE_MAPPER = {
    "Other": PhoneType.PERSONAL,
    "Home": PhoneType.PERSONAL,
    "Work": PhoneType.BUSINESS,
}

ADDRESS_TYPE_MAPPER = {"Home": AddressType.HOME, "Work": AddressType.BUSINESS}


[docs] class DemographicInfoMapper(BaseMapper):
[docs] @staticmethod def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo: assert len(list_of_queries) == 1 assert isinstance(list_of_queries[0], DemographicInfoQuery), ( "Query must be an instance of EPL.queries.DemographicInfoQuery" ) response = list_of_queries[0].raw_core_response root = objectify.fromstring(response) has_error = bool(root.findtext(".//{*}hasErrors") == "true") if has_error: raise CoreException("Core returned an error") first_name = root.findtext(".//{*}MemberFirstName") middle_name = root.findtext(".//{*}MemberMiddleName") last_name = root.findtext(".//{*}MemberLastName") ssn = root.findtext(".//{*}TIN").replace("-", "") dob = root.findtext(".//{*}BirthDate") mothers_maiden_name = root.findtext(".//{*}MotherMaidenName") found_emails = [] emails = root.findall(".//{*}Email") for email in emails: found_emails.append(email.findtext(".//{*}EmailAddress")) found_phones = [] phones = root.findall(".//{*}Phone") for phone in phones: number = phone.findtext(".//{*}PhoneNumber") if not number: continue phone_type = PHONE_TYPE_MAPPER.get(phone.findtext(".//{*}PhoneType")) if not phone_type: continue phone_sub_type = phone.findtext(".//{*}PhoneSubType") if phone_sub_type.lower() == "mobile": phone_type = PhoneType.CELL found_phones.append(Phone.build_from_str(number, phone_type)) found_addresses = [] addresses = root.findall(".//{*}Address") for address in addresses: address_type = ADDRESS_TYPE_MAPPER.get( address.findtext(".//{*}AddressType") ) if not address_type: address_type = AddressType.HOME line_1 = address.findtext(".//{*}AddressLine1") line_2 = address.findtext(".//{*}AddressLine2") county = address.findtext(".//{*}County") city = address.findtext(".//{*}City") state = address.findtext(".//{*}State") zip_code = address.findtext(".//{*}zip") country = "USA" international = bool(address.findtext(".//{*}ForeignInd") == "true") if international: country = address.findtext(".//{*}CountryCode") this_address = Address( line_1, line_2, city, state, zip_code, address_type=address_type, province=county, country=country, ) found_addresses.append(this_address) demo_object = DemographicInfo( dob, found_emails, found_phones, found_addresses, first_name, last_name, ssn, mothers_maiden_name=mothers_maiden_name, middle_name=middle_name, ) return demo_object