# Source code for q2_cores.SymXchange.core

import logging
import uuid
from typing import Optional
from lxml.builder import E

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_cores.SymXchange.mappers import SymXchangeBaseMapper
from q2_cores.SymXchange import queries
from q2_cores.SymXchange import mappers
from q2_cores.SymXchange.models.credentials import CardCredentials
from q2_cores.SymXchange.models.device_information import DeviceInformation


class Core(BaseCore):
    """Core adapter that assembles SymXchange queries and wraps them in mappers.

    Each ``build_*`` / ``get_*`` coroutine constructs a single query object
    from ``q2_cores.SymXchange.queries`` and returns a mapper holding that
    query together with this core's HQ credentials and core user.
    """

    # NOTE(review): presumably the name of the configuration file BaseCore
    # loads into ``self.config`` -- confirm against BaseCore.
    CONFIG_FILE_NAME = "Symitar_Core_SymXchange"
    # Configuration keys read from ``self.config`` by the methods below.
    # NOTE(review): the values look like defaults -- confirm against BaseCore.
    REQUIRED_CONFIGURATIONS = {
        "CARD_PREFIX": "1",
        "DEVICE_TYPE": "Q2",
        "UNIT_NUMBER": "1",
    }

    def __init__(
        self,
        logger: logging.Logger,
        core_user: CoreUser,
        hq_credentials: Optional[HqCredentials] = None,
        **kwargs,
    ) -> None:
        """Forward all arguments unchanged to ``BaseCore.__init__``.

        :param logger: Logger handed to the base class.
        :param core_user: Core user handed to the base class.
        :param hq_credentials: Optional HQ credentials handed to the base class.
        :param kwargs: Any additional keyword arguments, passed through as-is.
        """
        super().__init__(logger, core_user, hq_credentials, **kwargs)

    async def build_poweron_query(
        self,
        account_number: str,
        rg_name: str,
        rg_state: str,
        rg_usrchr_params: list,
        rg_usrnum_params: list,
        mock_response_string: Optional[str] = None,
        mock_failure: bool = False,
    ) -> SymXchangeBaseMapper:
        """Build a mapper wrapping a single ``PoweronQuery``.

        Card credentials are built from the configured ``CARD_PREFIX`` and
        ``account_number``; device information is built from the configured
        ``DEVICE_TYPE`` and ``UNIT_NUMBER``.

        :param account_number: Account number used for the card credentials.
        :param rg_name: Forwarded positionally to ``PoweronQuery``.
        :param rg_state: Forwarded positionally to ``PoweronQuery``.
        :param rg_usrchr_params: Forwarded positionally to ``PoweronQuery``.
        :param rg_usrnum_params: Forwarded positionally to ``PoweronQuery``.
        :param mock_response_string: Forwarded to ``PoweronQuery`` by keyword
            (presumably a canned response for testing -- confirm there).
        :param mock_failure: Forwarded to ``PoweronQuery`` by keyword.
        :return: ``SymXchangeBaseMapper`` holding the one query, this core's
            ``hq_credentials`` and ``core_user`` (as ``zone_context``).
        """
        card_credentials = CardCredentials(self.config.CARD_PREFIX, account_number)
        device_info = DeviceInformation(
            self.config.DEVICE_TYPE, self.config.UNIT_NUMBER
        )
        query = queries.PoweronQuery(
            self.logger,
            rg_name,
            rg_state,
            rg_usrchr_params,
            rg_usrnum_params,
            card_credentials,
            device_info,
            mock_response_string=mock_response_string,
            mock_failure=mock_failure,
        )
        return SymXchangeBaseMapper(
            [query], hq_credentials=self.hq_credentials, zone_context=self.core_user
        )

    async def build_account_query(self) -> SymXchangeBaseMapper:
        """Build a mapper wrapping a single ``AccountQuery``.

        The account number is ``self.configured_user.customer_primary_cif``.
        The select-fields request asks for all name fields and is tagged with
        a freshly generated UUID4 ``MessageId``.

        :return: ``SymXchangeBaseMapper`` holding the one query, this core's
            ``hq_credentials`` and ``core_user`` (as ``zone_context``).
        """
        acct = queries.AccountQuery.create_request()
        # Request -> SelectableFields -> NameSelectableFields ->
        # IncludeAllNameFields("true"); MessageId is set on the Request element.
        get_account_select_fields = acct.getAccountSelectFields(
            E.Request(
                E.SelectableFields(
                    E.NameSelectableFields(E.IncludeAllNameFields("true"))
                ),
                MessageId=str(uuid.uuid4()),
            )
        )
        account_number = self.configured_user.customer_primary_cif
        query = queries.AccountQuery(
            self.logger,
            account_number,
            CardCredentials(self.config.CARD_PREFIX, account_number),
            DeviceInformation(self.config.DEVICE_TYPE, self.config.UNIT_NUMBER),
            get_account_select_fields,
        )
        return SymXchangeBaseMapper(
            [query], hq_credentials=self.hq_credentials, zone_context=self.core_user
        )

    async def get_account_demographic(self) -> mappers.AccountDemographicMapper:
        """Build a mapper wrapping a single ``AccountDemographicQuery``.

        The query receives the logger, the configured user's
        ``customer_primary_cif``, and the configured ``UNIT_NUMBER``,
        ``DEVICE_TYPE`` and ``CARD_PREFIX`` (in that positional order).

        :return: ``AccountDemographicMapper`` holding the one query, this
            core's ``hq_credentials`` and ``core_user`` (as ``zone_context``).
        """
        query = queries.AccountDemographicQuery(
            self.logger,
            self.configured_user.customer_primary_cif,
            self.config.UNIT_NUMBER,
            self.config.DEVICE_TYPE,
            self.config.CARD_PREFIX,
        )
        return mappers.AccountDemographicMapper(
            [query], hq_credentials=self.hq_credentials, zone_context=self.core_user
        )