from argparse import _SubParsersAction
from functools import partial
from typing import Optional
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from ..table_row import TableRow
from .audit_record import AuditRecord
from .db_object import DbObject
[docs]
class AccountAddressRow(TableRow):
    """Column type hints for one account-address row.

    This is the ``REPRESENTATION_ROW_CLASS`` of ``AccountAddress`` and the
    element type annotated on ``AccountAddress.get`` and
    ``AccountAddress.get_by_address_id``.
    """

    UserID: int
    AddressID: int
    HostAccountID: int
    StreetAddress1: str
    StreetAddress2: str
    City: str
    State: str
    PostalCode: str
    CustomerID: int
    AddressType: str
    CountryID: int
    IsInternational: bool
    Province: str
    # NOTE(review): spelled "ISsoCodeA3" here but "IsoCodeA3" on
    # AccountAddressHistoryRow -- confirm against the column name the
    # stored procedure actually returns before renaming.
    ISsoCodeA3: str
    CountryName: str
[docs]
class AccountAddressHistoryRow(TableRow):
    """Column type hints for one account-address history row.

    This is the element type annotated on ``AccountAddress.get_history``.
    """

    ChangeAccountID: int
    HostAccountID: int
    TransactionID: int
    OldAddressID: int
    StreetAddress1: str
    StreetAddress2: str
    City: str
    State: str
    PostalCode: str
    EmailAddress: str
    HomePhoneNumber: str
    WorkPhoneNumber: str
    CellPhoneNumber: str
    CountryID: int
    Province: str
    IsoCodeA3: str
    Status: str
[docs]
class AccountAddress(DbObject):
    """Read and modify account-address rows through HQ stored procedures.

    Each operation is issued with ``self.call_hq`` against an ``sdk_*``
    stored procedure; rows are described by ``AccountAddressRow``.
    """

    NAME = "AccountAddress"
    REPRESENTATION_ROW_CLASS = AccountAddressRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
    """Register this object's read-only CLI subcommands on ``parser``.

    Each subcommand takes a single positional argument and is bound, via
    ``functools.partial``, to the matching method with
    ``serialize_for_cli=True``:

    * ``get_account_address <host_account_id>`` -> ``self.get``
    * ``get_account_address_by_address_id <address_id>``
      -> ``self.get_by_address_id``
    * ``get_account_address_history <host_account_id>``
      -> ``self.get_history``

    :param parser: Subparsers action the subcommands are added to.
    """
    # (subcommand, ``parser`` default, handler, positional name, help text)
    commands = (
        (
            "get_account_address",
            "get",
            self.get,
            "host_account_id",
            "Q2_AccountAddress.HostAccountID",
        ),
        (
            "get_account_address_by_address_id",
            "get_by_address_id",
            self.get_by_address_id,
            "address_id",
            "Q2_AccountAddress.AddressID",
        ),
        (
            "get_account_address_history",
            # Previously "get", copy-pasted from the first subcommand. The
            # ``parser`` default mirrors the bound method name for the other
            # subcommands, so the history command now does the same.
            "get_history",
            self.get_history,
            "host_account_id",
            "Q2_AccountAddress.HostAccountID",
        ),
    )
    for command, parser_key, handler, arg_name, arg_help in commands:
        subparser = parser.add_parser(command)
        subparser.set_defaults(parser=parser_key)
        subparser.set_defaults(func=partial(handler, serialize_for_cli=True))
        subparser.add_argument(arg_name, help=arg_help)
[docs]
async def get(
    self, host_account_id: int, serialize_for_cli=False
) -> list[AccountAddressRow]:
    """Call ``sdk_GetAccountAddress`` for a host account.

    :param host_account_id: Sent as the Int SQL parameter
        ``host_account_id``.
    :param serialize_for_cli: When truthy, the HQ result is passed through
        ``self.serialize_for_cli`` with a fixed column list and that value
        is returned instead of the raw result.
    :return: The ``call_hq`` result, or its CLI serialization.
    """
    sql_params = ExecuteStoredProcedure.SqlParameters([
        ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int,
            "host_account_id",
            host_account_id,
        )
    ])
    rows = await self.call_hq("sdk_GetAccountAddress", sql_params)
    if not serialize_for_cli:
        return rows

    cli_columns = [
        "StreetAddress1",
        "StreetAddress2",
        "City",
        "State",
        "PostalCode",
        "AddressType",
    ]
    return self.serialize_for_cli(rows, cli_columns)
