Source code for q2_cores.Symitar.queries.open_account_query

import logging

from q2_cores.Symitar.queries import mock_responses
from q2_cores.Symitar.queries.base_query import SymitarBaseQuery


class GetSubAccountListQuery(SymitarBaseQuery):
    """Query that lists every share available to be opened.

    The request is issued against ``"Q2.SUBACCOUNT.GEN"`` with the
    ``"GETDATA"`` selector; both are forwarded positionally to
    ``SymitarBaseQuery`` right after the caller-supplied session values.
    """

    def __init__(
        self,
        logger: logging.Logger,
        customer_id,
        unit_number,
        device_type,
        card_prefix,
        guid=None,
        use_symxchange=False,
    ):
        # Caller-supplied values are forwarded untouched, in the same order.
        session_args = (logger, customer_id, unit_number, device_type, card_prefix)
        optional_args = {"guid": guid, "use_symxchange": use_symxchange}
        super().__init__(
            *session_args,
            "Q2.SUBACCOUNT.GEN",
            "GETDATA",
            **optional_args,
        )

    def mock_response(self):
        """Return the canned sub-account list response from ``mock_responses``."""
        canned = mock_responses.mock_get_sub_accounts_response()
        return canned
class GetSubAccountDetailsQuery(SymitarBaseQuery):
    """Returns rates and other details from core based on the selected funding amount.

    ``rg_params`` must contain:

    * ``"funding_amount"`` -- a ``float``. It is rendered with two decimal
      places and sent with the decimal point removed, e.g. ``125.5`` is sent
      as ``"12550"`` under the ``JRGUSERNUM3`` key.
    * ``"share_type"`` -- sent as ``str(share_type)`` under ``JRGUSERNUM2``.

    Raises:
        KeyError: if ``rg_params`` lacks ``"funding_amount"`` or
            ``"share_type"``.
        AssertionError: if ``rg_params["funding_amount"]`` is not a ``float``.
    """

    def __init__(
        self,
        logger: logging.Logger,
        customer_id,
        unit_number,
        device_type,
        card_prefix,
        rg_params,
        guid=None,
        use_symxchange=False,
    ):
        raw_amount = rg_params["funding_amount"]
        # Raised explicitly instead of via an ``assert`` statement: asserts are
        # removed when Python runs with -O, which would silently disable this
        # check. AssertionError is kept so existing callers see the same
        # exception type and message as before.
        if not isinstance(raw_amount, float):
            raise AssertionError("Expecting funding amount to be a float")

        funding_amount = "{:.2f}".format(raw_amount)
        clean_rg_params = {
            "JRGUSERNUM2": str(rg_params["share_type"]),
            # Two-decimal rendering with the decimal point dropped.
            "JRGUSERNUM3": funding_amount.replace(".", ""),
        }
        super().__init__(
            logger,
            customer_id,
            unit_number,
            device_type,
            card_prefix,
            "Q2.SUBACCOUNT.GEN",
            "GETDETAILS",
            rg_params=clean_rg_params,
            guid=guid,
            use_symxchange=use_symxchange,
        )

    def mock_response(self):
        """Return the canned sub-account details response from ``mock_responses``."""
        return mock_responses.mock_get_sub_accounts_details_response()
class OpenAccountQuery(SymitarBaseQuery):
    """Opens new share on core.

    ``rg_params`` is cleaned into string attributes that are handed to
    ``SymitarBaseQuery`` as ``data_attrs``:

    * ``"AMOUNT"`` (required, ``float``) -- rendered with two decimal places.
    * ``"SHARETYPE"`` and ``"STARTID"`` (required) -- converted with ``str``.
    * ``"WITHHOLDINGS"`` (optional) -- included as ``str(value)`` only when
      present and truthy.
    * ``"JOINTSIGNERSLOCS"`` (optional) -- when present and truthy, converted
      with ``str`` and left-padded with zeros to exactly 10 characters.

    Raises:
        KeyError: if ``"AMOUNT"``, ``"SHARETYPE"`` or ``"STARTID"`` is missing.
        AssertionError: if ``rg_params["AMOUNT"]`` is not a ``float``, or if
            the joint signer locator is longer than 10 characters.
    """

    def __init__(
        self,
        logger: logging.Logger,
        customer_id,
        unit_number,
        device_type,
        card_prefix,
        data_file_name,
        rg_params,
        guid=None,
    ):
        raw_amount = rg_params["AMOUNT"]
        # Raised explicitly instead of via ``assert`` statements: asserts are
        # removed when Python runs with -O, which would silently disable the
        # validation below. AssertionError is kept so existing callers see the
        # same exception types and messages as before.
        if not isinstance(raw_amount, float):
            raise AssertionError("Expecting funding amount to be a float")

        clean_rg_params = {
            "AMOUNT": "{:.2f}".format(raw_amount),
            "SHARETYPE": str(rg_params["SHARETYPE"]),
            "STARTID": str(rg_params["STARTID"]),
        }

        withholdings = rg_params.get("WITHHOLDINGS")
        if withholdings:
            clean_rg_params["WITHHOLDINGS"] = str(withholdings)

        joint_signers = rg_params.get("JOINTSIGNERSLOCS")
        if joint_signers:
            joint_signer_loc = str(joint_signers).zfill(10)
            # zfill only pads, so anything longer than 10 characters is still
            # too long here and must be rejected.
            if len(joint_signer_loc) != 10:
                raise AssertionError("Invalid signer locator value")
            clean_rg_params["JOINTSIGNERSLOCS"] = joint_signer_loc

        super().__init__(
            logger,
            customer_id,
            unit_number,
            device_type,
            card_prefix,
            "Q2.SUBACCOUNT.GEN",
            "CREATESHARE",
            data_file_name=data_file_name,
            data_attrs=clean_rg_params,
            guid=guid,
        )

    def mock_response(self):
        """Return the canned open-sub-account response from ``mock_responses``."""
        return mock_responses.mock_open_sub_accounts_response()