Source code for q2_cores.OSI.mappers.demographic_info

import re
from typing import List
from lxml import objectify

from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
    Address,
    AddressType,
    DemographicInfo,
    Phone,
    PhoneType,
)
from q2_cores.OSI.queries import DemographicInfoQuery
from q2_cores.exceptions import CoreException

PHONE_TYPE_MAPPING = {
    "PER": PhoneType.PERSONAL,
    "BUS": PhoneType.BUSINESS,
    "CELL": PhoneType.CELL,
}

ADDRESS_TYPE_MAPPING = {
    "PRI": AddressType.HOME,
    "HOME": AddressType.HOME,
    "SEA": AddressType.VACATION,
    "BUS": AddressType.BUSINESS,
}


[docs] def cleanse_response(response: str) -> str: """ Example response ITC 85 2Security Violation INVALID SSN/PIN Offset:INVALID SSN/PIN Offset - IRBH - IRB - TC96ITC 1 0 """ try: response_message = response.split("\t")[ 2 ] # third column contains response data response_code = int(response_message[0]) response_message = response_message[1:] except (TypeError, KeyError, IndexError, ValueError): return response[ 10:-7 ] # if unable to parse message then fall back to old method else: if ( response_code == 2 ): # 2 means an error occured, possible values are 0, 1, 2, 3 raise CoreException(response_message) return re.search( "(<ENTITY_DATA_INQ.*</ENTITY_DATA_INQ>).+", response_message, flags=re.S ).group(1)
[docs] class DemographicInfoMapper(BaseDemographicInfoMapper):
[docs] @staticmethod def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo: assert len(list_of_queries) == 1, ( "OSI only knows how to deal with a single demographicinfo query" ) assert isinstance(list_of_queries[0], DemographicInfoQuery), ( "Query must be an instance of OSI.queries.DemographicInfoQuery" ) response = cleanse_response(list_of_queries[0].raw_core_response) root = objectify.fromstring(response) dob = root.ENTITY_DATA.DOB.text email = root.ENTITY_DATA.EMAIL.text phones = [] if root.ENTITY_DATA.PHONES.getchildren() != []: for node in root.ENTITY_DATA.PHONES.PHONE: phones.append( Phone( node.AREACD.text, "{}{}".format(node.EXCHANGE.text, node.PHONENBR.text), PHONE_TYPE_MAPPING.get( node.PHONEUSECD.text, PhoneType.PERSONAL ), extension=node.PHONEEXTEN.text, ) ) addresses = [] if root.ENTITY_DATA.ADDRESSES.getchildren() != []: for node in root.ENTITY_DATA.ADDRESSES.ADDRESS: if "AL" not in node.ADDRUSECD.text: addresses.append( Address( node.ADDRLINE1.text, node.ADDRLINE2.text, node.CITY.text, node.STATE.text, node.ZIP.text, address_type=ADDRESS_TYPE_MAPPING.get( node.ADDRUSECD, AddressType.HOME ), ) ) first_name = root.ENTITY_DATA.FIRST_NAME.text last_name = root.ENTITY_DATA.LAST_NAME.text mothers_maiden_name = root.ENTITY_DATA.MMNM.text ssn = root.ENTITY_DATA.TAXID.text primary_cif = root.ENTITY_DATA.ENTITYNBR.text return DemographicInfo( dob, [email], phones, addresses, first_name, last_name, ssn, mothers_maiden_name=mothers_maiden_name, primary_cif=primary_cif, )