Source code for q2_cores.UltraData.core

import logging
from typing import Optional

from q2_sdk.core import cache
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.mappers.update_demographic_info import (
    BaseUpdateDemographicInfoMapper,
)
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.demographic import DemographicInfo

from q2_cores.UltraData import mappers, queries


class Core(BaseCore):
    """UltraData core integration that builds demographic mappers.

    Exposes two builders: one for reading a customer's demographic data and
    one for updating it.  Both hand back mapper objects; running them is left
    to the caller.
    """

    def __init__(
        self,
        logger: logging.Logger,
        core_user: CoreUser,
        hq_credentials: Optional[HqCredentials] = None,
        use_cache=True,
        **kwargs,
    ):
        """Set up the customer number and, optionally, the demographic cache key.

        :param logger: Logger forwarded to the base class.
        :param core_user: User whose ``online_user.customer_primary_cif``
            becomes ``self.cust_number`` (stringified).
        :param hq_credentials: Forwarded to the base class as a keyword.
        :param use_cache: When falsy, ``self.demo_cache_key`` stays ``None``
            and the update path always requeries the core.
        :param kwargs: Forwarded untouched to the base class.
        """
        super().__init__(logger, core_user, hq_credentials=hq_credentials, **kwargs)
        self.cust_number = str(core_user.online_user.customer_primary_cif)

        self.demo_cache_key = None
        if not use_cache:
            return

        # Key is built from the ABA plus the configured user's primary CIF.
        # NOTE(review): ``hq_credentials`` / ``configured_user`` are presumably
        # populated by BaseCore.__init__ -- confirm against the SDK.
        aba = self.hq_credentials.aba
        primary_cif = self.configured_user.customer_primary_cif
        self.demo_cache_key = "UD_CORE_DEMOGRAPHIC_{}_{}".format(aba, primary_cif)

    async def build_demographic_info(self) -> BaseDemographicInfoMapper:
        """Build the mapper that reads demographic info for ``self.cust_number``.

        The mapper wraps a single ``DemographicInfoQuery`` and receives
        ``self.demo_cache_key`` (``None`` when caching is disabled).  It is
        returned without being executed.
        """
        demographic_query = queries.DemographicInfoQuery(
            self.logger, self.cust_number
        )
        return mappers.DemographicInfoMapper(
            [demographic_query],
            self.demo_cache_key,
            hq_credentials=self.hq_credentials,
            zone_context=self.core_user,
        )

    async def build_update_demographic_info(
        self, demographic_info: DemographicInfo
    ) -> BaseUpdateDemographicInfoMapper:
        """Build the mapper that updates demographic info on the core.

        The update query is given the original demographic core response.
        That response is read from the cache when a cache key exists; if there
        is no key or the lookup yields nothing truthy, the demographic mapper
        is built and executed to obtain a fresh one.

        :param demographic_info: The demographic values passed to the update
            query.
        :return: An unexecuted mapper wrapping a single
            ``UpdateDemographicQuery``.
        """
        original_response = (
            cache.get_cache().get(self.demo_cache_key)
            if self.demo_cache_key
            else None
        )

        if not original_response:
            self.logger.debug("Unable to find cached demo data. Requerying core")
            read_mapper = await self.build_demographic_info()
            await read_mapper.execute()
            # The read mapper holds exactly one query (see build_demographic_info).
            original_response = read_mapper.list_of_queries[0].raw_core_response

        update_query = queries.UpdateDemographicQuery(
            self.logger,
            self.cust_number,
            demographic_info,
            original_response,
        )
        return mappers.UpdateDemographicInfoMapper(
            [update_query],
            hq_credentials=self.hq_credentials,
            zone_context=self.core_user,
        )