Source code for q2_sdk.hq.db.account_address

from argparse import _SubParsersAction
from functools import partial
from typing import Optional

from lxml.objectify import IntElement, StringElement, BoolElement
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from .db_object import DbObject
from .audit_record import AuditRecord
from .representation_row_base import RepresentationRowBase


[docs] class AccountAddressRow(RepresentationRowBase): UserID: IntElement = "UserID" AddressID: IntElement = "AddressID" HostAccountID: IntElement = "HostAccountID" StreetAddress1: StringElement = "StreetAddress1" StreetAddress2: StringElement = "StreetAddress2" City: StringElement = "City" State: StringElement = "State" PostalCode: StringElement = "PostalCode" CustomerID: IntElement = "CustomerID" AddressType: StringElement = "AddressType" CountryID: IntElement = "CountryID" IsInternational: BoolElement = "IsInternational" Province: StringElement = "Province" ISsoCodeA3: StringElement = "ISsoCodeA3" CountryName: StringElement = "CountryName"
[docs] class AccountAddressHistoryRow(RepresentationRowBase): ChangeAccountID: IntElement = "ChangeAccountID" HostAccountID: IntElement = "HostAccountID" TransactionID: IntElement = "TransactionID" OldAddressID: IntElement = "OldAddressID" StreetAddress1: StringElement = "StreetAddress1" StreetAddress2: StringElement = "StreetAddress2" City: StringElement = "City" State: StringElement = "State" PostalCode: StringElement = "PostalCode" EmailAddress: StringElement = "EmailAddress" HomePhoneNumber: StringElement = "HomePhoneNumber" WorkPhoneNumber: StringElement = "WorkPhoneNumber" CellPhoneNumber: StringElement = "CellPhoneNumber" CountryID: IntElement = "CountryID" Province: StringElement = "Province" IsoCodeA3: StringElement = "IsoCodeA3" Status: StringElement = "Status"
[docs] class AccountAddress(DbObject): NAME = "AccountAddress" REPRESENTATION_ROW_CLASS = AccountAddressRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_account_address") subparser.set_defaults(parser="get") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser.add_argument( "host_account_id", help="Q2_AccountAddress.HostAccountID" ) subparser = parser.add_parser("get_account_address_by_address_id") subparser.set_defaults(parser="get_by_address_id") subparser.set_defaults( func=partial(self.get_by_address_id, serialize_for_cli=True) ) subparser.add_argument("address_id", help="Q2_AccountAddress.AddressID") subparser = parser.add_parser("get_account_address_history") subparser.set_defaults(parser="get") subparser.set_defaults(func=partial(self.get_history, serialize_for_cli=True)) subparser.add_argument( "host_account_id", help="Q2_AccountAddress.HostAccountID" )
[docs] async def get( self, host_account_id: int, serialize_for_cli=False ) -> list[AccountAddressRow]: response = await self.call_hq( "sdk_GetAccountAddress", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "host_account_id", host_account_id, ) ]), ) if serialize_for_cli: columns = [ "StreetAddress1", "StreetAddress2", "City", "State", "PostalCode", "AddressType", ] response = self.serialize_for_cli(response, columns) return response
[docs] async def get_by_address_id( self, address_id: int, serialize_for_cli=False ) -> list[AccountAddressRow]: response = await self.call_hq( "sdk_GetAccountAddressByAddressID", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "addressId", address_id, ) ]), ) if serialize_for_cli: columns = [ "HostAccountID", "StreetAddress1", "StreetAddress2", "City", "State", "PostalCode", "AddressType", ] response = self.serialize_for_cli(response, columns) return response
[docs] async def get_history( self, host_account_id: int, serialize_for_cli=False ) -> list[AccountAddressHistoryRow]: response = await self.call_hq( "sdk_GetAccountAddressHistory", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "host_account_id", host_account_id, ) ]), ) if serialize_for_cli: columns = [ "StreetAddress1", "StreetAddress2", "City", "State", "PostalCode", "Status", ] response = self.serialize_for_cli(response, columns) return response
[docs] async def create(self, host_account_id: int, address_id: int): response = await self.call_hq( "sdk_CreateAccountAddress", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "host_account_id", host_account_id, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "address_id", address_id ), ]), ) return response
[docs] async def update( self, host_account_id: int, address_id: int, current_address_id: Optional[int] = None, ): parameters = [ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "host_account_id", host_account_id, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "address_id", address_id ), ] if current_address_id: parameters.append( ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "current_address_id", current_address_id, ) ) response = await self.call_hq( "sdk_UpdateAccountAddress", ExecuteStoredProcedure.SqlParameters(parameters), ) return response
[docs] async def delete( self, host_account_id: int, online_session: OnlineSession, online_user: OnlineUser, ): response = await self.call_hq( "sdk_RemoveAccountAddress", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "host_account_id", host_account_id, ) ]), ) if self.hq_response.success: audit = AuditRecord(self.logger, self.hq_credentials) await audit.create( f"AccountAddress row deleted: Host account id {host_account_id}", online_session.session_id, workstation_id=online_session.workstation, customer_id=online_user.customer_id, user_id=online_user.user_id, user_logon_id=online_user.user_logon_id, ) return response