Source code for q2_cores.Metavante.mappers.demographic_info

from typing import List
from lxml import objectify

from q2_sdk.models.demographic import (
    Address,
    AddressType,
    DemographicInfo,
    Phone,
    PhoneType,
)
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery

from q2_cores.Metavante.queries import DemographicInfoQuery, EmailQuery


# Maps the attribute names used by this mapper to the XML child-element tags
# that are looked up (via ``findtext``) in the core responses.
ATTR_MAP = dict(
    # Identity fields, read from the demographic response.
    first_name="E10102",
    last_name="E10101",
    dob="E10036",
    mothers_maiden_name="E10169",
    ssn="E10132",
    # Postal address fields, read from the demographic response.
    address_1="E10042",
    address_2="E10043",
    city="E10094",
    state="E10114",
    zipcode="E10122",
    # Contact fields; the email tag is read from the email response.
    phone_number="E10109",
    email="E10528",
    # Customer id taken from the demographic response and passed to EmailQuery.
    customer_id="E10033",
)


class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Build a ``DemographicInfo`` from a demographic query plus an email query.

    The demographic query is executed first because its response carries the
    customer id that the follow-up ``EmailQuery`` is constructed with.
    """

    async def _run_queries(self):
        """DemographicQuery has the customer_id that EmailQuery needs"""
        demographic_query = self.list_of_queries[0]
        await demographic_query.execute(self.hq_credentials)

        # The email lookup is keyed on the customer id found in the
        # demographic response, so it can only be built after that query ran.
        core_customer_id = self.get_customer_id_from_demo_query(demographic_query)
        follow_up_query = EmailQuery(demographic_query.logger, core_customer_id)
        self.list_of_queries.append(follow_up_query)
        await follow_up_query.execute(self.hq_credentials)

    @staticmethod
    def get_customer_id_from_demo_query(demo_query: DemographicInfoQuery) -> str:
        """Return the customer id found in an executed demographic query.

        :param demo_query: query whose ``raw_core_response`` holds the XML
            demographic response
        :return: text of the customer-id element, as returned by ``findtext``
        :raises AssertionError: if ``demo_query`` is not a ``DemographicInfoQuery``
        """
        assert isinstance(demo_query, DemographicInfoQuery), (
            "Query must be an instance of Metavante.queries.DemographicInfoQuery"
        )

        xml_bytes = demo_query.raw_core_response.encode()
        document = objectify.fromstring(xml_bytes)
        customer_info = (
            document.Svc.MsgData.CICustTaxNbrSrchResData.CICustInfoLst.CICustInfo
        )
        return customer_info.findtext(ATTR_MAP["customer_id"])

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
        """Combine the demographic and email responses into a ``DemographicInfo``.

        :param list_of_queries: exactly two executed queries, a
            ``DemographicInfoQuery`` followed by an ``EmailQuery``
        :return: the populated ``DemographicInfo``
        :raises AssertionError: if the list does not have that shape
        """
        assert len(list_of_queries) == 2
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of Metavante.queries.DemographicInfoQuery"
        )
        assert isinstance(list_of_queries[1], EmailQuery), (
            "Query must be an instance of Metavante.queries.EmailQuery"
        )

        demographic_xml = list_of_queries[0].raw_core_response
        email_xml = list_of_queries[1].raw_core_response

        demographic_doc = objectify.fromstring(demographic_xml.encode())
        email_doc = objectify.fromstring(email_xml.encode())

        customer_info = (
            demographic_doc.Svc.MsgData.CICustTaxNbrSrchResData.CICustInfoLst.CICustInfo
        )
        email_record = (
            email_doc.Svc.MsgData.CICustEmailInqResData
            .CICustEmailInqRptDataLst.CICustEmailInqRptData
        )

        def customer_field(attr_name):
            # Look up one mapped element's text in the demographic record.
            return customer_info.findtext(ATTR_MAP[attr_name])

        email_address = email_record.findtext(ATTR_MAP["email"])

        residence = Address(
            customer_field("address_1"),
            customer_field("address_2"),
            customer_field("city"),
            customer_field("state"),
            customer_field("zipcode"),
            address_type=AddressType.RESIDENTIAL,
        )
        personal_phone = Phone.build_from_str(
            customer_field("phone_number"), PhoneType.PERSONAL
        )

        return DemographicInfo(
            customer_field("dob"),
            [email_address],
            [personal_phone],
            [residence],
            customer_field("first_name"),
            customer_field("last_name"),
            customer_field("ssn"),
            mothers_maiden_name=customer_field("mothers_maiden_name"),
        )