Source code for q2_sdk.hq.db.account_address

from argparse import _SubParsersAction
from functools import partial
from typing import Optional

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser

from ..table_row import TableRow
from .audit_record import AuditRecord
from .db_object import DbObject


class AccountAddressRow(TableRow):
    """Field declarations for an account-address row.

    Used as the element type in the return annotations of
    ``AccountAddress.get`` and ``AccountAddress.get_by_address_id`` and as
    ``AccountAddress.REPRESENTATION_ROW_CLASS``.  Only names and types are
    declared here; how ``TableRow`` consumes them is defined elsewhere.
    """

    # Identifiers
    UserID: int
    AddressID: int
    HostAccountID: int

    # Postal address
    StreetAddress1: str
    StreetAddress2: str
    City: str
    State: str
    PostalCode: str

    CustomerID: int
    AddressType: str

    # Country details
    CountryID: int
    IsInternational: bool
    Province: str
    # NOTE(review): AccountAddressHistoryRow spells this field "IsoCodeA3";
    # "ISsoCodeA3" looks like a typo -- confirm against the column returned by
    # the stored procedure before renaming, since the name is public.
    ISsoCodeA3: str
    CountryName: str
class AccountAddressHistoryRow(TableRow):
    """Field declarations for an account-address history row.

    Used as the element type in the return annotation of
    ``AccountAddress.get_history``.  Only names and types are declared here;
    how ``TableRow`` consumes them is defined elsewhere.
    """

    # Identifiers
    ChangeAccountID: int
    HostAccountID: int
    TransactionID: int
    OldAddressID: int

    # Postal address
    StreetAddress1: str
    StreetAddress2: str
    City: str
    State: str
    PostalCode: str

    # Contact details
    EmailAddress: str
    HomePhoneNumber: str
    WorkPhoneNumber: str
    CellPhoneNumber: str

    # Country details
    CountryID: int
    Province: str
    IsoCodeA3: str

    Status: str
