# Source code for q2_sdk.core.http_handlers.fx_rate_handler

from copy import deepcopy
import base64
from decimal import Decimal
from enum import Enum
from lxml import objectify, etree
from q2_sdk.core.http_handlers.adapter_handler import Q2AdapterRequestHandler
from q2_sdk.models.adapters.fx_rates import (
    FxRateQuoteReply,
    FxRateQuoteReplyUSDEquivalent,
    FxRateSheetsReply,
    FxRateExchangeDealReply,
    FxRateExchangeDealReplyUSDEquivalent,
)


class RequestType(Enum):
    """Kinds of foreign-exchange request, keyed by their wire name.

    Every value is a request name; the ``...USDEquivalent`` members carry the
    same name with ``"USDEquivalent"`` appended, which is exactly what
    :meth:`from_request` builds when asked for the USD-equivalent variant.
    """

    Quote = "ForeignExchangeQuote"
    QuoteUSDEquivalent = "ForeignExchangeQuoteUSDEquivalent"
    Book = "ForeignExchangeBook"
    BookUSDEquivalent = "ForeignExchangeBookUSDEquivalent"
    Cancel = "ForeignExchangeCancel"
    # Historical misspelling. It is declared first so it remains the canonical
    # member: ``.name``, iteration and existing references are unchanged.
    CancelUSDEqivalent = "ForeignExchangeCancelUSDEquivalent"
    # Correctly spelled alias; sharing the value makes it the same member.
    CancelUSDEquivalent = "ForeignExchangeCancelUSDEquivalent"
    RateSheets = "FxRateSheets"
    RateSheetsUSDEquivalent = "FxRateSheetsUSDEquivalent"

    @classmethod
    def from_request(cls, request, is_usd_equivalent):
        """Return the member for ``request``.

        :param request: Base request name, e.g. ``"ForeignExchangeQuote"``
        :param is_usd_equivalent: When truthy, ``"USDEquivalent"`` is appended
            to ``request`` before the lookup
        :return: The matching ``RequestType`` member
        :raises ValueError: If the resulting name is not a known request type
        """
        wire_name = f"{request}USDEquivalent" if is_usd_equivalent else request
        return cls(wire_name)


