Source code for q2_cores.FiservCommunicatorSignature.core

import logging

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.models.core_user import CoreUser

from .mappers.demographic_info_mapper import DemographicInfoMapper
from .mappers.initial_search_mapper import InitialSearchMapper
from .queries.demographic_info_query import DemographicInfoQuery
from .queries.initial_search_query import InitialSearchQuery


class Core(BaseCore):
    """
    Core whose demographic lookup is driven by the customer's CIF.

    Summary Flow:
    When a CIF isn't provided, an initial search is performed first, using
    the user's social security to retrieve cif information, and only then
    is the demographic information core request attempted.
    """

    CONFIG_FILE_NAME = "FiservCommunicatorSignature_Core"

    # Configuration keys declared as required; every one defaults to an
    # empty string placeholder here.
    REQUIRED_CONFIGURATIONS = {
        "CLIENTAPPKEY": "",
        "INSTITUTIONCODE": "",
        "CUSTLOGINID": "",
        "PSWD": "",
        "ENVIRONMENTNAME": "",
    }

    # Configuration keys declared as optional, with their defaults.
    OPTIONAL_CONFIGURATIONS = {
        "SECRET_OVERRIDE_DEMO_ATTR": "",
        "THROW_ERROR_ON_MULTIPLE_CUSTPERM_ID": False,
    }

    def __init__(
        self,
        logger: logging.Logger,
        core_user: CoreUser,
        hq_credentials: HqCredentials = None,
        **kwargs,
    ):
        """
        Hand every argument straight through to ``BaseCore``.

        :param logger: logger forwarded to the base class
        :param core_user: core user forwarded to the base class
        :param hq_credentials: optional HQ credentials, defaults to None
        :param kwargs: any extra keyword arguments, forwarded unchanged
        """
        super().__init__(logger, core_user, hq_credentials=hq_credentials, **kwargs)
[docs] async def build_demographic_info(self, cif=None) -> DemographicInfoMapper: """ Builds a request to the core to retrieve customer's demographic information using CIF. :param cif: a string of the customer's permanent ID, defaults to None (if no CIF is provided, method will attempt to grab the CIF from online user. If not found, a call will be made to begin the initial search to query CIF). Once a CIF is found, a call is made to retrieve demographic information from the core. """ if cif: self.logger.debug(f"Searching core with provided cif: {cif}") return await self._build_demo_mapper(cif) cif = self.configured_user.customer_primary_cif if not cif: self.logger.debug( f"cif not found from online user. Beginning initial search: {cif}" ) initial_mapper = await self.build_initial_search() cif = await initial_mapper.execute() self.logger.debug(f"cif found: {cif}") return await self._build_demo_mapper(cif)
async def _build_demo_mapper(self, cif) -> DemographicInfoMapper: """ Builds the demographic info query and mapper once a cif has been provided. """ req_dict = { "cif": cif, } demographic_info_query = DemographicInfoQuery( logger=self.logger, config=self.config, req_dict=req_dict ) return DemographicInfoMapper( [demographic_info_query], hq_credentials=self.hq_credentials )