from argparse import _SubParsersAction
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from functools import partial
from typing import Optional
from zoneinfo import ZoneInfo
from lxml.etree import tostring
from lxml.objectify import E, fromstring
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.audit_record import AuditRecord
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.hq.table_row import TableRow
from q2_sdk.tools.decorators import dev_only
from .db_object import DbObject
# Short alias for the stored-procedure parameter data types
# (referenced below as D_TYPES.Int, D_TYPES.Xml, ...).
D_TYPES = ExecuteStoredProcedure.DataType
class HostAccountRow(TableRow):
    """Typed row describing a single host account.

    This is the ``REPRESENTATION_ROW_CLASS`` of :class:`HostAccount` and the
    annotated result type of its lookup/creation methods.
    """

    # NOTE(review): field names presumably mirror the columns returned by the
    # host-account stored procedures -- confirm against the DB schema.
    HostAccountID: int
    AccountNumberInternal: str
    AccountNumberExternal: str
    ProductID: int
    ProductName: str
    ProductNickName: str
    AccountDesc: str
    CIFInternal: str
    CIFExternal: str
    DefaultPwd: str
    DataAsOfDate: str
    BranchID: int
    SkipBalanceCheck: bool
    ABA: str
    IsExternalAccount: bool
    AccountInfo: str
    LastStatusChange: str
    AccountStatusID: int
    LastHistoryDate: str
class HostAccountCustomerRow(TableRow):
    """Typed row pairing a host account with a customer/user.

    This is the annotated element type returned by
    ``HostAccount.get_customer_host_accounts``.
    """

    # NOTE(review): field names presumably mirror the result columns of
    # sdk_GetCustomerAccountsFromCustomList -- confirm against the DB schema.
    CustomerID: int
    UserID: int
    HostAccountID: int
    AccountNumberInternal: str
    AccountNumberExternal: str
    ProductID: int
    ProductName: str
    AccountDesc: str
    CIFInternal: str
    CIFExternal: str
    DataAsOfDate: str
    ProductTypeName: str
    ABA: str
    IsExternalAccount: bool
class HostAccountCustomerWithProductRow(TableRow):
    """Typed row linking a customer's host account to its product.

    This is the annotated element type returned by
    ``HostAccount.get_accounts_by_product``.
    """

    # NOTE(review): field names presumably mirror the result columns of
    # sdk_GetCustomersWithProductId -- confirm against the DB schema.
    CustomerID: int
    HostAccountID: int
    ProductName: str
    ProductID: int
    AccountStatus: str
@dataclass
class UpdateAccountRightsParameters:
    """Input for ``HostAccount.update_account_rights``.

    Identifies a host account / user pair and the three rights that are
    folded into a single integer access mask.
    """

    hostAccountId: int
    userId: int
    can_view: bool
    can_deposit: bool
    can_withdraw: bool

    def calculate_access(self) -> int:
        """Return the rights as an integer bit mask.

        Bit values: deposit = 1, view = 2, withdraw = 4. The result is
        therefore in the range 0..7.
        """
        access = 0
        if self.can_deposit:
            access |= 1
        if self.can_view:
            access |= 2
        if self.can_withdraw:
            access |= 4
        return access

    def build_sql_parameters(self) -> list:
        """Build the stored-procedure parameter list.

        The list carries ``hostAccountId``, ``userId`` and the computed
        ``access`` mask, in that order, all typed as ``Int``.
        """
        params = []
        sql_params = (
            Param(self.hostAccountId, D_TYPES.Int, "hostAccountId"),
            Param(self.userId, D_TYPES.Int, "userId"),
            Param(self.calculate_access(), D_TYPES.Int, "access"),
        )
        for sql_param in sql_params:
            # Each Param appends its own representation to the shared list.
            sql_param.add_to_param_list(params)
        return params