class Q2FxRateRequestHandler(Q2AdapterRequestHandler):
    """
    Adapter type for Foreign Currency Exchange (FOREX) rate calculation.
    Invoked when constructing an International Wire.
    """

    # RequestType -> name of the coroutine that services it. The method is
    # resolved by name on ``self`` at call time so subclass overrides are used.
    _HANDLER_NAMES = {
        RequestType.Quote: "get_fx_quote",
        RequestType.QuoteUSDEquivalent: "get_fx_quote_usd_equivalent",
        RequestType.Book: "book_fx_quote",
        RequestType.BookUSDEquivalent: "book_fx_quote_usd_equivalent",
        RequestType.Cancel: "cancel_fx_quote",
        RequestType.CancelUSDEqivalent: "cancel_fx_quote_usd_equivalent",
        RequestType.RateSheets: "get_fx_rate_sheet",
        RequestType.RateSheetsUSDEquivalent: "get_fx_rate_sheet_usd_equivalent",
    }

    # (attribute on the reply object, XML tag in the response), in the order
    # the tags are written. A tag is emitted only if the reply has the attribute.
    _OPTIONAL_REPLY_FIELDS = (
        ("usd_equivalent_amount", "UsdEquivalentAmount"),
        ("quote_id", "QuoteId"),
        ("base_rate", "BaseRate"),
        ("exchange_rate", "ExchangeRate"),
        ("expiration_date", "ExpirationDate"),
        ("top_tier", "TopTier"),
        ("value_date", "ValueDate"),
        ("deal_id", "DealId"),
        ("amount", "Amount"),
    )

    async def handle_adapter(self, message: dict, *args, **kwargs) -> dict:
        """
        Decode the FX request carried by ``message``, dispatch it and return
        the reply.

        The base64 ``XmlPayload`` of ``message["RequestorData"][0]`` is parsed
        for ``CurrencyCode``, ``GroupId``, the root ``request`` attribute and
        either ``Amount`` or, when that element is absent,
        ``UsdEquivalentAmount`` (which selects the ``...USDEquivalent``
        request variant). The matching handler method is awaited and its reply
        is serialized as a ``Q2Bridge`` document with ``Status`` 0. If the
        handler or the serialization raises, the error is logged and a
        ``Q2Bridge`` document containing only ``Status`` 1 is sent instead.

        Errors while decoding or parsing the request itself (including an
        unknown ``request`` name) are not caught here and propagate.

        :param message: Incoming adapter message
        :return: Deep copy of ``message`` whose first ``RequestorData`` entry
            has ``XmlPayload`` replaced by the base64-encoded reply document
        """
        response_as_dict = deepcopy(message)

        xml_payload = base64.b64decode(message["RequestorData"][0]["XmlPayload"])
        element = objectify.fromstring(xml_payload)
        currency_code = element.CurrencyCode.pyval

        amount_usd_equivalent = False
        try:
            amount_node = element.Amount
        except AttributeError:
            # No <Amount>: the request states the amount in USD instead.
            amount_node = element.UsdEquivalentAmount
            amount_usd_equivalent = True
        # Build the Decimal from the element's literal text. objectify's
        # ``pyval`` is a Python float for decimal-looking text, and
        # ``Decimal(float)`` reproduces the float's binary representation
        # error (e.g. 100.1 -> 100.0999999999999943...).
        amount = Decimal(amount_node.text)

        group_id = element.GroupId.pyval
        type_to_match = RequestType.from_request(
            element.attrib["request"], amount_usd_equivalent
        )

        try:
            handler = getattr(self, self._HANDLER_NAMES[type_to_match])
            result = await handler(currency_code, amount, group_id)
            root = self._build_success_payload(result)
        except Exception as err:
            # exc_info keeps the exception type and traceback in the log; a
            # bare NotImplementedError would otherwise log as an empty line.
            self.logger.error(err, exc_info=True)
            root = etree.Element("Q2Bridge")
            etree.SubElement(root, "Status").text = str(1)

        xml_payload_reply = base64.b64encode(etree.tostring(root)).decode()
        response_as_dict["RequestorData"][0]["XmlPayload"] = xml_payload_reply
        return response_as_dict

    @classmethod
    def _build_success_payload(cls, result):
        """Serialize a handler reply into a ``Q2Bridge`` element, ``Status`` 0."""
        root = etree.Element("Q2Bridge")
        etree.SubElement(root, "Status").text = str(0)
        etree.SubElement(root, "StatusDescription").text = str(
            result.status_description
        )
        for attr_name, tag in cls._OPTIONAL_REPLY_FIELDS:
            if hasattr(result, attr_name):
                etree.SubElement(root, tag).text = str(getattr(result, attr_name))
        return root

    async def get_fx_quote_usd_equivalent(
        self,
        currency_code: str,
        amount: Decimal,
        group_id: int,
    ) -> FxRateQuoteReplyUSDEquivalent:
        """
        Quote for FX Rate

        :param currency_code: The 3 letter currency code as found in the Q2_CurrencyCode table
        :param amount: USD equivalent amount of the currency to be converted.
            Example: currency_code is CAD, amount is 100 USD, then the request
            is to send the Canadian Dollar equivalent of 100 USD.
        :param group_id: The online banking user's group id
        :return: ``FxRateQuoteReplyUSDEquivalent`` instance
        """
        raise NotImplementedError

    async def get_fx_quote(
        self, currency_code: str, amount: Decimal, group_id: int
    ) -> FxRateQuoteReply:
        """
        Quote for FX Rate

        :param currency_code: The 3 letter currency code as found in the Q2_CurrencyCode table
        :param amount: The amount of the currency to be converted
        :param group_id: The online banking user's group id
        :return: ``FxRateQuoteReply`` instance
        """
        raise NotImplementedError

    async def book_fx_quote(
        self, currency_code: str, amount: Decimal, group_id: int
    ) -> FxRateExchangeDealReply:
        """
        Execution of the order

        :param currency_code: The 3 letter currency code as found in the Q2_CurrencyCode table
        :param amount: The amount of the currency to be converted
        :param group_id: The online banking user's group id
        :return: ``FxRateExchangeDealReply`` instance
        """
        raise NotImplementedError

    async def book_fx_quote_usd_equivalent(
        self,
        currency_code: str,
        amount: Decimal,
        group_id: int,
    ) -> FxRateExchangeDealReplyUSDEquivalent:
        """
        Execution of the order

        :param currency_code: The 3 letter currency code as found in the Q2_CurrencyCode table
        :param amount: USD equivalent amount of the currency to be converted.
            Example: currency_code is CAD, amount is 100 USD, then the request
            is to send the Canadian Dollar equivalent of 100 USD.
        :param group_id: The online banking user's group id
        :return: ``FxRateExchangeDealReplyUSDEquivalent`` instance
        """
        raise NotImplementedError

    async def cancel_fx_quote(
        self, currency_code: str, amount: Decimal, group_id: int
    ) -> FxRateExchangeDealReply:
        """
        Reversal of the order

        :param currency_code: The 3 letter currency code as found in the Q2_CurrencyCode table
        :param amount: The amount of the currency to be converted
        :param group_id: The online banking user's group id
        :return: ``FxRateExchangeDealReply`` instance
        """
        raise NotImplementedError

    async def cancel_fx_quote_usd_equivalent(
        self,
        currency_code: str,
        amount: Decimal,
        group_id: int,
    ) -> FxRateExchangeDealReplyUSDEquivalent:
        """
        Reversal of the order

        :param currency_code: The 3 letter currency code as found in the Q2_CurrencyCode table
        :param amount: USD equivalent amount of the currency to be converted.
            Example: currency_code is CAD, amount is 100 USD, then the request
            is to send the Canadian Dollar equivalent of 100 USD.
        :param group_id: The online banking user's group id
        :return: ``FxRateExchangeDealReplyUSDEquivalent`` instance
        """
        raise NotImplementedError

    async def get_fx_rate_sheet(
        self, currency_code: str, amount: Decimal, group_id: int
    ) -> FxRateSheetsReply:
        """
        Handler for ``FxRateSheets`` requests

        :param currency_code: Currency code taken from the request
        :param amount: Amount taken from the request's ``Amount`` element
        :param group_id: Group id taken from the request
        :return: ``FxRateSheetsReply`` instance
        """
        raise NotImplementedError

    async def get_fx_rate_sheet_usd_equivalent(
        self, currency_code: str, amount: Decimal, group_id: int
    ) -> FxRateSheetsReply:
        """
        Handler for ``FxRateSheets`` requests whose amount is given in USD

        :param currency_code: Currency code taken from the request
        :param amount: Amount taken from the request's ``UsdEquivalentAmount`` element
        :param group_id: Group id taken from the request
        :return: ``FxRateSheetsReply`` instance
        """
        raise NotImplementedError