from argparse import _SubParsersAction
from dataclasses import dataclass, fields
from functools import partial
from typing import List
from lxml import objectify
from lxml.objectify import BoolElement, IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.hq_api.q2_api import GetUserLoginInfoByLoginName
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
@dataclass
class RecipientTemplate:
RecipientID: str
DisplayName: str
CustomerID: str
DefaultAccountID: str
EmailAddress: str
AlwaysSendEmail: str
IsInternational: str
ACHClassCodeID: str
AchName: str
WireName: str
IdentificationNumber: str
[docs]
@dataclass
class AccountTemplate:
AccountID: str
RecipientID: str
AccountNumber: str
AccountTypeID: str
ABA: str
[docs]
@dataclass
class RecipientFITemplate:
RecipientFIDetailID: str
AccountID: str
Name: str
BIC: str
BranchBIC: str
Address1: str
Address2: str
Address3: str
City: str
State: str
PostalCode: str
CountryID: str
IntermediaryABA: str
IsIntermedFI: str
IBAN: str
ReceivingFiShortName: str
ReceivingFiAba: str
[docs]
@dataclass
class AddressTemplate:
RecipientAddressID: str
RecipientID: str
Address1: str
Address2: str
City: str
State: str
PostalCode: str
CountryID: str
Address3: str
[docs]
class RecipientRow(RepresentationRowBase):
RecipientID: IntElement = "RecipientID"
DisplayName: StringElement = "DisplayName"
WireName: StringElement = "WireName"
CustomerID: IntElement = "CustomerID"
EmailAddress: StringElement = "EmailAddress"
AlwaysSendEmail: BoolElement = "AlwaysSendEmail"
IsInternational: BoolElement = "IsInternational"
ACHClassCodeID: IntElement = "ACHClassCodeID"
AchName: StringElement = "AchName"
IdentificationNumber: StringElement = "IdentificationNumber"
IsIndividual: BoolElement = "IsIndividual"
AccountID: IntElement = "AccountID"
AccountNumber: StringElement = "AccountNumber"
ABA: StringElement = "ABA"
AccountType: StringElement = "AccountType"
AccountDescription: StringElement = "AccountDescription"
RecipientFIName: StringElement = "RecipientFIName"
BIC: StringElement = "BIC"
BranchBIC: StringElement = "BranchBIC"
RecipientFIAddress1: StringElement = "RecipientFIAddress1"
RecipientFIAddress2: StringElement = "RecipientFIAddress2"
RecipientFIAddress3: StringElement = "RecipientFIAddress3"
RecipientFICity: StringElement = "RecipientFICity"
RecipientFIState: StringElement = "RecipientFIState"
RecipientFIPostalCode: StringElement = "RecipientFIPostalCode"
RecipientFICountry: StringElement = "RecipientFICountry"
IntermediaryABA: StringElement = "IntermediaryABA"
IBAN: StringElement = "IBAN"
ReceivingFiShortName: StringElement = "ReceivingFiShortName"
ReceivingFiAba: StringElement = "ReceivingFiAba"
Address1: StringElement = "Address1"
Address2: StringElement = "Address2"
Address3: StringElement = "Address3"
City: StringElement = "City"
State: StringElement = "State"
PostalCode: StringElement = "PostalCode"
Country: StringElement = "Country"
[docs]
def from_row_to_dict(row, dataclass_type):
"""
Converts a given XML row returned from get_recipients to a dictionary
"""
return {field.name: row.get(field.name) for field in fields(dataclass_type)}
[docs]
class Recipient(DbObject):
GET_BY_NAME_KEY = "DisplayName"
NAME = "Recipient"
REPRESENTATION_ROW_CLASS = RecipientRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_by_recipient_id")
subparser.set_defaults(parser="get_by_recipient_id")
subparser.set_defaults(
func=partial(self.get_by_recipient, serialize_for_cli=True)
)
subparser.add_argument("recipient_id", help="Q2_Recipient.RecipientID")
subparser = parser.add_parser("get_recipients_by_customer")
subparser.set_defaults(parser="get_recipients_by_customer")
subparser.set_defaults(
func=partial(self.get_by_customer, serialize_for_cli=True)
)
subparser.add_argument("customer_id", help="Q2_Recipient.CustomerID")
[docs]
async def get_by_customer(
self, customer_id: int, serialize_for_cli=False
) -> List[RecipientRow]:
customer_id = int(customer_id)
assert isinstance(customer_id, int), "Please supply a valid customer id"
params = []
Param(customer_id, D_TYPES.Int, "customer_id").add_to_param_list(params)
response = await self.call_hq(
"sdk_GetRecipients", ExecuteStoredProcedure.SqlParameters(params)
)
if serialize_for_cli:
response = self.serialize_for_cli(response)
return response
[docs]
async def get_by_recipient(
self, recipient_id: int, serialize_for_cli=False
) -> List[RecipientRow]:
recipient_id = int(recipient_id)
assert isinstance(recipient_id, int), "Please supply a valid recipient id"
params = []
Param(recipient_id, D_TYPES.Int, "recipient_id").add_to_param_list(params)
response = await self.call_hq(
"sdk_GetRecipients", ExecuteStoredProcedure.SqlParameters(params)
)
if serialize_for_cli:
response = self.serialize_for_cli(response)
return response
[docs]
async def get_recipients(
self,
customer_id: int = None,
logon_name: str = "",
recipient_id: int = None,
serialize_for_cli=False,
):
if not any([customer_id, logon_name]):
return "Failure: Missing parameter. Either Customer ID or Logon Name must be provided"
if logon_name and not customer_id:
params_obj = GetUserLoginInfoByLoginName.ParamsObj(
self.logger,
logon_name,
"OnlineBanking",
hq_credentials=self.hq_credentials,
)
get_user = await GetUserLoginInfoByLoginName.execute(params_obj)
self.logger.debug(f"user: {get_user}")
try:
parent_node = get_user.result_node.Data.DalUserLogonList
if hasattr(parent_node, "Q2_UserLogonList"):
customer_id = parent_node.Q2_UserLogonList.CustomerID.pyval
except AttributeError:
return "Failure: User not found"
if not customer_id:
return "Failure: User not found"
customer_id = int(customer_id)
params = []
Param(customer_id, D_TYPES.Int, "CustomerID").add_to_param_list(params)
if recipient_id:
Param(recipient_id, D_TYPES.Int, "RecipientID").add_to_param_list(params)
response = await self.call_hq(
"sdkapi_get_recipients", ExecuteStoredProcedure.SqlParameters(params)
)
response_details = response[0].find(".//")
if not response_details:
return "Failure: Recipient not found"
rows = str(response_details)
self.logger.debug(rows)
recipients = objectify.fromstring("".join(rows))
return recipients