import re
from typing import List
from lxml import objectify
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
Address,
AddressType,
DemographicInfo,
Phone,
PhoneType,
)
from q2_cores.OSI.queries import DemographicInfoQuery
from q2_cores.exceptions import CoreException
PHONE_TYPE_MAPPING = {
"PER": PhoneType.PERSONAL,
"BUS": PhoneType.BUSINESS,
"CELL": PhoneType.CELL,
}
ADDRESS_TYPE_MAPPING = {
"PRI": AddressType.HOME,
"HOME": AddressType.HOME,
"SEA": AddressType.VACATION,
"BUS": AddressType.BUSINESS,
}
[docs]
def cleanse_response(response: str) -> str:
"""
Example response
ITC 85 2Security Violation INVALID SSN/PIN Offset:INVALID SSN/PIN Offset - IRBH - IRB - TC96ITC 1 0
"""
try:
response_message = response.split("\t")[
2
] # third column contains response data
response_code = int(response_message[0])
response_message = response_message[1:]
except (TypeError, KeyError, IndexError, ValueError):
return response[
10:-7
] # if unable to parse message then fall back to old method
else:
if (
response_code == 2
): # 2 means an error occured, possible values are 0, 1, 2, 3
raise CoreException(response_message)
return re.search(
"(<ENTITY_DATA_INQ.*</ENTITY_DATA_INQ>).+", response_message, flags=re.S
).group(1)
[docs]
class DemographicInfoMapper(BaseDemographicInfoMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
assert len(list_of_queries) == 1, (
"OSI only knows how to deal with a single demographicinfo query"
)
assert isinstance(list_of_queries[0], DemographicInfoQuery), (
"Query must be an instance of OSI.queries.DemographicInfoQuery"
)
response = cleanse_response(list_of_queries[0].raw_core_response)
root = objectify.fromstring(response)
dob = root.ENTITY_DATA.DOB.text
email = root.ENTITY_DATA.EMAIL.text
phones = []
if root.ENTITY_DATA.PHONES.getchildren() != []:
for node in root.ENTITY_DATA.PHONES.PHONE:
phones.append(
Phone(
node.AREACD.text,
"{}{}".format(node.EXCHANGE.text, node.PHONENBR.text),
PHONE_TYPE_MAPPING.get(
node.PHONEUSECD.text, PhoneType.PERSONAL
),
extension=node.PHONEEXTEN.text,
)
)
addresses = []
if root.ENTITY_DATA.ADDRESSES.getchildren() != []:
for node in root.ENTITY_DATA.ADDRESSES.ADDRESS:
if "AL" not in node.ADDRUSECD.text:
addresses.append(
Address(
node.ADDRLINE1.text,
node.ADDRLINE2.text,
node.CITY.text,
node.STATE.text,
node.ZIP.text,
address_type=ADDRESS_TYPE_MAPPING.get(
node.ADDRUSECD, AddressType.HOME
),
)
)
first_name = root.ENTITY_DATA.FIRST_NAME.text
last_name = root.ENTITY_DATA.LAST_NAME.text
mothers_maiden_name = root.ENTITY_DATA.MMNM.text
ssn = root.ENTITY_DATA.TAXID.text
primary_cif = root.ENTITY_DATA.ENTITYNBR.text
return DemographicInfo(
dob,
[email],
phones,
addresses,
first_name,
last_name,
ssn,
mothers_maiden_name=mothers_maiden_name,
primary_cif=primary_cif,
)