import datetime
from argparse import _SubParsersAction
from functools import partial
from typing import Optional
from lxml.objectify import BoolElement, IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from q2_sdk.hq.db.audit_record import AuditRecord
from q2_sdk.models.demographic import Address as AddressObj
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
class AddressRow(RepresentationRowBase):
    """
    Typed row representation used by :class:`Address` (see ``Address.REPRESENTATION_ROW_CLASS``).

    Every attribute is annotated with an ``lxml.objectify`` element type and is
    assigned a string equal to its own name.
    NOTE(review): those strings are presumably the column names returned by the
    address stored procedures -- confirm against RepresentationRowBase.
    """

    # Identity of the address row
    AddressID: IntElement = "AddressID"

    # Street / locality fields
    StreetAddress1: StringElement = "StreetAddress1"
    StreetAddress2: StringElement = "StreetAddress2"
    City: StringElement = "City"
    State: StringElement = "State"
    PostalCode: StringElement = "PostalCode"

    # Ownership and classification
    CustomerID: IntElement = "CustomerID"
    AddressType: IntElement = "AddressType"
    UserID: IntElement = "UserID"

    # Country / international details
    CountryID: IntElement = "CountryID"
    IsInternational: BoolElement = "IsInternational"
    Province: StringElement = "Province"
    IsoCodeA2: StringElement = "IsoCodeA2"
    IsoCodeA3: StringElement = "IsoCodeA3"
    CountryName: StringElement = "CountryName"
