Source code for q2_cores.JXchange.core

import logging
from typing import Optional

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.demographic import DemographicInfo
from q2_cores.JXchange import mappers, queries


class Core(BaseCore):
    """Core implementation that builds demographic mappers for JXchange.

    Each ``build_*`` coroutine creates one query object from
    ``q2_cores.JXchange.queries`` and wraps it in the matching mapper from
    ``q2_cores.JXchange.mappers``. The mappers are only constructed and
    returned here; nothing in this class executes them.
    """

    # Name of the configuration file associated with this core
    # (presumably resolved by BaseCore -- confirm against the SDK).
    CONFIG_FILE_NAME = "JXchange_Core"

    # Optional settings and their defaults. The flag below is forwarded to
    # DemographicInfoQuery as its third positional argument; its effect on
    # the generated request is not visible in this file.
    OPTIONAL_CONFIGURATIONS = {
        "CHANGE_ELEMENT_TO_MsgRqHdr": False,
    }

    def __init__(
        self,
        logger: logging.Logger,
        core_user: CoreUser,
        hq_credentials: Optional[HqCredentials] = None,
        **kwargs,
    ):
        """Forward all arguments unchanged to ``BaseCore.__init__``.

        :param logger: Logger handed to the base class.
        :param core_user: User context handed to the base class.
        :param hq_credentials: Optional HQ credentials, passed by keyword.
        :param kwargs: Any extra keyword arguments, passed through as-is.
        """
        super().__init__(
            logger,
            core_user,
            hq_credentials=hq_credentials,
            **kwargs,
        )

    async def build_demographic_info(self) -> mappers.DemographicInfoMapper:
        """Build the mapper used to fetch demographic information.

        Reads ``CHANGE_ELEMENT_TO_MsgRqHdr`` from ``self.config`` (falling
        back to ``False`` when the attribute is absent) and passes it, with
        the logger and the online user's primary CIF, to
        ``DemographicInfoQuery``.

        :return: A ``DemographicInfoMapper`` wrapping that single query.
        """
        # Missing attribute is treated the same as an explicit False.
        use_msg_rq_hdr = getattr(self.config, "CHANGE_ELEMENT_TO_MsgRqHdr", False)

        demographic_query = queries.DemographicInfoQuery(
            self.logger,
            self.core_user.online_user.customer_primary_cif,
            use_msg_rq_hdr,
        )

        return mappers.DemographicInfoMapper(
            [demographic_query],
            hq_credentials=self.hq_credentials,
            zone_context=self.core_user,
        )

    async def build_update_demographic_info(
        self, demographic_info: DemographicInfo
    ) -> mappers.UpdateDemographicInfoMapper:
        """Build the mapper used to update demographic information.

        :param demographic_info: Demographic data handed to
            ``UpdateDemographicInfoQuery`` together with the logger and the
            online user's primary CIF.
        :return: An ``UpdateDemographicInfoMapper`` wrapping that single
            query.
        """
        update_query = queries.UpdateDemographicInfoQuery(
            self.logger,
            self.core_user.online_user.customer_primary_cif,
            demographic_info,
        )

        return mappers.UpdateDemographicInfoMapper(
            [update_query],
            hq_credentials=self.hq_credentials,
            zone_context=self.core_user,
        )