Source code for q2_cores.Phoenix.core

import logging
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.models.core_user import CoreUser
from .mappers.demographic_info_mapper import DemographicInfoMapper
from .mappers.initial_search_demographic_mapper import InitialSearchMapper
from .queries.demographic_info_query import DemographicInfoQuery


class Core(BaseCore):
    """Phoenix implementation of :class:`BaseCore`.

    Settings are read from the ``Phoenix_Core`` configuration file. The
    dictionaries below declare the required and optional keys together with
    their default values.
    """

    CONFIG_FILE_NAME = "Phoenix_Core"
    REQUIRED_CONFIGURATIONS = {
        # These can be found in the bridge config
        "SERVICE_ID": 0,
        "EMP_ID": 0,
    }
    OPTIONAL_CONFIGURATIONS = {
        # When truthy, demographic lookups are keyed by account number and
        # account type instead of by SSN.
        "BUILD_DEMOGRAPHIC_WITH_ACC_NUM": False,
    }

    def __init__(
        self,
        logger: logging.Logger,
        core_user: CoreUser,
        hq_credentials: str = None,
        **kwargs,
    ):
        """Forward every argument unchanged to :class:`BaseCore`.

        :param logger: logger handed to the base class and to built queries
        :param core_user: user the core requests are built for
        :param hq_credentials: optional credentials passed on to the mappers
        :param kwargs: any additional keyword arguments accepted by the base
        """
        super().__init__(logger, core_user, hq_credentials=hq_credentials, **kwargs)

    async def build_demographic_info(self) -> DemographicInfoMapper:
        """
        Build a core request to get demographic information for a customer.

        The customer is identified by account number and account type when
        the ``BUILD_DEMOGRAPHIC_WITH_ACC_NUM`` configuration value is truthy,
        and by SSN otherwise.

        :return: a :py:class:`DemographicInfoMapper` object wrapping a single
            :py:class:`DemographicInfoQuery`
        :raises ValueError: if lookup by account number is configured but the
            user has no accounts to take the number from
        """
        use_account_number = vars(self.config).get("BUILD_DEMOGRAPHIC_WITH_ACC_NUM")

        if use_account_number:
            accounts = list(self.core_user.account_list or [])
            if not accounts:
                # Without this guard the method would fall through to the
                # return with nothing built and fail with an opaque
                # UnboundLocalError.
                raise ValueError(
                    "BUILD_DEMOGRAPHIC_WITH_ACC_NUM is enabled but the core "
                    "user has no accounts to build a demographic query from"
                )
            # Only one query is submitted, so one account has to be chosen;
            # the last entry of the account list is the one used.
            account = accounts[-1]
            identifier = {
                "acct_no": account.acct_number_internal_unmasked,
                "acct_type": account.hydra_product_type_code,
            }
        else:
            # build with social security number
            identifier = {"ssn": self.configured_user.ssn}

        demographic_query = DemographicInfoQuery(
            logger=self.logger,
            service_id=self.config.SERVICE_ID,
            emp_id=self.config.EMP_ID,
            **identifier,
        )
        return DemographicInfoMapper(
            [demographic_query], hq_credentials=self.hq_credentials
        )