from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.demographic import DemographicInfo
from . import mappers
from .queries import (
DemographicInfoQuery,
UpdateDemographicInfoQuery,
AccountDetailsQuery,
)
from .queries.base_query import LookupType
from ..exceptions import CoreException
[docs]
class Core(BaseCore):
CONFIG_FILE_NAME = "CoreAPI_Core"
OPTIONAL_CONFIGURATIONS = {
"PREFER_USER_LEVEL_CIF": False,
"GET_DEMOGRAPHIC_BY_CIF": True,
"get_cif_from_user_fields": {},
}
@property
def cif(self):
if getattr(self.config, "PREFER_USER_LEVEL_CIF", False):
cif = self.configured_user.user_primary_cif
else:
cif = self.configured_user.customer_primary_cif
return cif.lstrip(
"P"
) # Sometimes Q2 imports the user with a leading P, which isn't what matches the core
[docs]
async def build_demographic_info(self) -> mappers.DemographicInfoMapper:
lookup_type = LookupType.SOCIAL_SECURITY_NUMBER
lookup_key = self.configured_user.ssn
if getattr(self.config, "GET_DEMOGRAPHIC_BY_CIF", True):
lookup_type = LookupType.MEMBER_NUMBER
lookup_key = self.configured_user.customer_primary_cif
query = DemographicInfoQuery(self.logger, lookup_key, lookup_type, cif=self.cif)
return mappers.DemographicInfoMapper(
[query],
getattr(self.config, "get_cif_from_user_fields", None),
hq_credentials=self.hq_credentials,
zone_context=self.core_user,
)
[docs]
async def build_update_demographic_info(
self, demographic_info: DemographicInfo
) -> mappers.UpdateDemographicInfoMapper:
query = UpdateDemographicInfoQuery(self.logger, demographic_info)
return mappers.UpdateDemographicInfoMapper(
[query], hq_credentials=self.hq_credentials, zone_context=self.core_user
)
[docs]
async def build_get_account_details(
self, account_number
) -> mappers.CoreAPIBaseMapper:
query = AccountDetailsQuery(self.logger, account_number, cif=self.cif)
return mappers.CoreAPIBaseMapper(
[query], hq_credentials=self.hq_credentials, zone_context=self.core_user
)
[docs]
@staticmethod
def check_for_errors(response_obj):
if response_obj.find(".//faultcode"):
fault_code = response_obj.find(".//faultcode").text
raise CoreException(f"Core error: {fault_code}")
if not response_obj.find(".//UserAuthentication/WasSuccessful"):
core_error = response_obj.find(
".//UserAuthentication/Errors/Error/ErrorMessage"
).text
raise CoreException(core_error)
if not response_obj.find(".//ResponseBase/WasSuccessful"):
core_error = response_obj.find(
".//ResponseBase/Errors/Error/ErrorMessage"
).text
raise CoreException(core_error)