Source code for q2_cores.Phoenix.mappers.demographic_info_mapper

import re
from lxml import objectify
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.demographic import (
    Address,
    AddressType,
    Phone,
    PhoneType,
    DemographicInfo,
)
from q2_cores import exceptions
from ..queries.demographic_info_query import DemographicInfoQuery


# Translates the RimType text found on a Phoenix demographic RECORD into the
# SDK's AddressType. Keys are compared against the stripped RimType value.
ADDRESS_TYPE_MAP: dict[str, AddressType] = {
    "Personal": AddressType.HOME,
    "Business": AddressType.BUSINESS,
}


class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Maps a Phoenix demographic query response onto a :py:class:`DemographicInfo`."""

    @staticmethod
    def parse_returned_queries(list_of_queries: list[BaseQuery]) -> DemographicInfo:
        """
        Parses a list of queries into an object containing member demographic information.

        :param list_of_queries: a :py:class:`list` of :py:class:`BaseQuery` objects to
            operate against. It must hold exactly one
            :py:class:`DemographicInfoQuery`, whose ``raw_core_response`` is the XML
            text to parse.
        :return: a :py:class:`DemographicInfo` object
        :raises AssertionError: if the list does not contain exactly one
            :py:class:`DemographicInfoQuery`
        :raises exceptions.CoreException: if the response's ``ReturnCode`` is not
            ``"0"`` or the response carries no ``RESPONSE/RECORD`` element
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of Phoenix.queries.DemographicInfoQuery"
        )

        # Drop the XML declaration before parsing (presumably because lxml rejects
        # str input that carries an encoding declaration -- confirm). The match is
        # non-greedy so that it stops at the declaration's own "?>" instead of
        # swallowing everything up to the last "?>" on the line.
        raw_response = re.sub(
            r"<\?xml.+?\?>", "", list_of_queries[0].raw_core_response
        )
        response_obj = objectify.fromstring(raw_response)

        error_return_code = response_obj.RESPONSE.findtext("ReturnCode")
        if error_return_code != "0":
            raise exceptions.CoreException(
                "Could not retrieve demographic info from the core"
            )

        record = response_obj.find("./RESPONSE/RECORD")
        if record is None:
            # A successful return code without a RECORD would otherwise surface as
            # an AttributeError on None below; report it like any other core failure.
            raise exceptions.CoreException(
                "Could not retrieve demographic info from the core"
            )

        rim_type = record.RimType.text.strip()
        # Unrecognised RimType values fall back to the home address type (the same
        # value "Personal" maps to) so address_type is always an AddressType.
        address_type = ADDRESS_TYPE_MAP.get(rim_type, AddressType.HOME)

        demographic_data = DemographicInfo(
            date_of_birth=record.BirthDt.text,
            list_of_emails=[record.EmailAddr1.text, record.EmailAddr2.text],
            # Phone1/2/3 are reported as personal, business and cell respectively,
            # each combined with its extension field as "<number> x<extension>".
            list_of_phones=[
                Phone.build_from_str(
                    f"{record.Phone1.text} x{record.Phone1Ext.text}", PhoneType.PERSONAL
                ),
                Phone.build_from_str(
                    f"{record.Phone2.text} x{record.Phone2Ext.text}", PhoneType.BUSINESS
                ),
                Phone.build_from_str(
                    f"{record.Phone3.text} x{record.Phone3Ext.text}", PhoneType.CELL
                ),
            ],
            list_of_addresses=[
                Address(
                    address_1=record.AddrLine1.text,
                    address_2=record.AddrLine2.text,
                    city=record.City.text.strip(),
                    state=record.State.text.strip(),
                    zipcode=record.PostalCode.text.strip(),
                    address_type=address_type,
                    # Province is looked up with a default, so a record without
                    # that element yields an empty string.
                    province=record.findtext("Province", "").strip(),
                    country=record.CountryCode.text.strip(),
                )
            ],
            first_name=record.FirstName.text,
            last_name=record.LastName.text,
            ssn=record.Tin.text.strip(),
            mothers_maiden_name=record.MotherMaidenName.text,
            middle_name=record.MiddleInitial.text,
            title=record.Title.text,
            primary_cif=record.RimNo.text,
        )
        return demographic_data