Source code for q2_cores.UltraData.mappers.demographic_info

from typing import List, Optional
from lxml import objectify

from q2_sdk.core import cache
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_sdk.models.demographic import (
    Address,
    DemographicInfo,
    DriverLicense,
    Phone,
    PhoneType,
)
from q2_cores.UltraData.queries import DemographicInfoQuery


class DemographicInfoMapper(BaseDemographicInfoMapper):
    """Mapper that turns an UltraData demographic-info response into ``DemographicInfo``.

    In addition to the base mapper behaviour, ``execute`` stores the raw core
    response of the first query in the cache under ``cache_key`` (when one is
    given).
    """

    def __init__(
        self,
        list_of_queries: List[BaseQuery],
        cache_key: str,
        hq_credentials: Optional[HqCredentials] = None,
        zone_context: Optional[CoreUser] = None,
    ):
        """Initialise the mapper.

        :param list_of_queries: Queries forwarded to the base mapper.
        :param cache_key: Key under which the raw core response is cached by
            ``execute``. A falsy value disables caching.
        :param hq_credentials: Forwarded unchanged to the base mapper.
        :param zone_context: Forwarded unchanged to the base mapper.
        """
        super().__init__(
            list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context
        )
        self.cache_key = cache_key

    async def execute(self) -> DemographicInfo:
        """Run the base mapper, then cache the raw core response if possible.

        Caching is best-effort: any exception raised while writing to the
        cache is logged as a warning and otherwise ignored, so a cache outage
        never fails the request.

        :return: Whatever the base class ``execute`` returned.
        """
        parsed_queries = await super().execute()
        if self.cache_key:
            # NOTE(review): presumably ``self.list_of_queries`` is populated by
            # the base ``__init__`` -- confirm against BaseDemographicInfoMapper.
            demo_query = self.list_of_queries[0]
            try:
                cache.get_cache().set(self.cache_key, demo_query.raw_core_response)
            except Exception:
                # Deliberately broad: a cache failure only means the core has
                # to be queried again later.
                demo_query.logger.warning(
                    "Unable to connect to cache. Will need to requery core"
                )
        return parsed_queries

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> DemographicInfo:
        """Build a ``DemographicInfo`` from a single UltraData query response.

        :param list_of_queries: Must contain exactly one
            ``DemographicInfoQuery`` whose ``raw_core_response`` is the XML
            returned by the core.
        :return: The demographic data extracted from the response.
        :raises AssertionError: If the list does not hold exactly one
            ``DemographicInfoQuery``.
        :raises AttributeError: If an expected element is missing from the
            response (``find`` returns ``None`` in that case).
        """
        assert len(list_of_queries) == 1, (
            "UltraData only knows how to deal with a single demographicinfo query"
        )
        assert isinstance(list_of_queries[0], DemographicInfoQuery), (
            "Query must be an instance of UltraData.queries.DemographicInfoQuery"
        )

        response = list_of_queries[0].raw_core_response
        root = objectify.fromstring(response)

        def text_of(tag: str):
            # "{*}" matches the element in any namespace.
            return root.find("{*}" + tag).text

        dob = text_of("BirthDt")
        email = text_of("EMailAddr")

        work_phone = Phone.build_from_str(text_of("WorkPhone"), PhoneType.BUSINESS)
        extension = text_of("PhoneExtension")
        # The None-filter below implies build_from_str may return None;
        # setting an attribute on None would raise before the filter runs.
        if work_phone is not None:
            work_phone.extension = extension

        candidate_phones = [
            Phone.build_from_str(text_of("HomePhone"), PhoneType.PERSONAL),
            work_phone,
            Phone.build_from_str(text_of("CellPhone"), PhoneType.CELL),
        ]
        phones = [phone for phone in candidate_phones if phone is not None]

        # Look the residential address element up once instead of per field.
        res_addr = root.find("{*}ResAddr")
        addresses = [
            Address(
                res_addr.Addr1.text,
                res_addr.Addr2.text,
                res_addr.City.text,
                res_addr.State.text,
                res_addr.Zip.text,
            )
        ]

        first_name = text_of("FirstName")
        middle_name = text_of("MiddleName")
        last_name = text_of("LastName")
        title = text_of("CustTitle")
        ssn = text_of("TaxId")
        drivers_license = DriverLicense(
            text_of("DriversLicense"), text_of("LicenseState")
        )

        return DemographicInfo(
            dob,
            [email],
            phones,
            addresses,
            first_name,
            last_name,
            ssn,
            middle_name=middle_name,
            title=title,
            driver_license=drivers_license,
        )