Source code for q2_cores.Users.core

import logging
from typing import Optional

from q2_sdk.core import cache
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.models.core_user import CoreUser

from q2_cores.Users import mappers, queries


[docs] class Core(BaseCore): def __init__( self, logger: logging.Logger, core_user: CoreUser, hq_credentials: Optional[HqCredentials] = None, use_cache=True, **kwargs, ): super().__init__(logger, core_user, hq_credentials=hq_credentials, **kwargs) self.sub_account_cache_key = None if use_cache: self.sub_account_cache_key = "USERS_SUB_ACCOUNT_{}_{}".format( self.hq_credentials.aba, self.configured_user.customer_primary_cif )
[docs] async def build_demographic_info(self) -> BaseDemographicInfoMapper: query = queries.DemographicInfoQuery( self.logger, self.configured_user.customer_primary_cif ) return mappers.DemographicInfoMapper( [query], hq_credentials=self.hq_credentials, zone_context=self.core_user )
[docs] async def get_sub_account_list(self) -> mappers.GetSubAccountMapper: query = queries.GetSubAccountListQuery( self.logger, self.configured_user.customer_primary_cif ) return mappers.GetSubAccountMapper( [query], self.sub_account_cache_key, hq_credentials=self.hq_credentials, zone_context=self.core_user, )
[docs] async def get_sub_account_details( self, account_id: str, account_type: str, funding_amount: float ) -> mappers.GetSubAccountDetailsMapper: query = queries.GetSubAccountDetailsQuery( self.logger, self.configured_user.customer_primary_cif, account_id, account_type, funding_amount, ) if self.sub_account_cache_key: cache_obj = cache.get_cache() query.raw_core_response = cache_obj.get(self.sub_account_cache_key) if not query.raw_core_response: self.logger.debug( "Unable to find cached demo data. Requerying core for sub account list" ) sub_account_list_mapper = await self.get_sub_account_list() await sub_account_list_mapper.execute() sub_account_list_query = sub_account_list_mapper.list_of_queries[0] query.raw_core_response = sub_account_list_query.raw_core_response return mappers.GetSubAccountDetailsMapper( [query], hq_credentials=self.hq_credentials, zone_context=self.core_user )