Source code for q2_cores.CoreAPI.core

from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.demographic import DemographicInfo

from . import mappers
from .queries import (
    DemographicInfoQuery,
    UpdateDemographicInfoQuery,
    AccountDetailsQuery,
)
from .queries.base_query import LookupType
from ..exceptions import CoreException


[docs] class Core(BaseCore): CONFIG_FILE_NAME = "CoreAPI_Core" OPTIONAL_CONFIGURATIONS = { "PREFER_USER_LEVEL_CIF": False, "GET_DEMOGRAPHIC_BY_CIF": True, "get_cif_from_user_fields": {}, } @property def cif(self): if getattr(self.config, "PREFER_USER_LEVEL_CIF", False): cif = self.configured_user.user_primary_cif else: cif = self.configured_user.customer_primary_cif return cif.lstrip( "P" ) # Sometimes Q2 imports the user with a leading P, which isn't what matches the core
[docs] async def build_demographic_info(self) -> mappers.DemographicInfoMapper: lookup_type = LookupType.SOCIAL_SECURITY_NUMBER lookup_key = self.configured_user.ssn if getattr(self.config, "GET_DEMOGRAPHIC_BY_CIF", True): lookup_type = LookupType.MEMBER_NUMBER lookup_key = self.configured_user.customer_primary_cif query = DemographicInfoQuery(self.logger, lookup_key, lookup_type, cif=self.cif) return mappers.DemographicInfoMapper( [query], getattr(self.config, "get_cif_from_user_fields", None), hq_credentials=self.hq_credentials, zone_context=self.core_user, )
[docs] async def build_update_demographic_info( self, demographic_info: DemographicInfo ) -> mappers.UpdateDemographicInfoMapper: query = UpdateDemographicInfoQuery(self.logger, demographic_info) return mappers.UpdateDemographicInfoMapper( [query], hq_credentials=self.hq_credentials, zone_context=self.core_user )
[docs] async def build_get_account_details( self, account_number ) -> mappers.CoreAPIBaseMapper: query = AccountDetailsQuery(self.logger, account_number, cif=self.cif) return mappers.CoreAPIBaseMapper( [query], hq_credentials=self.hq_credentials, zone_context=self.core_user )
[docs] @staticmethod def check_for_errors(response_obj): if response_obj.find(".//faultcode"): fault_code = response_obj.find(".//faultcode").text raise CoreException(f"Core error: {fault_code}") if not response_obj.find(".//UserAuthentication/WasSuccessful"): core_error = response_obj.find( ".//UserAuthentication/Errors/Error/ErrorMessage" ).text raise CoreException(core_error) if not response_obj.find(".//ResponseBase/WasSuccessful"): core_error = response_obj.find( ".//ResponseBase/Errors/Error/ErrorMessage" ).text raise CoreException(core_error)