class AccountAddress(DbObject):
    """Read and modify account/address links through HQ stored procedures.

    Every operation is a thin wrapper that builds the SQL parameter list and
    forwards it to ``self.call_hq`` with the matching ``sdk_*AccountAddress*``
    stored procedure name.
    """

    NAME = "AccountAddress"
    REPRESENTATION_ROW_CLASS = AccountAddressRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the command-line sub-commands for this object.

        Three sub-commands are added, each bound to the corresponding method
        with ``serialize_for_cli=True``:

        * ``get_account_address <host_account_id>``
        * ``get_account_address_by_address_id <address_id>``
        * ``get_account_address_history <host_account_id>``

        :param parser: Sub-parser collection to attach the commands to
        """
        subparser = parser.add_parser("get_account_address")
        subparser.set_defaults(parser="get")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument(
            "host_account_id", help="Q2_AccountAddress.HostAccountID"
        )

        subparser = parser.add_parser("get_account_address_by_address_id")
        subparser.set_defaults(parser="get_by_address_id")
        subparser.set_defaults(
            func=partial(self.get_by_address_id, serialize_for_cli=True)
        )
        subparser.add_argument("address_id", help="Q2_AccountAddress.AddressID")

        subparser = parser.add_parser("get_account_address_history")
        # Tag with the bound method's name, like the other sub-commands; this
        # one previously reused "get" and was indistinguishable from
        # ``get_account_address`` by its ``parser`` default.
        subparser.set_defaults(parser="get_history")
        subparser.set_defaults(func=partial(self.get_history, serialize_for_cli=True))
        subparser.add_argument(
            "host_account_id", help="Q2_AccountAddress.HostAccountID"
        )

    async def get(
        self, host_account_id: int, serialize_for_cli=False
    ) -> list[AccountAddressRow]:
        """Call ``sdk_GetAccountAddress`` for one host account.

        :param host_account_id: Sent as the ``host_account_id`` Int parameter
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` with the address columns and that
            result is returned instead of the raw response
        :return: The ``call_hq`` response, or its CLI serialization
        """
        response = await self.call_hq(
            "sdk_GetAccountAddress",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "host_account_id",
                    host_account_id,
                )
            ]),
        )

        if serialize_for_cli:
            columns = [
                "StreetAddress1",
                "StreetAddress2",
                "City",
                "State",
                "PostalCode",
                "AddressType",
            ]
            response = self.serialize_for_cli(response, columns)
        return response

    async def get_by_address_id(
        self, address_id: int, serialize_for_cli=False
    ) -> list[AccountAddressRow]:
        """Call ``sdk_GetAccountAddressByAddressID`` for one address.

        :param address_id: Sent as the ``addressId`` Int parameter
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` with the host account and address
            columns and that result is returned instead of the raw response
        :return: The ``call_hq`` response, or its CLI serialization
        """
        response = await self.call_hq(
            "sdk_GetAccountAddressByAddressID",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "addressId",
                    address_id,
                )
            ]),
        )

        if serialize_for_cli:
            columns = [
                "HostAccountID",
                "StreetAddress1",
                "StreetAddress2",
                "City",
                "State",
                "PostalCode",
                "AddressType",
            ]
            response = self.serialize_for_cli(response, columns)
        return response

    async def get_history(
        self, host_account_id: int, serialize_for_cli=False
    ) -> list[AccountAddressHistoryRow]:
        """Call ``sdk_GetAccountAddressHistory`` for one host account.

        :param host_account_id: Sent as the ``host_account_id`` Int parameter
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` with the address and ``Status`` columns
            and that result is returned instead of the raw response
        :return: The ``call_hq`` response, or its CLI serialization
        """
        response = await self.call_hq(
            "sdk_GetAccountAddressHistory",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "host_account_id",
                    host_account_id,
                )
            ]),
        )

        if serialize_for_cli:
            columns = [
                "StreetAddress1",
                "StreetAddress2",
                "City",
                "State",
                "PostalCode",
                "Status",
            ]
            response = self.serialize_for_cli(response, columns)
        return response

    async def create(self, host_account_id: int, address_id: int):
        """Call ``sdk_CreateAccountAddress`` with an account and an address.

        :param host_account_id: Sent as the ``host_account_id`` Int parameter
        :param address_id: Sent as the ``address_id`` Int parameter
        :return: The ``call_hq`` response, unmodified
        """
        response = await self.call_hq(
            "sdk_CreateAccountAddress",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "host_account_id",
                    host_account_id,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "address_id", address_id
                ),
            ]),
        )
        return response

    async def update(
        self,
        host_account_id: int,
        address_id: int,
        current_address_id: Optional[int] = None,
    ):
        """Call ``sdk_UpdateAccountAddress`` with an account and an address.

        :param host_account_id: Sent as the ``host_account_id`` Int parameter
        :param address_id: Sent as the ``address_id`` Int parameter
        :param current_address_id: Sent as the ``current_address_id`` Int
            parameter when provided; omitted from the call when ``None``
        :return: The ``call_hq`` response, unmodified
        """
        parameters = [
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int,
                "host_account_id",
                host_account_id,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int, "address_id", address_id
            ),
        ]
        # Compare against None explicitly: a plain truthiness test would also
        # drop an explicitly supplied 0.
        if current_address_id is not None:
            parameters.append(
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "current_address_id",
                    current_address_id,
                )
            )
        response = await self.call_hq(
            "sdk_UpdateAccountAddress",
            ExecuteStoredProcedure.SqlParameters(parameters),
        )
        return response

    async def delete(
        self,
        host_account_id: int,
        online_session: OnlineSession,
        online_user: OnlineUser,
    ):
        """Call ``sdk_RemoveAccountAddress`` and audit a successful removal.

        :param host_account_id: Sent as the ``host_account_id`` Int parameter
        :param online_session: Supplies the session id and workstation written
            to the audit record
        :param online_user: Supplies the customer id, user id and user logon
            id written to the audit record
        :return: The ``call_hq`` response, unmodified
        """
        response = await self.call_hq(
            "sdk_RemoveAccountAddress",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "host_account_id",
                    host_account_id,
                )
            ]),
        )
        # Only a successful call is audited.
        # NOTE(review): self.hq_response is presumably refreshed by call_hq
        # above -- confirm in DbObject.
        if self.hq_response.success:
            audit = AuditRecord(self.logger, self.hq_credentials)
            await audit.create(
                f"AccountAddress row deleted: Host account id {host_account_id}",
                online_session.session_id,
                workstation_id=online_session.workstation,
                customer_id=online_user.customer_id,
                user_id=online_user.user_id,
                user_logon_id=online_user.user_logon_id,
            )
        return response