class Address(DbObject):
    """
    Allows for additional address details given an address ID. Usually used in conjunction with a Default Address ID
    from a User query.
    """

    REPRESENTATION_ROW_CLASS = AddressRow

    # Columns shown when a lookup is rendered for the CLI. Shared by ``get`` and
    # ``get_by_login`` so the two commands cannot drift apart.
    _CLI_COLUMNS = (
        "StreetAddress1",
        "StreetAddress2",
        "City",
        "State",
        "PostalCode",
        "AddressType",
    )

    def add_arguments(self, parser: _SubParsersAction):
        """
        Registers the ``get_address`` and ``get_address_by_login`` sub-commands.

        Each sub-command stores a ``func`` default bound to the matching method
        with ``serialize_for_cli=True``.

        :param parser: The sub-parser collection the commands are added to
        """
        subparser = parser.add_parser("get_address")
        subparser.set_defaults(parser="get")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument("address_id", help="Q2_User.AddressID")

        subparser = parser.add_parser("get_address_by_login")
        subparser.set_defaults(parser="get_by_login")
        subparser.set_defaults(func=partial(self.get_by_login, serialize_for_cli=True))
        subparser.add_argument("login_name", help="Q2_UserLogin.LoginName")

    async def get(self, address_id: int, serialize_for_cli=False) -> list[AddressRow]:
        """
        Calls the ``sdk_GetAddress`` stored procedure for a single address ID.

        :param address_id: The address ID to look up. Anything ``int()`` accepts is allowed
            (the CLI passes it as a string)
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` with the CLI columns and that result is returned instead
        :return: The response from ``self.call_hq`` (or its CLI serialization)
        :raises ValueError: If ``address_id`` cannot be converted to an integer
        """
        try:
            address_id = int(address_id)
        except (TypeError, ValueError) as err:
            # int(None) raises TypeError rather than ValueError; report both the same way.
            raise ValueError("Please supply a valid address ID") from err

        response = await self.call_hq(
            "sdk_GetAddress",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "address_id", address_id
                )
            ]),
        )

        if serialize_for_cli:
            # Hand over a fresh list on every call, as the inline literal used to.
            response = self.serialize_for_cli(response, list(self._CLI_COLUMNS))
        return response

    async def get_by_login(
        self, login_name: str, serialize_for_cli=False
    ) -> list[AddressRow]:
        """
        Calls the ``sdk_GetAddressByLogin`` stored procedure for a login name.

        :param login_name: The login name passed to the stored procedure as ``login``
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` with the CLI columns and that result is returned instead
        :return: The response from ``self.call_hq`` (or its CLI serialization)
        """
        response = await self.call_hq(
            "sdk_GetAddressByLogin",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar, "login", login_name
                )
            ]),
        )

        if serialize_for_cli:
            # Hand over a fresh list on every call, as the inline literal used to.
            response = self.serialize_for_cli(response, list(self._CLI_COLUMNS))
        return response

    async def create(
        self,
        address_object: AddressObj,
        user_id: Optional[int] = None,
        customer_id: Optional[int] = None,
    ):
        """
        Creates an address in the database.
        State should be a two character state code if country is USA, otherwise a province

        :param address_object: Source of the address_1, address_2, city, state, zipcode,
            address_type and country values sent to ``sdk_AddAddress``
        :param user_id: Sent as ``user_id`` when truthy
        :param customer_id: Sent as ``customer_id`` when truthy
        :return: The response from ``self.call_hq``
        :raises AssertionError: If neither ``user_id`` nor ``customer_id`` is provided
        """
        if not (user_id or customer_id):
            # Raised explicitly instead of via ``assert`` so the check is not stripped
            # under ``python -O``. AssertionError is kept so callers see the same type.
            raise AssertionError(
                "Either user_id or customer_id, or both, need to be provided"
            )

        varchar = ExecuteStoredProcedure.DataType.VarChar
        integer = ExecuteStoredProcedure.DataType.Int

        # Parameter order matches the original call exactly.
        parameters = [
            ExecuteStoredProcedure.SqlParam(varchar, "address_1", address_object.address_1),
            ExecuteStoredProcedure.SqlParam(varchar, "address_2", address_object.address_2),
            ExecuteStoredProcedure.SqlParam(varchar, "city", address_object.city),
            ExecuteStoredProcedure.SqlParam(varchar, "state", address_object.state),
            ExecuteStoredProcedure.SqlParam(varchar, "zipcode", address_object.zipcode),
            ExecuteStoredProcedure.SqlParam(
                varchar, "address_type", address_object.address_type
            ),
            ExecuteStoredProcedure.SqlParam(varchar, "country", address_object.country),
        ]

        # Owner IDs are optional individually; only the ones supplied are sent.
        if customer_id:
            parameters.append(
                ExecuteStoredProcedure.SqlParam(integer, "customer_id", customer_id)
            )
        if user_id:
            parameters.append(
                ExecuteStoredProcedure.SqlParam(integer, "user_id", user_id)
            )

        response = await self.call_hq(
            "sdk_AddAddress", ExecuteStoredProcedure.SqlParameters(parameters)
        )
        return response

    async def delete(
        self,
        address_id: int,
        online_user: OnlineUser,
        online_session: OnlineSession,
        time_occurred: Optional[str] = None,
    ):
        """
        Deletes an address in the Q2_Address table. The User ID is required in the OnlineUser parameter in order
        to check the default address id on the Q2_User table before deletion. The deletion is audited

        :param address_id: The id of the address to delete
        :param online_user: An OnlineUser object with the user id populated at the minimum
        :param online_session: An OnlineSession object used for auditing
        :param time_occurred: The ISO format timestamp of the deletion time. Defaults to the current local time
        :return: The response from ``self.call_hq``
        :raises DatabaseDataError: If ``online_user.user_id`` is not populated
        """
        if not online_user.user_id:
            raise DatabaseDataError("online_user.user_id is required")

        if not time_occurred:
            # The datetime parameter is required for the stored proc but not used.
            # Going to keep it a time stamp in case that changes in the future.
            time_occurred = datetime.datetime.now().isoformat()

        response = await self.call_hq(
            "Q2_Address_Delete_EndUser",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "Id",
                    address_id,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "ParentId", online_user.user_id
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.DateTime, "DateTime", time_occurred
                ),
            ]),
        )

        # Only write the audit record when HQ reports the delete call as successful.
        if self.hq_response.success:
            audit = AuditRecord(self.logger, self.hq_credentials)
            await audit.create(
                f"Address row deleted: Address ID {address_id}. User ID submitted: {online_user.user_id}",
                online_session.session_id,
                workstation_id=online_session.workstation,
                customer_id=online_user.customer_id,
                user_id=online_user.user_id,
                user_logon_id=online_user.user_logon_id,
            )
        return response