Source code for q2_sdk.hq.db.recipient

from argparse import _SubParsersAction
from dataclasses import dataclass, fields
from functools import partial
from typing import List

from lxml import objectify
from lxml.objectify import BoolElement, IntElement, StringElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.hq_api.q2_api import GetUserLoginInfoByLoginName
from q2_sdk.hq.models.hq_params.stored_procedure import Param

from .db_object import DbObject
from .representation_row_base import RepresentationRowBase

D_TYPES = ExecuteStoredProcedure.DataType


[docs] @dataclass class RecipientTemplate: RecipientID: str DisplayName: str CustomerID: str DefaultAccountID: str EmailAddress: str AlwaysSendEmail: str IsInternational: str ACHClassCodeID: str AchName: str WireName: str IdentificationNumber: str
[docs] @dataclass class AccountTemplate: AccountID: str RecipientID: str AccountNumber: str AccountTypeID: str ABA: str
[docs] @dataclass class RecipientFITemplate: RecipientFIDetailID: str AccountID: str Name: str BIC: str BranchBIC: str Address1: str Address2: str Address3: str City: str State: str PostalCode: str CountryID: str IntermediaryABA: str IsIntermedFI: str IBAN: str ReceivingFiShortName: str ReceivingFiAba: str
[docs] @dataclass class AddressTemplate: RecipientAddressID: str RecipientID: str Address1: str Address2: str City: str State: str PostalCode: str CountryID: str Address3: str
[docs] class RecipientRow(RepresentationRowBase): RecipientID: IntElement = "RecipientID" DisplayName: StringElement = "DisplayName" WireName: StringElement = "WireName" CustomerID: IntElement = "CustomerID" EmailAddress: StringElement = "EmailAddress" AlwaysSendEmail: BoolElement = "AlwaysSendEmail" IsInternational: BoolElement = "IsInternational" ACHClassCodeID: IntElement = "ACHClassCodeID" AchName: StringElement = "AchName" IdentificationNumber: StringElement = "IdentificationNumber" IsIndividual: BoolElement = "IsIndividual" AccountID: IntElement = "AccountID" AccountNumber: StringElement = "AccountNumber" ABA: StringElement = "ABA" AccountType: StringElement = "AccountType" AccountDescription: StringElement = "AccountDescription" RecipientFIName: StringElement = "RecipientFIName" BIC: StringElement = "BIC" BranchBIC: StringElement = "BranchBIC" RecipientFIAddress1: StringElement = "RecipientFIAddress1" RecipientFIAddress2: StringElement = "RecipientFIAddress2" RecipientFIAddress3: StringElement = "RecipientFIAddress3" RecipientFICity: StringElement = "RecipientFICity" RecipientFIState: StringElement = "RecipientFIState" RecipientFIPostalCode: StringElement = "RecipientFIPostalCode" RecipientFICountry: StringElement = "RecipientFICountry" IntermediaryABA: StringElement = "IntermediaryABA" IBAN: StringElement = "IBAN" ReceivingFiShortName: StringElement = "ReceivingFiShortName" ReceivingFiAba: StringElement = "ReceivingFiAba" Address1: StringElement = "Address1" Address2: StringElement = "Address2" Address3: StringElement = "Address3" City: StringElement = "City" State: StringElement = "State" PostalCode: StringElement = "PostalCode" Country: StringElement = "Country"
[docs] def from_row_to_dict(row, dataclass_type): """ Converts a given XML row returned from get_recipients to a dictionary """ return {field.name: row.get(field.name) for field in fields(dataclass_type)}
[docs] class Recipient(DbObject): GET_BY_NAME_KEY = "DisplayName" NAME = "Recipient" REPRESENTATION_ROW_CLASS = RecipientRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_by_recipient_id") subparser.set_defaults(parser="get_by_recipient_id") subparser.set_defaults( func=partial(self.get_by_recipient, serialize_for_cli=True) ) subparser.add_argument("recipient_id", help="Q2_Recipient.RecipientID") subparser = parser.add_parser("get_recipients_by_customer") subparser.set_defaults(parser="get_recipients_by_customer") subparser.set_defaults( func=partial(self.get_by_customer, serialize_for_cli=True) ) subparser.add_argument("customer_id", help="Q2_Recipient.CustomerID")
[docs] async def get_by_customer( self, customer_id: int, serialize_for_cli=False ) -> List[RecipientRow]: customer_id = int(customer_id) assert isinstance(customer_id, int), "Please supply a valid customer id" params = [] Param(customer_id, D_TYPES.Int, "customer_id").add_to_param_list(params) response = await self.call_hq( "sdk_GetRecipients", ExecuteStoredProcedure.SqlParameters(params) ) if serialize_for_cli: response = self.serialize_for_cli(response) return response
[docs] async def get_by_recipient( self, recipient_id: int, serialize_for_cli=False ) -> List[RecipientRow]: recipient_id = int(recipient_id) assert isinstance(recipient_id, int), "Please supply a valid recipient id" params = [] Param(recipient_id, D_TYPES.Int, "recipient_id").add_to_param_list(params) response = await self.call_hq( "sdk_GetRecipients", ExecuteStoredProcedure.SqlParameters(params) ) if serialize_for_cli: response = self.serialize_for_cli(response) return response
[docs] async def get_recipients( self, customer_id: int = None, logon_name: str = "", recipient_id: int = None, serialize_for_cli=False, ): if not any([customer_id, logon_name]): return "Failure: Missing parameter. Either Customer ID or Logon Name must be provided" if logon_name and not customer_id: params_obj = GetUserLoginInfoByLoginName.ParamsObj( self.logger, logon_name, "OnlineBanking", hq_credentials=self.hq_credentials, ) get_user = await GetUserLoginInfoByLoginName.execute(params_obj) self.logger.debug(f"user: {get_user}") try: parent_node = get_user.result_node.Data.DalUserLogonList if hasattr(parent_node, "Q2_UserLogonList"): customer_id = parent_node.Q2_UserLogonList.CustomerID.pyval except AttributeError: return "Failure: User not found" if not customer_id: return "Failure: User not found" customer_id = int(customer_id) params = [] Param(customer_id, D_TYPES.Int, "CustomerID").add_to_param_list(params) if recipient_id: Param(recipient_id, D_TYPES.Int, "RecipientID").add_to_param_list(params) response = await self.call_hq( "sdkapi_get_recipients", ExecuteStoredProcedure.SqlParameters(params) ) response_details = response[0].find(".//") if not response_details: return "Failure: Recipient not found" rows = str(response_details) self.logger.debug(rows) recipients = objectify.fromstring("".join(rows)) return recipients