Source code for q2_cores.MiserCohesion.core

import logging
from typing import Optional

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.mappers.card_list import BaseCardListMapper
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.models.core_user import CoreUser

from q2_cores.MiserCohesion import mappers, queries
from q2_cores.exceptions import ConfigException


class Core(BaseCore):
    """Core integration configured through the ``Miser_Core`` config file.

    The identifier used to look a customer up is chosen by the ``DEMO_KEY``
    configuration value, which must be ``CIF`` or ``SSN`` (compared
    case-insensitively).
    """

    CONFIG_FILE_NAME = "Miser_Core"
    REQUIRED_CONFIGURATIONS = {
        "DEMO_KEY": "CIF",  # Can be either CIF or SSN
    }

    def __init__(
        self,
        logger: logging.Logger,
        core_user: CoreUser,
        hq_credentials: Optional[HqCredentials] = None,
        **kwargs,
    ):
        """Forward every construction argument to ``BaseCore`` unchanged.

        :param logger: logger handed to the base class.
        :param core_user: user context handed to the base class.
        :param hq_credentials: optional HQ credentials, passed by keyword.
        :param kwargs: any extra keyword arguments, passed through as-is.
        """
        super().__init__(logger, core_user, hq_credentials=hq_credentials, **kwargs)

    def _build_demo_info_query(self):
        """Create the demographic query keyed by the configured identifier.

        :return: a ``queries.DemographicInfoQuery`` built with ``cif_nbr``
            when ``DEMO_KEY`` is ``CIF``, or with ``tax_id`` when it is
            ``SSN``.
        :raises ConfigException: if ``DEMO_KEY`` is neither ``CIF`` nor
            ``SSN``.
        """
        # Upper-case once so the configured value is matched regardless of case.
        key_type = self.config.DEMO_KEY.upper()

        if key_type == "CIF":
            return queries.DemographicInfoQuery(
                self.logger, cif_nbr=self.configured_user.customer_id
            )

        if key_type == "SSN":
            return queries.DemographicInfoQuery(
                self.logger, tax_id=self.configured_user.ssn
            )

        raise ConfigException("Must set config DEMO_KEY to either CIF or SSN")

    async def build_demographic_info(self) -> BaseDemographicInfoMapper:
        """Return a ``DemographicInfoMapper`` wrapping the configured query.

        :raises ConfigException: if ``DEMO_KEY`` is misconfigured.
        """
        demo_query = self._build_demo_info_query()
        mapper = mappers.DemographicInfoMapper(
            [demo_query],
            hq_credentials=self.hq_credentials,
            zone_context=self.core_user,
        )
        return mapper

    async def build_get_cards(self) -> BaseCardListMapper:
        """Return a ``CardListMapper`` wrapping the configured query.

        :raises ConfigException: if ``DEMO_KEY`` is misconfigured.
        """
        # The card list is derived from the same demographic query that
        # build_demographic_info uses; only the mapper differs.
        demo_query = self._build_demo_info_query()
        mapper = mappers.CardListMapper(
            [demo_query],
            hq_credentials=self.hq_credentials,
            zone_context=self.core_user,
        )
        return mapper