import asyncio
from copy import deepcopy
from q2_sdk.core.http_handlers.adapter_handler import Q2AdapterRequestHandler
from q2_sdk.hq.models.host_account_request import HostAccountRequest
from q2_sdk.hq.models.international_wire_request import InternationalWireRequest
[docs]
class Q2InternationalWireRequestHandler(Q2AdapterRequestHandler):
"""Adapter type for International wires invoked by commercial users through UUX"""
async def handle_adapter(self, message: dict, *args, **kwargs) -> dict:
response_as_dict = deepcopy(message)
try:
wire_reqs = message["WireInternational_Req"]
account_reqs = message["HostAccount_Req"]
except KeyError:
self.logger.error("Invalid wire request shape from Online")
raise
if len(wire_reqs) != len(account_reqs):
raise ValueError(
f"WireInternational_Req / HostAccount_Req length mismatch: "
f"{len(wire_reqs)} vs {len(account_reqs)}"
)
items = [
(
i,
InternationalWireRequest(wire_reqs[i]),
HostAccountRequest(account_reqs[i]),
)
for i in range(len(wire_reqs))
]
# this can be added to a vendor Config to control how many requests are sent through the adapter to the
# processor at "the same time". because of the default implementer does not have to set it or worry about it...
# unless they need to. config key = processing_chunk_size
chunk_size = int(self.db_config.get("processing_chunk_size", 10))
results = [None] * len(items)
for chunk in self._chunks(items, chunk_size):
chunk_results = await asyncio.gather(
*[
self.handle_international_wire(wire, account)
for _, wire, account in chunk
],
return_exceptions=True,
)
for (i, _, _), result in zip(chunk, chunk_results):
results[i] = result
errors = []
for i, result in enumerate(results):
if isinstance(result, BaseException):
response_as_dict["HostAccount_Req"][i]["HostErrorCode"] = 4
self.logger.error("International Wire transfer failed. index: %s", i)
errors.append(result)
elif result:
response_as_dict["HostAccount_Req"][i]["HostErrorCode"] = 0
self.logger.info(
"International Wire transfer is successful. index: %s", i
)
else:
response_as_dict["HostAccount_Req"][i]["HostErrorCode"] = 4
self.logger.info("International Wire transfer failed. index: %s", i)
if errors:
raise Exception(
f"{len(errors)} wire(s) failed: " + "; ".join(str(e) for e in errors)
)
return response_as_dict
@staticmethod
def _chunks(lst, size):
for i in range(0, len(lst), size):
yield lst[i : i + size]
[docs]
async def handle_international_wire(
self,
international_wire: InternationalWireRequest,
source_account: HostAccountRequest,
) -> bool:
"""
Perform your international wire transfer here. If you return true, a transaction will be generated and visible
throughout the q2 platform-- only return 'true' on a success.
:param international_wire: Wire destination and details submitted by user
:param source_account: The account from which to source the transfer
:return: True if the transaction should be generated. False if not.
"""
raise NotImplementedError