Source code for q2_cores.JXchange.mappers.demographic_info

from typing import List

from lxml import objectify
from q2_sdk.models.demographic import (
    DemographicInfo,
    Phone,
    PhoneType,
    Address,
    AddressType,
    DriverLicense,
)
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper

from q2_cores.JXchange.queries import DemographicInfoQuery

# Translates the phone-type label text carried by a JXchange phone entry into
# the q2_sdk ``PhoneType`` passed to ``Phone.build_from_str``.
PHONE_TYPE_MAPPING = {
    "Home Phone": PhoneType.PERSONAL,
    "Business Phone": PhoneType.BUSINESS,
    # Every cell-phone label collapses to the single CELL type.
    "Home Cell Phone": PhoneType.CELL,
    "Personal Cell Phone": PhoneType.CELL,
    "Business Cell Phone": PhoneType.CELL,
    # NOTE(review): both fax labels (including the *home* fax) are mapped to
    # BUSINESS, presumably because PhoneType has no fax member - confirm.
    "Home Fax Number": PhoneType.BUSINESS,
    "Business Fax Number": PhoneType.BUSINESS,
}


class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Convert a JXchange ``DemographicInfoQuery`` response into ``DemographicInfo``."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
        """Parse the raw core response of a single demographic query.

        :param list_of_queries: Exactly one executed ``DemographicInfoQuery``
            whose ``raw_core_response`` holds the XML returned by the core.
        :return: ``DemographicInfo`` populated with birth date, emails, phones,
            addresses, names, tax id, primary CIF and (if present) the
            driver's license.
        :raises AssertionError: If the list does not contain exactly one
            ``DemographicInfoQuery``.
        :raises AttributeError: If a required node (``CustRec``,
            ``CustDetail``, ``PersonName`` or the ``x_TaxDetail`` tax id path)
            is absent from the response.

        Phone, email, address and id-verification sections are treated as
        optional: when one is missing the corresponding result is simply
        empty (or ``None`` for the driver's license).
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of JXchange.queries.DemographicInfoQuery"
        )

        response = list_of_queries[0].raw_core_response
        root = objectify.fromstring(response)

        # Required nodes: objectify attribute access raises AttributeError
        # when any of them is missing, which is intentional here.
        cust_detail_node = root.CustRec.CustDetail
        person_name = cust_detail_node.PersonName
        social_security = root.x_TaxDetail.TaxDetail.TINInfo.TaxId.text

        first_name = person_name.findtext("{*}FirstName", "")
        middle_name = person_name.findtext("{*}MiddleName", "")
        last_name = person_name.findtext("{*}LastName", "")
        date_of_birth = cust_detail_node.findtext("{*}BirthDt", "")
        primary_cif = root.CustRec.findtext("{*}CustId", "")

        return DemographicInfo(
            date_of_birth,
            DemographicInfoMapper._parse_emails(cust_detail_node),
            DemographicInfoMapper._parse_phones(cust_detail_node),
            DemographicInfoMapper._parse_addresses(cust_detail_node),
            first_name,
            last_name,
            social_security,
            middle_name=middle_name,
            primary_cif=primary_cif,
            driver_license=DemographicInfoMapper._parse_driver_license(root),
        )

    @staticmethod
    def _parse_phones(cust_detail_node) -> List[Phone]:
        """Collect usable phone numbers from ``PhoneArray/PhoneInfo`` entries."""
        phones = []
        # iterfind yields nothing when PhoneArray is absent, so a customer
        # without phones produces an empty list instead of AttributeError.
        for node in cust_detail_node.iterfind("{*}PhoneArray/{*}PhoneInfo"):
            phone_number = node.findtext("{*}PhoneNum")
            # Skip empty numbers and numbers shorter than 10 characters.
            if not phone_number or len(phone_number) < 10:
                continue
            phone_type = PHONE_TYPE_MAPPING.get(node.findtext("{*}PhoneType"))
            if phone_type is None:
                # A missing or unmapped type label must not abort the whole
                # demographic parse; the entry is dropped instead.
                continue
            phones.append(Phone.build_from_str(phone_number, phone_type))
        return phones

    @staticmethod
    def _parse_emails(cust_detail_node) -> List[str]:
        """Collect non-empty addresses from ``EmailArray/EmailInfo`` entries."""
        candidates = (
            entry.findtext("{*}EmailAddr")
            for entry in cust_detail_node.iterfind("{*}EmailArray/{*}EmailInfo")
        )
        return [email for email in candidates if email]

    @staticmethod
    def _parse_addresses(cust_detail_node) -> List[Address]:
        """Build the home address from the ``Addr`` node, if there is one."""
        address_node = cust_detail_node.find("{*}Addr")
        # Compare against None explicitly rather than relying on the
        # truthiness of an lxml element.
        if address_node is None:
            return []
        return [
            Address(
                address_node.findtext("{*}StreetAddr1"),
                "",
                address_node.findtext("{*}City"),
                address_node.findtext("{*}StateCode"),
                address_node.findtext("{*}PostalCode"),
                AddressType.HOME,
            )
        ]

    @staticmethod
    def _parse_driver_license(root):
        """Return the first ``IdVerify`` entry coded ``DL``, or ``None``."""
        # The id-verification section is optional; the path search simply
        # yields nothing when any part of it is missing.
        for node in root.iterfind("{*}x_IdVerify/{*}IdVerifyArray/{*}IdVerify"):
            id_code = node.findtext("{*}IdVerifyCode")
            if id_code and id_code.upper() == "DL":
                dl_num = node.findtext("{*}IdVerifyVal", "")
                dl_state = node.findtext("{*}IdIssueBy", "")
                return DriverLicense(dl_num, dl_state)
        return None