Source code for q2_sdk.hq.db.host_account

from argparse import _SubParsersAction
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from functools import partial
from typing import Optional
from zoneinfo import ZoneInfo

from lxml.etree import tostring
from lxml.objectify import E, fromstring

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.audit_record import AuditRecord
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.hq.table_row import TableRow
from q2_sdk.tools.decorators import dev_only

from .db_object import DbObject

D_TYPES = ExecuteStoredProcedure.DataType


[docs] class HostAccountRow(TableRow): HostAccountID: int AccountNumberInternal: str AccountNumberExternal: str ProductID: int ProductName: str ProductNickName: str AccountDesc: str CIFInternal: str CIFExternal: str DefaultPwd: str DataAsOfDate: str BranchID: int SkipBalanceCheck: bool ABA: str IsExternalAccount: bool AccountInfo: str LastStatusChange: str AccountStatusID: int LastHistoryDate: str
[docs] class HostAccountCustomerRow(TableRow): CustomerID: int UserID: int HostAccountID: int AccountNumberInternal: str AccountNumberExternal: str ProductID: int ProductName: str AccountDesc: str CIFInternal: str CIFExternal: str DataAsOfDate: str ProductTypeName: str ABA: str IsExternalAccount: bool
[docs] class HostAccountCustomerWithProductRow(TableRow): CustomerID: int HostAccountID: int ProductName: str ProductID: int AccountStatus: str
[docs] @dataclass class UpdateAccountRightsParameters: hostAccountId: int userId: int can_view: bool can_deposit: bool can_withdraw: bool
[docs] def calculate_access(self) -> int: access = 0 access += 1 if self.can_deposit else 0 access += 2 if self.can_view else 0 access += 4 if self.can_withdraw else 0 return access
[docs] def build_sql_parameters(self): params = [] for param in [ Param(self.hostAccountId, D_TYPES.Int, "hostAccountId"), Param(self.userId, D_TYPES.Int, "userId"), Param(self.calculate_access(), D_TYPES.Int, "access"), ]: param.add_to_param_list(params) return params
[docs] @dataclass class ProductChangeParameters: host_account_id: int old_product_id: int new_product_id: int
[docs] def build_xml_node(self): return E.AccountRecord( HostAccountID=str(self.host_account_id), OldProductID=str(self.old_product_id), NewProductID=str(self.new_product_id), )
[docs] class ProductChangeStatus(Enum): Ok = "OK" AllRecordsSkipped = "AllRecordsSkipped" PartialUpdate = "PartialUpdate"
[docs] @dataclass class ProductChangeResults: result: ProductChangeStatus successful_accounts: list skipped_accounts: list
[docs] class HostAccount(DbObject): REPRESENTATION_ROW_CLASS = HostAccountRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_host_account_by_id") subparser.set_defaults(parser="get_host_account_by_id") subparser.set_defaults(func=partial(self.get_by_id, serialize_for_cli=True)) subparser.add_argument("host_account_id", help="Q2_HostAccount.HostAccountID")
[docs] @dev_only async def check_adapter_mode(self): """Note: this only works in the dev environment""" return await self.call_hq( "sdk_SetToBatch", ExecuteStoredProcedure.SqlParameters([]) )
[docs] @dev_only async def update_to_batch(self): """Note: this only works in the dev environment""" return await self.call_hq( "sdk_SetToBatch", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Bit, "update_to_batch", True ) ]), )
[docs] async def get_by_id( self, host_account_id: int, serialize_for_cli=False ) -> HostAccountRow: response = await self.call_hq( "sdk_GetHostAccountById", sql_parameters=ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "host_account_id", host_account_id, ) ]), ) if response: response = response[0] if serialize_for_cli: response = self.serialize_for_cli( [response], fields_to_display=[ "AccountNumberInternal", "AccountNumberExternal", "ProductID", "CIFInternal", "CIFExternal", "DataAsOfDate", "IsExternalAccount", "ProductName", ], ) return response
[docs] async def create( self, account_internal: str, account_external: str, host_product_code: str, host_product_type_code: str, cif: str, account_description: str, is_external: bool = False, data_as_of_date: datetime | None = None, ) -> list[HostAccountRow]: if not isinstance(data_as_of_date, (datetime, type(None))): raise TypeError( "data_as_of_date must be a datetime.datetime object or None" ) if not data_as_of_date: data_as_of_date = datetime.now(tz=ZoneInfo("UTC")) data_as_of_date = data_as_of_date.isoformat() return await self.call_hq( "sdk_AddHostAccount", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "acct_internal", account_internal, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "acct_external", account_external, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "host_product_code", host_product_code, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "host_product_type_code", host_product_type_code, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "cif", cif ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "acct_description", account_description, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Bit, "is_external", is_external ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.DateTime, "data_as_of_date", data_as_of_date, ), ]), )
[docs] async def search_by_external_number( self, external_account_number ) -> list[HostAccountRow]: return await self.call_hq( "sdk_GetHostAccountByExternalNumber", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "account_external", external_account_number, ) ]), )
[docs] async def search_by_internal_number( self, internal_account_number ) -> list[HostAccountRow]: return await self.call_hq( "sdk_GetHostAccountByInternalNumber", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "account_internal", internal_account_number, ) ]), )
[docs] async def update_account_rights(self, parameters: UpdateAccountRightsParameters): """ Update the account rights for an account linked to a user. This only works for accounts explicitly linked to the user, and will not work for accounts linked by a cif. """ await self.call_hq( "sdk_UpdateAccountAccess", ExecuteStoredProcedure.SqlParameters(parameters.build_sql_parameters()), ) audit = AuditRecord(self.logger, self.hq_credentials) await audit.create( f"{parameters.hostAccountId} access changed to {parameters.calculate_access()} for user id {parameters.userId}", "SDK Session", user_id=parameters.userId, host_account_id=parameters.hostAccountId, )
[docs] async def get_customer_host_accounts( self, customer_ids: list[int] ) -> list[HostAccountCustomerRow]: """ Retrieves multiple accounts associated to the customers in the list. For DB performance reason, the customer id list should be limited to 20 :param customer_ids: list of Q2_Customer.CustomerID <= 20 """ if len(customer_ids) > 20: raise DatabaseDataError( "Limit the customer id list to 20 or less for DB performance reasons" ) # <request><get c="1" /><get c="2" /><get c="3" /></request> get_customers_list = [E.get(c=str(cust_id)) for cust_id in customer_ids] get_request_xml = E.request(*get_customers_list) get_request = tostring(get_request_xml, encoding="utf-8") params = [] Param(get_request.decode(), D_TYPES.Xml, "request").add_to_param_list(params) return await self.call_hq( "sdk_GetCustomerAccountsFromCustomList", ExecuteStoredProcedure.SqlParameters(params), )
[docs] async def get_accounts_by_product( self, group_id: int, product_id: Optional[int] = None ) -> list[HostAccountCustomerWithProductRow]: """ Searches a group of customers for accounts with a specific product id :param group_id: Group to search [Q2_Group.GroupID] :param product_id: Product ID of the target product [Q2_Product.ProductID] """ param_list = [] Param(group_id, D_TYPES.Int, "group_id").add_to_param_list(param_list) if product_id: Param(product_id, D_TYPES.Int, "product_id").add_to_param_list(param_list) return await self.call_hq( "sdk_GetCustomersWithProductId", ExecuteStoredProcedure.SqlParameters(param_list), )
[docs] async def change_account_product( self, account_changes: list[ProductChangeParameters] ) -> ProductChangeResults: """ Changes the product of a list of accounts :param account_changes: list of ProductChangeParameters objects. :return: ProductChangeResults object """ account_nodes = [x.build_xml_node() for x in account_changes] root = E.Records(*account_nodes) xml_parameter_string = tostring(root, encoding="utf-8") param_list = [] Param(xml_parameter_string.decode(), D_TYPES.Xml, "data").add_to_param_list( param_list ) results = await self.call_hq( "bsp_ChangeProduct", ExecuteStoredProcedure.SqlParameters(param_list) ) result_xml_text = results[0].getchildren()[0].text result_xml = fromstring(result_xml_text) result_enum = ProductChangeStatus(result_xml.attrib["status"]) match result_enum: case ProductChangeStatus.Ok: result_obj = ProductChangeResults( result_enum, [x.host_account_id for x in account_changes], [] ) case ProductChangeStatus.AllRecordsSkipped: result_obj = ProductChangeResults( result_enum, [], [x.host_account_id for x in account_changes] ) case ProductChangeStatus.PartialUpdate: skipped = [int(x.attrib["HostAccountID"]) for x in result_xml.skipped] success = [ x.host_account_id for x in account_changes if x.host_account_id not in skipped ] result_obj = ProductChangeResults(result_enum, success, skipped) return result_obj