from typing import List
from lxml import objectify
from q2_sdk.models.demographic import (
Address,
AddressType,
DemographicInfo,
Phone,
PhoneType,
)
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores.Metavante.queries import DemographicInfoQuery, EmailQuery
ATTR_MAP = {
"first_name": "E10102",
"last_name": "E10101",
"dob": "E10036",
"mothers_maiden_name": "E10169",
"ssn": "E10132",
"address_1": "E10042",
"address_2": "E10043",
"city": "E10094",
"state": "E10114",
"zipcode": "E10122",
"phone_number": "E10109",
"email": "E10528",
"customer_id": "E10033",
}
[docs]
class DemographicInfoMapper(BaseDemographicInfoMapper):
async def _run_queries(self):
"""DemographicQuery has the customer_id that EmailQuery needs"""
demo_query = self.list_of_queries[0]
await demo_query.execute(self.hq_credentials)
customer_id = self.get_customer_id_from_demo_query(demo_query)
email_query = EmailQuery(demo_query.logger, customer_id)
self.list_of_queries.append(email_query)
await email_query.execute(self.hq_credentials)
[docs]
@staticmethod
def get_customer_id_from_demo_query(demo_query: DemographicInfoQuery) -> str:
assert isinstance(demo_query, DemographicInfoQuery), (
"Query must be an instance of Metavante.queries.DemographicInfoQuery"
)
demographic_response = demo_query.raw_core_response.encode()
root = objectify.fromstring(demographic_response)
demo_info = root.Svc.MsgData.CICustTaxNbrSrchResData.CICustInfoLst.CICustInfo
customer_id = demo_info.findtext(ATTR_MAP["customer_id"])
return customer_id
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
assert len(list_of_queries) == 2
assert isinstance(list_of_queries[0], DemographicInfoQuery), (
"Query must be an instance of Metavante.queries.DemographicInfoQuery"
)
assert isinstance(list_of_queries[1], EmailQuery), (
"Query must be an instance of Metavante.queries.EmailQuery"
)
demographic_response = list_of_queries[0].raw_core_response
email_response = list_of_queries[1].raw_core_response
root = objectify.fromstring(demographic_response.encode())
email_root = objectify.fromstring(email_response.encode())
demo_info = root.Svc.MsgData.CICustTaxNbrSrchResData.CICustInfoLst.CICustInfo
email_data = email_root.Svc.MsgData.CICustEmailInqResData.CICustEmailInqRptDataLst.CICustEmailInqRptData
email = email_data.findtext(ATTR_MAP["email"])
dob = demo_info.findtext(ATTR_MAP["dob"])
phone_raw = demo_info.findtext(ATTR_MAP["phone_number"])
address_1 = demo_info.findtext(ATTR_MAP["address_1"])
address_2 = demo_info.findtext(ATTR_MAP["address_2"])
first_name = demo_info.findtext(ATTR_MAP["first_name"])
last_name = demo_info.findtext(ATTR_MAP["last_name"])
ssn = demo_info.findtext(ATTR_MAP["ssn"])
mothers_maiden_name = demo_info.findtext(ATTR_MAP["mothers_maiden_name"])
city = demo_info.findtext(ATTR_MAP["city"])
state = demo_info.findtext(ATTR_MAP["state"])
zipcode = demo_info.findtext(ATTR_MAP["zipcode"])
address = Address(
address_1,
address_2,
city,
state,
zipcode,
address_type=AddressType.RESIDENTIAL,
)
phone_number = Phone.build_from_str(phone_raw, PhoneType.PERSONAL)
return DemographicInfo(
dob,
[email],
[phone_number],
[address],
first_name,
last_name,
ssn,
mothers_maiden_name=mothers_maiden_name,
)