Source code for q2_sdk.core.http_handlers.domestic_wire_handler

from copy import deepcopy
from q2_sdk.core.http_handlers.adapter_handler import Q2AdapterRequestHandler
from q2_sdk.hq.models.host_account_request import HostAccountRequest
from q2_sdk.hq.models.domestic_wire_request import DomesticWireRequest


class Q2DomesticWireRequestHandler(Q2AdapterRequestHandler):
    """Adapter handler for domestic wires invoked by commercial users through UUX.

    ``handle_adapter`` unpacks the incoming message and delegates the actual
    transfer to ``handle_domestic_wire``, which raises ``NotImplementedError``
    in this base class and is meant to be overridden.
    """

    async def handle_adapter(self, message: dict, *args, **kwargs) -> dict:
        """
        Build the wire and source-account request objects from ``message``,
        run ``handle_domestic_wire`` on them, and report the outcome.

        :param message: Adapter payload; the first element of its
            ``WireDomestic_Req`` and ``HostAccount_Req`` entries is used
        :return: A deep copy of ``message`` whose first ``HostAccount_Req``
            element has ``HostErrorCode`` set to 0 when
            ``handle_domestic_wire`` returned a truthy value, 4 otherwise
        :raises KeyError: Raised while extracting or building either request
            object; logged and re-raised
        :raises IndexError: Raised while extracting or building either request
            object; logged and re-raised
        """
        # The copy is taken first, before the request objects are built from
        # the original message.
        reply = deepcopy(message)

        try:
            wire_payload = message["WireDomestic_Req"][0]
            wire_request = DomesticWireRequest(wire_payload)
        except (KeyError, IndexError) as exc:
            self.logger.error("Invalid wire request shape from Online")
            raise exc

        try:
            account_payload = message["HostAccount_Req"][0]
            account_request = HostAccountRequest(account_payload)
        except (KeyError, IndexError) as exc:
            self.logger.error("Invalid source account shape from Online")
            raise exc

        accepted = await self.handle_domestic_wire(wire_request, account_request)

        if accepted:
            host_error_code = 0
        else:
            host_error_code = 4
        reply["HostAccount_Req"][0]["HostErrorCode"] = host_error_code
        return reply

    async def handle_domestic_wire(
        self, domestic_wire: DomesticWireRequest, source_account: HostAccountRequest
    ) -> bool:
        """
        Perform your domestic wire transfer here.

        Return ``True`` only on success: a ``True`` result means a transaction
        will be generated and visible throughout the Q2 platform.

        :param domestic_wire: Wire destination and details submitted by the user
        :param source_account: The account from which to source the transfer
        :return: True if the transaction should be generated. False if not.
        :raises NotImplementedError: Always, in this base implementation
        """
        raise NotImplementedError