from argparse import _SubParsersAction
from dataclasses import dataclass
from functools import partial
from typing import Optional
from lxml.objectify import IntElement, StringElement
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.product import Product
from q2_sdk.hq.db.product_type import ProductType
from q2_sdk.hq.db.audit_record import AuditRecord
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
class UserPropertyDataRow(RepresentationRowBase):
UserPropertyDataID: IntElement = "UserPropertyDataID"
UISourceID: IntElement = "UISourceID"
GroupID: IntElement = "GroupID"
ProductTypeID: IntElement = "ProductTypeID"
ProductID: IntElement = "ProductID"
CustomerID: IntElement = "CustomerID"
UserID: IntElement = "UserID"
UserRoleID: IntElement = "UserRoleID"
HostAccountID: IntElement = "HostAccountID"
PropertyID: IntElement = "PropertyID"
PropertyValue: StringElement = "PropertyValue"
PropertyName: StringElement = "PropertyName"
PropertyDataType: StringElement = "PropertyDataType"
Weight: IntElement = "Weight"
CategoryShortName: StringElement = "CategoryShortName"
CategoryDescription: StringElement = "CategoryDescription"
[docs]
class UserPropertyData(DbObject):
REPRESENTATION_ROW_CLASS = UserPropertyDataRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_user_property_data")
subparser.set_defaults(parser="get_user_property_data")
subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
subparser.add_argument(
"user_property_name",
help="Q2_UserPropertyDataElements.PropertyValue prefix",
)
subparser.add_argument("-u", "--user-id", help="Q2_UserPropertyData.UserID")
subparser.add_argument(
"-r", "--user-role-id", help="Q2_UserPropertyData.UserRoleID"
)
subparser.add_argument("-g", "--group-id", help="Q2_UserPropertyData.GroupID")
subparser.add_argument(
"-c", "--customer-id", help="Q2_UserPropertyData.CustomerID"
)
subparser.add_argument(
"--strict",
action="store_true",
help="Only show exact matches for group/user/customer",
)
subparser.add_argument(
"--return-count",
type=int,
help="Max number of matches to return",
default=10,
)
subparser.add_argument("--ui-source", help="Q2_UISource.ShortName")
subparser.add_argument(
"--host-account-id", help="Q2_UserPropertyData.HostAccountID"
)
subparser.add_argument(
"--no-trunc", action="store_true", help="Do not truncate PropertyLongName"
)
subparser = parser.add_parser("add_user_property_data")
subparser.set_defaults(parser="add_user_property_data")
subparser.set_defaults(func=partial(self.create))
subparser.add_argument(
"property_name", help="Q2_UserPropertyDataElement.ShortName"
)
subparser.add_argument(
"property_value", help="Q2_UserPropertyData.PropertyValue"
)
subparser.add_argument("-u", "--user-id", help="Q2_UserPropertyData.UserID")
subparser.add_argument("-g", "--group-id", help="Q2_UserPropertyData.GroupID")
subparser.add_argument(
"-c", "--customer-id", help="Q2_UserPropertyData.CustomerID"
)
subparser.add_argument(
"-r", "--user-role-id", help="Q2_UserPropertyData.UserRoleID"
)
subparser.add_argument("--ui-source", help="Q2_UISource.ShortName")
subparser.add_argument(
"--host-account-id", help="Q2_UserPropertyData.HostAccountID"
)
subparser.add_argument("--product_id", help="Q2_UserPropertyData.ProductID")
subparser.add_argument(
"--product_type_id", help="Q2_UserPropertyData.ProductTypeID"
)
subparser = parser.add_parser("update_user_property_data")
subparser.set_defaults(parser="update_user_property_data")
subparser.set_defaults(func=partial(self.update))
subparser.add_argument(
"property_name", help="Q2_UserPropertyDataElements.PropertyName"
)
subparser.add_argument(
"property_value", help="Q2_UserPropertyData.PropertyValue"
)
subparser.add_argument("-u", "--user-id", help="Q2_UserPropertyData.UserID")
subparser.add_argument("-g", "--group-id", help="Q2_UserPropertyData.GroupID")
subparser.add_argument(
"-c", "--customer-id", help="Q2_UserPropertyData.CustomerID"
)
subparser.add_argument(
"-r", "--user-role-id", help="Q2_UserPropertyData.UserRoleID"
)
subparser.add_argument("--ui-source", help="Q2_UISource.ShortName")
subparser.add_argument(
"--host-account-id", help="Q2_UserPropertyData.HostAccountID"
)
subparser = parser.add_parser("update_user_property_data_by_id")
subparser.set_defaults(parser="update_user_property_data_by_id")
subparser.set_defaults(func=partial(self.update_by_id))
subparser.add_argument(
"user_property_data_id", help="Q2_UserPropertyData.UserPropertyDataID"
)
subparser.add_argument(
"property_name", help="Q2_UserPropertyDataElement.PropertyName"
)
subparser.add_argument(
"property_value", help="Q2_UserPropertyData.PropertyValue"
)
subparser = parser.add_parser("remove_user_property_data")
subparser.set_defaults(parser="remove_user_property_data")
subparser.set_defaults(func=partial(self.delete))
subparser.add_argument(
"property_name", help="Q2_UserPropertyDataElement.ShortName"
)
subparser.add_argument("-u", "--user-id", help="Q2_UserPropertyData.UserID")
subparser.add_argument("-g", "--group-id", help="Q2_UserPropertyData.GroupID")
subparser.add_argument(
"-c", "--customer-id", help="Q2_UserPropertyData.CustomerID"
)
subparser.add_argument(
"-r", "--user-role-id", help="Q2_UserPropertyData.UserRoleID"
)
subparser.add_argument(
"--host-account-id", help="Q2_UserPropertyData.HostAccountID"
)
subparser = parser.add_parser("get_user_property_data_by_value")
subparser.set_defaults(parser="get_user_property_data_by_value")
subparser.set_defaults(func=partial(self.get_by_value, serialize_for_cli=True))
subparser.add_argument(
"property_value", help="Q2_UserPropertyData.PropertyValue"
)
subparser.add_argument(
"user_property_name", help="Q2_UserPropertyDataElements.PropertyName"
)
subparser.add_argument(
"--no-trunc", action="store_true", help="Do not truncate PropertyLongName"
)
subparser = parser.add_parser("get_user_property_data_by_group")
subparser.set_defaults(parser="get_user_property_data_by_group")
subparser.set_defaults(func=partial(self.get_by_group, serialize_for_cli=True))
subparser.add_argument(
"user_property_name", help="Q2_UserPropertyDataElements.PropertyName"
)
[docs]
async def get(
self,
user_property_name: str,
user_id: Optional[int] = None,
group_id: Optional[int] = None,
customer_id: Optional[int] = None,
host_account_id: Optional[int] = None,
ui_source: Optional[str] = None,
product_id: Optional[int] = None,
product_type_id: Optional[int] = None,
return_count=10,
strict=True,
no_trunc=False,
serialize_for_cli=False,
user_role_id: Optional[int] = None,
) -> list[UserPropertyDataRow]:
truncate = not no_trunc
sql_params = []
list_of_params = [
Param(user_property_name, D_TYPES.VarChar, "PropertyName"),
Param(user_id, D_TYPES.Int, "UserID"),
Param(user_role_id, D_TYPES.Int, "UserRoleID"),
Param(group_id, D_TYPES.Int, "GroupID"),
Param(customer_id, D_TYPES.Int, "CustomerID"),
Param(host_account_id, D_TYPES.Int, "HostAccountID"),
Param(ui_source, D_TYPES.VarChar, "UISource"),
Param(product_id, D_TYPES.Int, "ProductID"),
Param(product_type_id, D_TYPES.Int, "ProductTypeID"),
Param(str(return_count), D_TYPES.Int, "ReturnCount"),
Param(str(strict), D_TYPES.Bit, "Strict"),
]
for item in list_of_params:
item.add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_GetUserPropertyData", ExecuteStoredProcedure.SqlParameters(sql_params)
)
if serialize_for_cli:
fields_to_truncate = []
if truncate:
fields_to_truncate = ["PropertyValue"]
response = self.serialize_for_cli(
response,
fields_to_display=[
"PropertyID",
"PropertyName",
"UserID",
"CustomerID",
"GroupID",
"UserRoleID",
"HostAccountID",
"PropertyValue",
"ProductID",
"ProductTypeID",
],
fields_to_truncate=fields_to_truncate,
)
return response
[docs]
async def get_by_value(
self,
property_value: str,
user_property_name: str,
no_trunc=False,
serialize_for_cli=False,
) -> list[UserPropertyDataRow]:
truncate = not no_trunc
sql_params = []
Param(property_value, D_TYPES.VarChar, "property_value").add_to_param_list(
sql_params
)
Param(
user_property_name, D_TYPES.VarChar, "user_property_name"
).add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_GetUserPropertyDataByValue",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
if serialize_for_cli:
fields_to_truncate = []
if truncate:
fields_to_truncate = ["TextValue"]
response = self.serialize_for_cli(
response,
fields_to_display=[
"PropertyID",
"UserID",
"CustomerID",
"UserRoleID",
"GroupID",
"HostAccountID",
"PropertyValue",
],
fields_to_truncate=fields_to_truncate,
)
return response
[docs]
async def get_by_property_id(self, property_id) -> list[UserPropertyDataRow]:
"""
This method will return user_property_data using by PropertyID.
"""
sql_params = []
Param(property_id, D_TYPES.Int, "property_id").add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_GetUserPropertyDataByPropertyID",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
return response
[docs]
async def get_by_group(
self, user_property_name: str, serialize_for_cli=False
) -> list[UserPropertyDataRow]:
"""
This method will include the default property value in the return. The default value will be the only value
that does not have a group id associated
"""
sql_params = []
Param(user_property_name, D_TYPES.VarChar, "PropertyName").add_to_param_list(
sql_params
)
response = await self.call_hq(
"sdk_GetUserPropertyDataByGroup",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
if serialize_for_cli:
response = self.serialize_for_cli(
response,
fields_to_display=[
"PropertyID",
"UserID",
"CustomerID",
"UserRoleID",
"GroupID",
"HostAccountID",
"PropertyValue",
],
)
return response
[docs]
async def get_by_group_with_id(
self, user_property_id: int
) -> list[UserPropertyDataRow]:
"""
This method will include the default property value in the return. The default value will be the only value
that does not have a group id associated
"""
sql_params = []
Param(user_property_id, D_TYPES.Int, "PropertyID").add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_GetUserPropertyDataByGroup",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
return response
[docs]
async def get_by_user_id(
self,
user_id: int,
user_property_name: str | None = None,
user_property_id: int | None = None,
) -> list[UserPropertyDataRow]:
if not any([user_property_name, user_property_id]):
raise DatabaseDataError("Either property name or id is needed")
sql_params = []
Param(user_id, D_TYPES.Int, "UserID").add_to_param_list(sql_params)
if user_property_name:
Param(
user_property_name, D_TYPES.VarChar, "PropertyName"
).add_to_param_list(sql_params)
if user_property_id:
Param(user_property_id, D_TYPES.Int, "PropertyID").add_to_param_list(
sql_params
)
return await self.call_hq(
"sdk_GetUserPropertyDataByUser",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
[docs]
async def get_by_customer_id(
self,
customer_id: int,
user_property_name: str | None = None,
user_property_id: int | None = None,
) -> list[UserPropertyDataRow]:
if not any([user_property_name, user_property_id]):
raise DatabaseDataError("Either property name or id is needed")
sql_params = []
Param(customer_id, D_TYPES.Int, "CustomerID").add_to_param_list(sql_params)
if user_property_name:
Param(
user_property_name, D_TYPES.VarChar, "PropertyName"
).add_to_param_list(sql_params)
if user_property_id:
Param(user_property_id, D_TYPES.Int, "PropertyID").add_to_param_list(
sql_params
)
return await self.call_hq(
"sdk_GetUserPropertyDataByCustomer",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
[docs]
async def create(
self,
property_name: str,
property_value: str | int | bool,
user_id: Optional[int] = None,
group_id: Optional[int] = None,
customer_id: Optional[int] = None,
host_account_id: Optional[int] = None,
ui_source: Optional[str] = None,
product_id: Optional[int] = None,
product_type_id: Optional[int] = None,
product_name: Optional[str] = None,
product_type_name: Optional[str] = None,
user_role_id: Optional[int] = None,
):
if product_name and not product_id:
product_db_obj = Product(self.logger, hq_credentials=self.hq_credentials)
product_obj = await product_db_obj.get_by_name(product_name)
product_id = product_obj.ProductID.pyval
if product_type_name and not product_type_id:
product_type_db_obj = ProductType(
self.logger, hq_credentials=self.hq_credentials
)
product_type_obj = await product_type_db_obj.get_by_name(product_type_name)
product_type_id = product_type_obj.ProductTypeID.pyval
sql_params = []
list_of_params = [
Param(user_id, D_TYPES.Int, "UserID"),
Param(property_name, D_TYPES.VarChar, "PropertyName"),
Param(str(property_value), D_TYPES.VarChar, "PropertyValue"),
Param(ui_source, D_TYPES.VarChar, "UISource"),
Param(group_id, D_TYPES.Int, "GroupID"),
Param(customer_id, D_TYPES.Int, "CustomerID"),
Param(host_account_id, D_TYPES.Int, "HostAccountID"),
Param(product_id, D_TYPES.SmallInt, "ProductID"),
Param(product_type_id, D_TYPES.SmallInt, "ProductTypeID"),
Param(user_role_id, D_TYPES.Int, "UserRoleID"),
]
for item in list_of_params:
item.add_to_param_list(sql_params)
return await self.call_hq(
"sdk_AddUserPropertyData", ExecuteStoredProcedure.SqlParameters(sql_params)
)
[docs]
async def update(
self,
property_name: str,
property_value: str | int | bool,
user_id: Optional[int] = None,
group_id: Optional[int] = None,
customer_id: Optional[int] = None,
host_account_id: Optional[int] = None,
ui_source: Optional[str] = None,
product_id: Optional[int] = None,
product_type_id: Optional[int] = None,
product_name: Optional[str] = None,
product_type_name: Optional[str] = None,
user_role_id: Optional[int] = None,
):
if product_name and not product_id:
product_db_obj = Product(self.logger, hq_credentials=self.hq_credentials)
product_obj = await product_db_obj.get_by_name(product_name)
product_id = product_obj.ProductID.pyval
if product_type_name and not product_type_id:
product_type_db_obj = ProductType(
self.logger, hq_credentials=self.hq_credentials
)
product_type_obj = await product_type_db_obj.get_by_name(product_type_name)
product_type_id = product_type_obj.ProductTypeID.pyval
sql_params = []
list_of_params = [
Param(user_id, D_TYPES.Int, "UserID"),
Param(property_name, D_TYPES.VarChar, "PropertyName"),
Param(str(property_value), D_TYPES.VarChar, "PropertyValue"),
Param(ui_source, D_TYPES.VarChar, "UISource"),
Param(group_id, D_TYPES.Int, "GroupID"),
Param(customer_id, D_TYPES.Int, "CustomerID"),
Param(host_account_id, D_TYPES.Int, "HostAccountID"),
Param(product_id, D_TYPES.SmallInt, "ProductID"),
Param(product_type_id, D_TYPES.SmallInt, "ProductTypeID"),
Param(user_role_id, D_TYPES.Int, "UserRoleID"),
]
for item in list_of_params:
item.add_to_param_list(sql_params)
return await self.call_hq(
"sdk_UpdateUserPropertyData",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
[docs]
async def update_by_id(
self, user_property_data_id: int, property_name: str, property_value: str
):
assert int(user_property_data_id) >= 1, (
"User Property Data ID must be an integer greater than 0"
)
sql_params = []
Param(
user_property_data_id, D_TYPES.Int, "UserPropertyDataId"
).add_to_param_list(sql_params)
Param(property_name, D_TYPES.VarChar, "PropertyName").add_to_param_list(
sql_params
)
Param(str(property_value), D_TYPES.VarChar, "PropertyValue").add_to_param_list(
sql_params
)
return await self.call_hq(
"sdk_UpdateUserPropertyDataById",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
[docs]
async def create_or_update(
self,
property_name: str,
property_value: str | int | bool,
user_id: Optional[int] = None,
group_id: Optional[int] = None,
customer_id: Optional[int] = None,
host_account_id: Optional[int] = None,
ui_source: Optional[str] = None,
product_id: Optional[int] = None,
product_type_id: Optional[int] = None,
product_name: Optional[str] = None,
product_type_name: Optional[str] = None,
user_role_id: Optional[int] = None,
):
"""
There may be times when we want to error if a row already exists (create only).
This method exists in case you just want a row to be there after you're done calling the function, regardless of the initial state of the DB
"""
# This already exists logic is complicated, because even Strict mode against the DB only checks to see if there is
# a row in the DB that could count here, even if it's inherited. This specifically asks if there's already a row
# that matches exactly what we're looking for
@dataclass
class ParamToAttrMap:
param: str | int | None
attr_name: str
matching_rows = await self.get(
property_name,
user_id=user_id,
group_id=group_id,
customer_id=customer_id,
host_account_id=host_account_id,
ui_source=ui_source,
product_id=product_id,
product_type_id=product_type_id,
user_role_id=user_role_id,
strict=True,
)
already_exists = False
if matching_rows:
if not any((
user_id,
group_id,
customer_id,
host_account_id,
ui_source,
product_id,
product_type_id,
product_name,
product_type_name,
)):
already_exists = True
else:
mappings = (
ParamToAttrMap(user_id, "UserID"),
ParamToAttrMap(group_id, "GroupID"),
ParamToAttrMap(customer_id, "CustomerID"),
ParamToAttrMap(host_account_id, "HostAccountID"),
ParamToAttrMap(ui_source, "UISourceID"),
ParamToAttrMap(product_id, "ProductID"),
ParamToAttrMap(product_type_id, "ProductTypeID"),
ParamToAttrMap(user_role_id, "UserRoleID"),
)
for row in matching_rows:
if already_exists:
break
for mapping in mappings:
if mapping.param is not None:
if mapping.param == getattr(row, mapping.attr_name):
already_exists = True
break
if not already_exists:
return await self.create(
property_name,
property_value,
user_id=user_id,
group_id=group_id,
customer_id=customer_id,
host_account_id=host_account_id,
ui_source=ui_source,
product_id=product_id,
product_type_id=product_type_id,
product_name=product_name,
product_type_name=product_type_name,
user_role_id=user_role_id,
)
else:
return await self.update(
property_name,
property_value,
user_id=user_id,
group_id=group_id,
customer_id=customer_id,
host_account_id=host_account_id,
ui_source=ui_source,
product_id=product_id,
product_type_id=product_type_id,
product_name=product_name,
product_type_name=product_type_name,
user_role_id=user_role_id,
)
[docs]
async def delete(
self,
property_name: str,
user_id: Optional[int] = None,
group_id: Optional[int] = None,
customer_id: Optional[int] = None,
host_account_id: Optional[int] = None,
ui_source: Optional[str] = None,
user_role_id: Optional[int] = None,
**kwargs,
):
sql_params = []
list_of_params = [
Param(user_id, D_TYPES.Int, "UserID"),
Param(property_name, D_TYPES.VarChar, "PropertyName"),
Param(ui_source, D_TYPES.VarChar, "UISource"),
Param(group_id, D_TYPES.Int, "GroupID"),
Param(customer_id, D_TYPES.Int, "CustomerID"),
Param(host_account_id, D_TYPES.Int, "HostAccountID"),
Param(user_role_id, D_TYPES.Int, "UserRoleID"),
]
for item in list_of_params:
item.add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_RemoveUserPropertyData",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
if self.hq_response.success and response:
audit_obj = AuditRecord(self.logger, hq_credentials=self.hq_credentials)
deleted_rows = [x.__dict__ for x in response]
details = f"Removed UserPropertyData {deleted_rows}"
session = self.hq_response.headers.get("q2token")
session_id = session if session else "000000000000000000000000"
await audit_obj.create(details, session_id)
[docs]
async def delete_all_property_data(self, property_name: str):
sql_params = []
Param(property_name, D_TYPES.VarChar, "PropertyName").add_to_param_list(
sql_params
)
return await self.call_hq(
"sdk_RemoveAllUserPropertyData",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
[docs]
async def delete_default_value(self, short_name: str, is_form: bool):
sql_params = []
Param(short_name, D_TYPES.VarChar, "short_name").add_to_param_list(sql_params)
Param(is_form, D_TYPES.Bit, "is_form").add_to_param_list(sql_params)
return await self.call_hq(
"sdk_RemoveSDKNavDefaultValue",
ExecuteStoredProcedure.SqlParameters(sql_params),
)