[docs]
async def get_by_address_id(
    self, address_id: int, serialize_for_cli=False
) -> list[AccountAddressRow]:
    """Call ``sdk_GetAccountAddressByAddressID`` for an address id.

    :param address_id: Sent as the Int SQL parameter ``addressId``.
    :param serialize_for_cli: When truthy, the HQ result is passed through
        ``self.serialize_for_cli`` with a fixed column list and that value
        is returned instead of the raw result.
    :return: The ``call_hq`` result, or its CLI serialization.
    """
    sql_params = ExecuteStoredProcedure.SqlParameters([
        ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int,
            "addressId",
            address_id,
        )
    ])
    rows = await self.call_hq("sdk_GetAccountAddressByAddressID", sql_params)
    if not serialize_for_cli:
        return rows

    cli_columns = [
        "HostAccountID",
        "StreetAddress1",
        "StreetAddress2",
        "City",
        "State",
        "PostalCode",
        "AddressType",
    ]
    return self.serialize_for_cli(rows, cli_columns)
[docs]
async def get_history(
    self, host_account_id: int, serialize_for_cli=False
) -> list[AccountAddressHistoryRow]:
    """Call ``sdk_GetAccountAddressHistory`` for a host account.

    :param host_account_id: Sent as the Int SQL parameter
        ``host_account_id``.
    :param serialize_for_cli: When truthy, the HQ result is passed through
        ``self.serialize_for_cli`` with a fixed column list and that value
        is returned instead of the raw result.
    :return: The ``call_hq`` result, or its CLI serialization.
    """
    sql_params = ExecuteStoredProcedure.SqlParameters([
        ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int,
            "host_account_id",
            host_account_id,
        )
    ])
    rows = await self.call_hq("sdk_GetAccountAddressHistory", sql_params)
    if not serialize_for_cli:
        return rows

    cli_columns = [
        "StreetAddress1",
        "StreetAddress2",
        "City",
        "State",
        "PostalCode",
        "Status",
    ]
    return self.serialize_for_cli(rows, cli_columns)
[docs]
async def create(self, host_account_id: int, address_id: int):
    """Call ``sdk_CreateAccountAddress`` with a host account and address id.

    :param host_account_id: Sent as the Int SQL parameter
        ``host_account_id``.
    :param address_id: Sent as the Int SQL parameter ``address_id``.
    :return: Whatever ``self.call_hq`` returns for the stored procedure.
    """
    host_account_param = ExecuteStoredProcedure.SqlParam(
        ExecuteStoredProcedure.DataType.Int,
        "host_account_id",
        host_account_id,
    )
    address_param = ExecuteStoredProcedure.SqlParam(
        ExecuteStoredProcedure.DataType.Int, "address_id", address_id
    )
    sql_params = ExecuteStoredProcedure.SqlParameters(
        [host_account_param, address_param]
    )
    return await self.call_hq("sdk_CreateAccountAddress", sql_params)
[docs]
async def update(
    self,
    host_account_id: int,
    address_id: int,
    current_address_id: Optional[int] = None,
):
    """Call ``sdk_UpdateAccountAddress`` for a host account.

    :param host_account_id: Sent as the Int SQL parameter
        ``host_account_id``.
    :param address_id: Sent as the Int SQL parameter ``address_id``.
    :param current_address_id: Optional; when supplied it is sent as the
        Int SQL parameter ``current_address_id``. ``None`` (the default)
        omits the parameter from the call entirely.
    :return: Whatever ``self.call_hq`` returns for the stored procedure.
    """
    parameters = [
        ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int,
            "host_account_id",
            host_account_id,
        ),
        ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int, "address_id", address_id
        ),
    ]
    # ``None`` is the "not supplied" sentinel. A truthiness test would also
    # silently drop an explicitly passed 0, so compare against None.
    if current_address_id is not None:
        parameters.append(
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int,
                "current_address_id",
                current_address_id,
            )
        )
    response = await self.call_hq(
        "sdk_UpdateAccountAddress",
        ExecuteStoredProcedure.SqlParameters(parameters),
    )
    return response
[docs]
async def delete(
    self,
    host_account_id: int,
    online_session: OnlineSession,
    online_user: OnlineUser,
):
    """Call ``sdk_RemoveAccountAddress`` and audit a successful removal.

    After the stored procedure call, an ``AuditRecord`` entry is created
    only if ``self.hq_response.success`` is truthy.

    :param host_account_id: Sent as the Int SQL parameter
        ``host_account_id`` and included in the audit message.
    :param online_session: Supplies ``session_id`` and ``workstation`` for
        the audit record.
    :param online_user: Supplies ``customer_id``, ``user_id`` and
        ``user_logon_id`` for the audit record.
    :return: Whatever ``self.call_hq`` returns for the stored procedure.
    """
    sql_params = ExecuteStoredProcedure.SqlParameters([
        ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int,
            "host_account_id",
            host_account_id,
        )
    ])
    response = await self.call_hq("sdk_RemoveAccountAddress", sql_params)

    # Nothing to audit unless HQ reported the removal as successful.
    if not self.hq_response.success:
        return response

    audit = AuditRecord(self.logger, self.hq_credentials)
    await audit.create(
        f"AccountAddress row deleted: Host account id {host_account_id}",
        online_session.session_id,
        workstation_id=online_session.workstation,
        customer_id=online_user.customer_id,
        user_id=online_user.user_id,
        user_logon_id=online_user.user_logon_id,
    )
    return response