import logging
import uuid
from typing import Optional
from lxml.builder import E
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_cores.SymXchange.mappers import SymXchangeBaseMapper
from q2_cores.SymXchange import queries
from q2_cores.SymXchange import mappers
from q2_cores.SymXchange.models.credentials import CardCredentials
from q2_cores.SymXchange.models.device_information import DeviceInformation
class Core(BaseCore):
    """SymXchange core that builds query/mapper pairs for a core user.

    Every public method constructs one ``queries.*`` object from the
    ``CARD_PREFIX``, ``DEVICE_TYPE`` and ``UNIT_NUMBER`` configuration values
    and returns it wrapped in a mapper together with ``self.hq_credentials``
    and ``self.core_user`` (passed as ``zone_context``).

    NOTE(review): ``self.logger``, ``self.config``, ``self.core_user``,
    ``self.hq_credentials`` and ``self.configured_user`` are not assigned in
    this class; they are presumably provided by ``BaseCore`` -- confirm
    against q2_sdk.
    """

    # NOTE(review): presumably the name BaseCore uses to locate the
    # configuration exposed as ``self.config`` -- confirm against q2_sdk.
    CONFIG_FILE_NAME = "Symitar_Core_SymXchange"

    # Configuration keys read by the methods below, with the values declared
    # for them here. NOTE(review): presumably treated by BaseCore as
    # defaults / required entries -- confirm against q2_sdk.
    REQUIRED_CONFIGURATIONS = {
        "CARD_PREFIX": "1",
        "DEVICE_TYPE": "Q2",
        "UNIT_NUMBER": "1",
    }

    def __init__(
        self,
        logger: logging.Logger,
        core_user: CoreUser,
        hq_credentials: Optional[HqCredentials] = None,
        **kwargs,
    ):
        """Forward all arguments unchanged to ``BaseCore.__init__``.

        :param logger: Logger handed to the base class.
        :param core_user: Core user handed to the base class.
        :param hq_credentials: Optional HQ credentials handed to the base
            class.
        :param kwargs: Extra keyword arguments passed through as-is.
        """
        super().__init__(logger, core_user, hq_credentials, **kwargs)

    async def build_poweron_query(
        self,
        account_number: str,
        rg_name: str,
        rg_state: str,
        rg_usrchr_params: list,
        rg_usrnum_params: list,
        mock_response_string: Optional[str] = None,
        mock_failure: bool = False,
    ) -> SymXchangeBaseMapper:
        """Build a ``PoweronQuery`` and wrap it in a ``SymXchangeBaseMapper``.

        :param account_number: Account number paired with the configured
            ``CARD_PREFIX`` to build the card credentials.
        :param rg_name: Forwarded positionally to ``queries.PoweronQuery``.
        :param rg_state: Forwarded positionally to ``queries.PoweronQuery``.
        :param rg_usrchr_params: Forwarded positionally to
            ``queries.PoweronQuery``.
        :param rg_usrnum_params: Forwarded positionally to
            ``queries.PoweronQuery``.
        :param mock_response_string: Forwarded to ``queries.PoweronQuery``
            under the same keyword; ``None`` when not supplied.
        :param mock_failure: Forwarded to ``queries.PoweronQuery`` under the
            same keyword.
        :return: A mapper holding the single query that was built.
        """
        card_credentials = CardCredentials(self.config.CARD_PREFIX, account_number)
        device_info = DeviceInformation(
            self.config.DEVICE_TYPE, self.config.UNIT_NUMBER
        )
        query = queries.PoweronQuery(
            self.logger,
            rg_name,
            rg_state,
            rg_usrchr_params,
            rg_usrnum_params,
            card_credentials,
            device_info,
            mock_response_string=mock_response_string,
            mock_failure=mock_failure,
        )
        return SymXchangeBaseMapper(
            [query], hq_credentials=self.hq_credentials, zone_context=self.core_user
        )

    async def build_account_query(self) -> SymXchangeBaseMapper:
        """Build an ``AccountQuery`` for the configured user's primary CIF.

        The request asks for all name fields (``IncludeAllNameFields`` set to
        ``"true"``) and carries a freshly generated UUID as its ``MessageId``.

        :return: A mapper holding the single query that was built.
        """
        acct = queries.AccountQuery.create_request()
        get_account_select_fields = acct.getAccountSelectFields(
            E.Request(
                E.SelectableFields(
                    E.NameSelectableFields(E.IncludeAllNameFields("true"))
                ),
                # A new random id is generated on every call.
                MessageId=str(uuid.uuid4()),
            )
        )
        # The primary CIF is used both as the account number of the query
        # and as the second argument of the card credentials.
        account_number = self.configured_user.customer_primary_cif
        query = queries.AccountQuery(
            self.logger,
            account_number,
            CardCredentials(self.config.CARD_PREFIX, account_number),
            DeviceInformation(self.config.DEVICE_TYPE, self.config.UNIT_NUMBER),
            get_account_select_fields,
        )
        return SymXchangeBaseMapper(
            [query], hq_credentials=self.hq_credentials, zone_context=self.core_user
        )

    async def get_account_demographic(self) -> mappers.AccountDemographicMapper:
        """Build an ``AccountDemographicQuery`` for the configured user.

        :return: An ``AccountDemographicMapper`` holding the single query
            that was built.
        """
        # Unlike the other builders, this query receives the raw
        # configuration values rather than CardCredentials/DeviceInformation
        # objects, in the order unit number, device type, card prefix.
        query = queries.AccountDemographicQuery(
            self.logger,
            self.configured_user.customer_primary_cif,
            self.config.UNIT_NUMBER,
            self.config.DEVICE_TYPE,
            self.config.CARD_PREFIX,
        )
        return mappers.AccountDemographicMapper(
            [query], hq_credentials=self.hq_credentials, zone_context=self.core_user
        )