@dataclass
class ProductChangeParameters:
    """One requested product change for a single host account.

    Instances are passed to ``HostAccount.change_account_product``.
    """

    host_account_id: int
    old_product_id: int
    new_product_id: int

    def build_xml_node(self):
        """Return an ``<AccountRecord>`` element describing this change.

        The three ids are written as the string attributes
        ``HostAccountID``, ``OldProductID`` and ``NewProductID``.
        """
        return E.AccountRecord(
            HostAccountID=str(self.host_account_id),
            OldProductID=str(self.old_product_id),
            NewProductID=str(self.new_product_id),
        )
class ProductChangeStatus(Enum):
    """Status values parsed from the ``status`` attribute of the
    ``bsp_ChangeProduct`` result XML."""

    Ok = "OK"
    AllRecordsSkipped = "AllRecordsSkipped"
    PartialUpdate = "PartialUpdate"
@dataclass
class ProductChangeResults:
    """Outcome of ``HostAccount.change_account_product``."""

    # Overall status reported by the stored procedure.
    result: ProductChangeStatus
    # HostAccountIDs whose product was changed.
    successful_accounts: list[int]
    # HostAccountIDs that were skipped.
    skipped_accounts: list[int]
[docs]
class HostAccount(DbObject):
REPRESENTATION_ROW_CLASS = HostAccountRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_host_account_by_id")
subparser.set_defaults(parser="get_host_account_by_id")
subparser.set_defaults(func=partial(self.get_by_id, serialize_for_cli=True))
subparser.add_argument("host_account_id", help="Q2_HostAccount.HostAccountID")
[docs]
@dev_only
async def check_adapter_mode(self):
"""Note: this only works in the dev environment"""
return await self.call_hq(
"sdk_SetToBatch", ExecuteStoredProcedure.SqlParameters([])
)
[docs]
@dev_only
async def update_to_batch(self):
"""Note: this only works in the dev environment"""
return await self.call_hq(
"sdk_SetToBatch",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Bit, "update_to_batch", True
)
]),
)
[docs]
async def get_by_id(
self, host_account_id: int, serialize_for_cli=False
) -> HostAccountRow:
response = await self.call_hq(
"sdk_GetHostAccountById",
sql_parameters=ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"host_account_id",
host_account_id,
)
]),
)
if response:
response = response[0]
if serialize_for_cli:
response = self.serialize_for_cli(
[response],
fields_to_display=[
"AccountNumberInternal",
"AccountNumberExternal",
"ProductID",
"CIFInternal",
"CIFExternal",
"DataAsOfDate",
"IsExternalAccount",
"ProductName",
],
)
return response
[docs]
async def create(
self,
account_internal: str,
account_external: str,
host_product_code: str,
host_product_type_code: str,
cif: str,
account_description: str,
is_external: bool = False,
data_as_of_date: datetime | None = None,
) -> list[HostAccountRow]:
if not isinstance(data_as_of_date, (datetime, type(None))):
raise TypeError(
"data_as_of_date must be a datetime.datetime object or None"
)
if not data_as_of_date:
data_as_of_date = datetime.now(tz=ZoneInfo("UTC"))
data_as_of_date = data_as_of_date.isoformat()
return await self.call_hq(
"sdk_AddHostAccount",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"acct_internal",
account_internal,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"acct_external",
account_external,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"host_product_code",
host_product_code,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"host_product_type_code",
host_product_type_code,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "cif", cif
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"acct_description",
account_description,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Bit, "is_external", is_external
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.DateTime,
"data_as_of_date",
data_as_of_date,
),
]),
)
[docs]
async def search_by_external_number(
self, external_account_number
) -> list[HostAccountRow]:
return await self.call_hq(
"sdk_GetHostAccountByExternalNumber",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"account_external",
external_account_number,
)
]),
)
[docs]
async def search_by_internal_number(
self, internal_account_number
) -> list[HostAccountRow]:
return await self.call_hq(
"sdk_GetHostAccountByInternalNumber",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"account_internal",
internal_account_number,
)
]),
)
[docs]
async def update_account_rights(self, parameters: UpdateAccountRightsParameters):
"""
Update the account rights for an account linked to a user. This only works for accounts
explicitly linked to the user, and will not work for accounts linked by a cif.
"""
await self.call_hq(
"sdk_UpdateAccountAccess",
ExecuteStoredProcedure.SqlParameters(parameters.build_sql_parameters()),
)
audit = AuditRecord(self.logger, self.hq_credentials)
await audit.create(
f"{parameters.hostAccountId} access changed to {parameters.calculate_access()} for user id {parameters.userId}",
"SDK Session",
user_id=parameters.userId,
host_account_id=parameters.hostAccountId,
)
[docs]
async def get_customer_host_accounts(
self, customer_ids: list[int]
) -> list[HostAccountCustomerRow]:
"""
Retrieves multiple accounts associated to the customers in the list. For DB performance reason, the customer id list should be limited to 20
:param customer_ids: list of Q2_Customer.CustomerID <= 20
"""
if len(customer_ids) > 20:
raise DatabaseDataError(
"Limit the customer id list to 20 or less for DB performance reasons"
)
# <request><get c="1" /><get c="2" /><get c="3" /></request>
get_customers_list = [E.get(c=str(cust_id)) for cust_id in customer_ids]
get_request_xml = E.request(*get_customers_list)
get_request = tostring(get_request_xml, encoding="utf-8")
params = []
Param(get_request.decode(), D_TYPES.Xml, "request").add_to_param_list(params)
return await self.call_hq(
"sdk_GetCustomerAccountsFromCustomList",
ExecuteStoredProcedure.SqlParameters(params),
)
[docs]
async def get_accounts_by_product(
self, group_id: int, product_id: Optional[int] = None
) -> list[HostAccountCustomerWithProductRow]:
"""
Searches a group of customers for accounts with a specific product id
:param group_id: Group to search [Q2_Group.GroupID]
:param product_id: Product ID of the target product [Q2_Product.ProductID]
"""
param_list = []
Param(group_id, D_TYPES.Int, "group_id").add_to_param_list(param_list)
if product_id:
Param(product_id, D_TYPES.Int, "product_id").add_to_param_list(param_list)
return await self.call_hq(
"sdk_GetCustomersWithProductId",
ExecuteStoredProcedure.SqlParameters(param_list),
)
[docs]
async def change_account_product(
self, account_changes: list[ProductChangeParameters]
) -> ProductChangeResults:
"""
Changes the product of a list of accounts
:param account_changes: list of ProductChangeParameters objects.
:return: ProductChangeResults object
"""
account_nodes = [x.build_xml_node() for x in account_changes]
root = E.Records(*account_nodes)
xml_parameter_string = tostring(root, encoding="utf-8")
param_list = []
Param(xml_parameter_string.decode(), D_TYPES.Xml, "data").add_to_param_list(
param_list
)
results = await self.call_hq(
"bsp_ChangeProduct", ExecuteStoredProcedure.SqlParameters(param_list)
)
result_xml_text = results[0].getchildren()[0].text
result_xml = fromstring(result_xml_text)
result_enum = ProductChangeStatus(result_xml.attrib["status"])
match result_enum:
case ProductChangeStatus.Ok:
result_obj = ProductChangeResults(
result_enum, [x.host_account_id for x in account_changes], []
)
case ProductChangeStatus.AllRecordsSkipped:
result_obj = ProductChangeResults(
result_enum, [], [x.host_account_id for x in account_changes]
)
case ProductChangeStatus.PartialUpdate:
skipped = [int(x.attrib["HostAccountID"]) for x in result_xml.skipped]
success = [
x.host_account_id
for x in account_changes
if x.host_account_id not in skipped
]
result_obj = ProductChangeResults(result_enum, success, skipped)
return result_obj