Source code for q2_sdk.core.http_handlers.domestic_wire_handler

import asyncio
from copy import deepcopy

from q2_sdk.core.http_handlers.adapter_handler import Q2AdapterRequestHandler
from q2_sdk.hq.models.host_account_request import HostAccountRequest
from q2_sdk.hq.models.domestic_wire_request import DomesticWireRequest


class Q2DomesticWireRequestHandler(Q2AdapterRequestHandler):
    """Adapter type for Domestic wires invoked by commercial users through UUX"""

    # Chunk size used when the vendor config has no usable ``processing_chunk_size``.
    _DEFAULT_CHUNK_SIZE = 10

    async def handle_adapter(self, message: dict, *args, **kwargs) -> dict:
        """
        Dispatch every wire in ``message`` to :meth:`handle_domestic_wire`.

        ``message["WireDomestic_Req"]`` and ``message["HostAccount_Req"]`` are read
        pairwise (entry *i* of one belongs to entry *i* of the other). Wires are
        awaited concurrently in chunks; the chunk size comes from the
        ``processing_chunk_size`` config key and defaults to 10.

        The returned dict is a deep copy of ``message`` in which each
        ``HostAccount_Req`` entry has ``HostErrorCode`` set to ``0`` when the
        corresponding ``handle_domestic_wire`` call returned a truthy value and
        to ``4`` otherwise.

        :param message: Incoming request containing the ``WireDomestic_Req`` and
            ``HostAccount_Req`` sequences
        :return: Deep copy of ``message`` with ``HostErrorCode`` filled in
        :raises KeyError: If either expected key is missing from ``message``
        :raises ValueError: If the two sequences differ in length
        :raises Exception: If at least one ``handle_domestic_wire`` call raised.
            Raised only after every chunk has been processed, and chained to the
            first underlying failure.
        """
        response_as_dict = deepcopy(message)

        try:
            wire_reqs = message["WireDomestic_Req"]
            account_reqs = message["HostAccount_Req"]
        except KeyError:
            self.logger.error("Invalid wire request shape from Online")
            raise

        if len(wire_reqs) != len(account_reqs):
            raise ValueError(
                f"WireDomestic_Req / HostAccount_Req length mismatch: "
                f"{len(wire_reqs)} vs {len(account_reqs)}"
            )

        items = [
            (DomesticWireRequest(wire_req), HostAccountRequest(account_req))
            for wire_req, account_req in zip(wire_reqs, account_reqs)
        ]

        # ``processing_chunk_size`` may be added to a vendor config to control how
        # many requests are sent through the adapter to the processor at the same
        # time. Because of the default, implementers only need to set it when they
        # have a reason to.
        chunk_size = int(
            self.db_config.get("processing_chunk_size", self._DEFAULT_CHUNK_SIZE)
        )
        if chunk_size < 1:
            # A zero step makes range() raise and a negative step makes it empty,
            # which would mark every wire as failed without ever attempting it.
            self.logger.warning(
                "Invalid processing_chunk_size %s; falling back to %s",
                chunk_size,
                self._DEFAULT_CHUNK_SIZE,
            )
            chunk_size = self._DEFAULT_CHUNK_SIZE

        # gather() returns results in argument order, so appending chunk by chunk
        # keeps ``results[i]`` aligned with ``items[i]``.
        results = []
        for chunk in self._chunks(items, chunk_size):
            chunk_results = await asyncio.gather(
                *(self.handle_domestic_wire(wire, account) for wire, account in chunk),
                return_exceptions=True,
            )
            results.extend(chunk_results)

        errors = []
        response_accounts = response_as_dict["HostAccount_Req"]
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                response_accounts[i]["HostErrorCode"] = 4
                self.logger.error("Domestic Wire transfer failed. index: %s", i)
                errors.append(result)
            elif result:
                response_accounts[i]["HostErrorCode"] = 0
                self.logger.info("Domestic Wire transfer is successful. index: %s", i)
            else:
                response_accounts[i]["HostErrorCode"] = 4
                self.logger.error("Domestic Wire transfer failed. index: %s", i)

        if errors:
            # Chain to the first failure so its traceback is not lost.
            raise Exception(
                f"{len(errors)} wire(s) failed: " + "; ".join(str(e) for e in errors)
            ) from errors[0]

        return response_as_dict

    @staticmethod
    def _chunks(lst, size):
        """
        Yield consecutive slices of ``lst`` holding at most ``size`` items each.

        :param lst: Sequence to split; must support ``len`` and slicing
        :param size: Maximum number of items per slice; must be at least 1
        :raises ValueError: If ``size`` is smaller than 1 (raised when iteration starts)
        """
        if size < 1:
            raise ValueError(f"chunk size must be a positive integer, got {size}")
        for start in range(0, len(lst), size):
            yield lst[start : start + size]

    async def handle_domestic_wire(
        self, domestic_wire: DomesticWireRequest, source_account: HostAccountRequest
    ) -> bool:
        """
        Perform your domestic wire transfer here. If you return True, a transaction
        will be generated and visible throughout the q2 platform-- only return True
        on a success.

        :param domestic_wire: Wire destination and details submitted by user
        :param source_account: The account from which to source the transfer
        :return: True if the transaction should be generated. False if not.
        :raises NotImplementedError: Always, until a subclass overrides this method
        """
        raise NotImplementedError