Source code for q2_sdk.hq.db.user_property_data

from argparse import _SubParsersAction
from dataclasses import dataclass
from functools import partial
from typing import Optional

from lxml.objectify import IntElement, StringElement

from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.product import Product
from q2_sdk.hq.db.product_type import ProductType
from q2_sdk.hq.db.audit_record import AuditRecord
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase

# Shorthand for the stored-procedure parameter data types (Int, SmallInt,
# VarChar, Bit) used when building Param objects in this module.
D_TYPES = ExecuteStoredProcedure.DataType


class UserPropertyDataRow(RepresentationRowBase):
    """
    Row type used to annotate the results of the UserPropertyData queries.

    Every attribute is annotated with an lxml ``objectify`` element type and
    assigned a string equal to its own name.
    NOTE(review): presumably RepresentationRowBase treats these strings as the
    column names of the result set - confirm against the base class.
    """

    # Identifiers of the row itself and of the scope it applies to
    UserPropertyDataID: IntElement = "UserPropertyDataID"
    UISourceID: IntElement = "UISourceID"
    GroupID: IntElement = "GroupID"
    ProductTypeID: IntElement = "ProductTypeID"
    ProductID: IntElement = "ProductID"
    CustomerID: IntElement = "CustomerID"
    UserID: IntElement = "UserID"
    UserRoleID: IntElement = "UserRoleID"
    HostAccountID: IntElement = "HostAccountID"
    # The property and its value
    PropertyID: IntElement = "PropertyID"
    PropertyValue: StringElement = "PropertyValue"
    PropertyName: StringElement = "PropertyName"
    PropertyDataType: StringElement = "PropertyDataType"
    Weight: IntElement = "Weight"
    # Category the property belongs to
    CategoryShortName: StringElement = "CategoryShortName"
    CategoryDescription: StringElement = "CategoryDescription"
class UserPropertyData(DbObject):
    """
    DbObject for user property data.

    Its methods build stored-procedure parameters and run ``sdk_*`` stored
    procedures through ``call_hq``; ``add_arguments`` exposes several of them
    as CLI sub-commands.
    """

    # Row class the query methods use in their return annotations
    REPRESENTATION_ROW_CLASS = UserPropertyDataRow
def add_arguments(self, parser: _SubParsersAction):
    """
    Register the user property data CLI sub-commands on ``parser``.

    Every sub-command records its own name under ``parser`` and the method to
    run under ``func`` (both through ``set_defaults``).

    ``add_user_property_data`` accepts ``--product-id`` / ``--product-type-id``
    in the hyphenated style used by every other flag; the legacy
    ``--product_id`` / ``--product_type_id`` spellings keep working and both
    forms store into the same ``product_id`` / ``product_type_id`` attributes.

    :param parser: sub-parsers action the commands are added to
    """
    # Flags and help text of the scope options shared by several commands.
    scope_options = {
        "user": (("-u", "--user-id"), "Q2_UserPropertyData.UserID"),
        "role": (("-r", "--user-role-id"), "Q2_UserPropertyData.UserRoleID"),
        "group": (("-g", "--group-id"), "Q2_UserPropertyData.GroupID"),
        "customer": (("-c", "--customer-id"), "Q2_UserPropertyData.CustomerID"),
        "ui_source": (("--ui-source",), "Q2_UISource.ShortName"),
        "host_account": (
            ("--host-account-id",),
            "Q2_UserPropertyData.HostAccountID",
        ),
    }

    def new_command(name, func):
        # Create the sub-command and remember its name and entry point.
        command = parser.add_parser(name)
        command.set_defaults(parser=name)
        command.set_defaults(func=func)
        return command

    def add_scope(command, *keys):
        # Options are added in the order given so --help output is stable.
        for key in keys:
            flags, help_text = scope_options[key]
            command.add_argument(*flags, help=help_text)

    def add_no_trunc(command):
        command.add_argument(
            "--no-trunc",
            action="store_true",
            help="Do not truncate PropertyLongName",
        )

    command = new_command(
        "get_user_property_data", partial(self.get, serialize_for_cli=True)
    )
    command.add_argument(
        "user_property_name",
        help="Q2_UserPropertyDataElements.PropertyValue prefix",
    )
    add_scope(command, "user", "role", "group", "customer")
    command.add_argument(
        "--strict",
        action="store_true",
        help="Only show exact matches for group/user/customer",
    )
    command.add_argument(
        "--return-count",
        type=int,
        help="Max number of matches to return",
        default=10,
    )
    add_scope(command, "ui_source", "host_account")
    add_no_trunc(command)

    command = new_command("add_user_property_data", partial(self.create))
    command.add_argument(
        "property_name", help="Q2_UserPropertyDataElement.ShortName"
    )
    command.add_argument(
        "property_value", help="Q2_UserPropertyData.PropertyValue"
    )
    add_scope(
        command, "user", "group", "customer", "role", "ui_source", "host_account"
    )
    # The hyphenated spelling comes first; argparse derives dest from the
    # first long option, so dest stays "product_id" / "product_type_id".
    command.add_argument(
        "--product-id", "--product_id", help="Q2_UserPropertyData.ProductID"
    )
    command.add_argument(
        "--product-type-id",
        "--product_type_id",
        help="Q2_UserPropertyData.ProductTypeID",
    )

    command = new_command("update_user_property_data", partial(self.update))
    command.add_argument(
        "property_name", help="Q2_UserPropertyDataElements.PropertyName"
    )
    command.add_argument(
        "property_value", help="Q2_UserPropertyData.PropertyValue"
    )
    add_scope(
        command, "user", "group", "customer", "role", "ui_source", "host_account"
    )

    command = new_command(
        "update_user_property_data_by_id", partial(self.update_by_id)
    )
    command.add_argument(
        "user_property_data_id", help="Q2_UserPropertyData.UserPropertyDataID"
    )
    command.add_argument(
        "property_name", help="Q2_UserPropertyDataElement.PropertyName"
    )
    command.add_argument(
        "property_value", help="Q2_UserPropertyData.PropertyValue"
    )

    command = new_command("remove_user_property_data", partial(self.delete))
    command.add_argument(
        "property_name", help="Q2_UserPropertyDataElement.ShortName"
    )
    add_scope(command, "user", "group", "customer", "role", "host_account")

    command = new_command(
        "get_user_property_data_by_value",
        partial(self.get_by_value, serialize_for_cli=True),
    )
    command.add_argument(
        "property_value", help="Q2_UserPropertyData.PropertyValue"
    )
    command.add_argument(
        "user_property_name", help="Q2_UserPropertyDataElements.PropertyName"
    )
    add_no_trunc(command)

    command = new_command(
        "get_user_property_data_by_group",
        partial(self.get_by_group, serialize_for_cli=True),
    )
    command.add_argument(
        "user_property_name", help="Q2_UserPropertyDataElements.PropertyName"
    )
async def get(
    self,
    user_property_name: str,
    user_id: Optional[int] = None,
    group_id: Optional[int] = None,
    customer_id: Optional[int] = None,
    host_account_id: Optional[int] = None,
    ui_source: Optional[str] = None,
    product_id: Optional[int] = None,
    product_type_id: Optional[int] = None,
    return_count=10,
    strict=True,
    no_trunc=False,
    serialize_for_cli=False,
    user_role_id: Optional[int] = None,
) -> list[UserPropertyDataRow]:
    """
    Run ``sdk_GetUserPropertyData`` for the given property name and scope.

    :param user_property_name: sent as ``PropertyName``
    :param return_count: sent (stringified) as ``ReturnCount``
    :param strict: sent (stringified) as the ``Strict`` bit
    :param no_trunc: when serializing for the CLI, do not truncate
        ``PropertyValue``
    :param serialize_for_cli: pass the result through ``serialize_for_cli``
        before returning it
    :return: the ``call_hq`` result, or its CLI serialization
    """
    sql_params = []
    for query_param in (
        Param(user_property_name, D_TYPES.VarChar, "PropertyName"),
        Param(user_id, D_TYPES.Int, "UserID"),
        Param(user_role_id, D_TYPES.Int, "UserRoleID"),
        Param(group_id, D_TYPES.Int, "GroupID"),
        Param(customer_id, D_TYPES.Int, "CustomerID"),
        Param(host_account_id, D_TYPES.Int, "HostAccountID"),
        Param(ui_source, D_TYPES.VarChar, "UISource"),
        Param(product_id, D_TYPES.Int, "ProductID"),
        Param(product_type_id, D_TYPES.Int, "ProductTypeID"),
        Param(str(return_count), D_TYPES.Int, "ReturnCount"),
        Param(str(strict), D_TYPES.Bit, "Strict"),
    ):
        query_param.add_to_param_list(sql_params)

    rows = await self.call_hq(
        "sdk_GetUserPropertyData",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
    if not serialize_for_cli:
        return rows

    # Only the value column is ever shortened for terminal output.
    return self.serialize_for_cli(
        rows,
        fields_to_display=[
            "PropertyID",
            "PropertyName",
            "UserID",
            "CustomerID",
            "GroupID",
            "UserRoleID",
            "HostAccountID",
            "PropertyValue",
            "ProductID",
            "ProductTypeID",
        ],
        fields_to_truncate=[] if no_trunc else ["PropertyValue"],
    )
async def get_by_value(
    self,
    property_value: str,
    user_property_name: str,
    no_trunc=False,
    serialize_for_cli=False,
) -> list[UserPropertyDataRow]:
    """
    Run ``sdk_GetUserPropertyDataByValue`` for a property value and name.

    :param property_value: sent as ``property_value``
    :param user_property_name: sent as ``user_property_name``
    :param no_trunc: when serializing for the CLI, do not truncate
        ``PropertyValue``
    :param serialize_for_cli: pass the result through ``serialize_for_cli``
        before returning it
    :return: the ``call_hq`` result, or its CLI serialization
    """
    sql_params = []
    Param(property_value, D_TYPES.VarChar, "property_value").add_to_param_list(
        sql_params
    )
    Param(
        user_property_name, D_TYPES.VarChar, "user_property_name"
    ).add_to_param_list(sql_params)

    response = await self.call_hq(
        "sdk_GetUserPropertyDataByValue",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )

    if serialize_for_cli:
        # Truncate the column that is actually displayed. The previous name
        # ("TextValue") is not a UserPropertyDataRow field, so truncation was
        # most likely never applied here, unlike in get().
        fields_to_truncate = [] if no_trunc else ["PropertyValue"]
        response = self.serialize_for_cli(
            response,
            fields_to_display=[
                "PropertyID",
                "UserID",
                "CustomerID",
                "UserRoleID",
                "GroupID",
                "HostAccountID",
                "PropertyValue",
            ],
            fields_to_truncate=fields_to_truncate,
        )
    return response
async def get_by_property_id(self, property_id) -> list[UserPropertyDataRow]:
    """
    Return the user property data found by PropertyID.

    Runs ``sdk_GetUserPropertyDataByPropertyID`` with ``property_id`` as its
    only parameter.
    """
    sql_params = []
    id_param = Param(property_id, D_TYPES.Int, "property_id")
    id_param.add_to_param_list(sql_params)
    return await self.call_hq(
        "sdk_GetUserPropertyDataByPropertyID",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
async def get_by_group(
    self, user_property_name: str, serialize_for_cli=False
) -> list[UserPropertyDataRow]:
    """
    Run ``sdk_GetUserPropertyDataByGroup`` for a property name.

    The default property value is included in the return; it is the only
    value that does not have a group id associated.

    :param user_property_name: sent as ``PropertyName``
    :param serialize_for_cli: pass the result through ``serialize_for_cli``
        before returning it
    """
    sql_params = []
    name_param = Param(user_property_name, D_TYPES.VarChar, "PropertyName")
    name_param.add_to_param_list(sql_params)

    rows = await self.call_hq(
        "sdk_GetUserPropertyDataByGroup",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
    if not serialize_for_cli:
        return rows

    return self.serialize_for_cli(
        rows,
        fields_to_display=[
            "PropertyID",
            "UserID",
            "CustomerID",
            "UserRoleID",
            "GroupID",
            "HostAccountID",
            "PropertyValue",
        ],
    )
async def get_by_group_with_id(
    self, user_property_id: int
) -> list[UserPropertyDataRow]:
    """
    Run ``sdk_GetUserPropertyDataByGroup`` for a property id.

    The default property value is included in the return; it is the only
    value that does not have a group id associated.

    :param user_property_id: sent as ``PropertyID``
    """
    sql_params = []
    id_param = Param(user_property_id, D_TYPES.Int, "PropertyID")
    id_param.add_to_param_list(sql_params)
    return await self.call_hq(
        "sdk_GetUserPropertyDataByGroup",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
async def get_by_user_id(
    self,
    user_id: int,
    user_property_name: str | None = None,
    user_property_id: int | None = None,
) -> list[UserPropertyDataRow]:
    """
    Run ``sdk_GetUserPropertyDataByUser`` for one user.

    :param user_id: sent as ``UserID``
    :param user_property_name: sent as ``PropertyName`` when truthy
    :param user_property_id: sent as ``PropertyID`` when truthy
    :raises DatabaseDataError: when neither a property name nor an id is given
    """
    if not (user_property_name or user_property_id):
        raise DatabaseDataError("Either property name or id is needed")

    query_params = [Param(user_id, D_TYPES.Int, "UserID")]
    if user_property_name:
        query_params.append(
            Param(user_property_name, D_TYPES.VarChar, "PropertyName")
        )
    if user_property_id:
        query_params.append(Param(user_property_id, D_TYPES.Int, "PropertyID"))

    sql_params = []
    for query_param in query_params:
        query_param.add_to_param_list(sql_params)

    return await self.call_hq(
        "sdk_GetUserPropertyDataByUser",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
async def get_by_customer_id(
    self,
    customer_id: int,
    user_property_name: str | None = None,
    user_property_id: int | None = None,
) -> list[UserPropertyDataRow]:
    """
    Run ``sdk_GetUserPropertyDataByCustomer`` for one customer.

    :param customer_id: sent as ``CustomerID``
    :param user_property_name: sent as ``PropertyName`` when truthy
    :param user_property_id: sent as ``PropertyID`` when truthy
    :raises DatabaseDataError: when neither a property name nor an id is given
    """
    if not (user_property_name or user_property_id):
        raise DatabaseDataError("Either property name or id is needed")

    query_params = [Param(customer_id, D_TYPES.Int, "CustomerID")]
    if user_property_name:
        query_params.append(
            Param(user_property_name, D_TYPES.VarChar, "PropertyName")
        )
    if user_property_id:
        query_params.append(Param(user_property_id, D_TYPES.Int, "PropertyID"))

    sql_params = []
    for query_param in query_params:
        query_param.add_to_param_list(sql_params)

    return await self.call_hq(
        "sdk_GetUserPropertyDataByCustomer",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
async def create(
    self,
    property_name: str,
    property_value: str | int | bool,
    user_id: Optional[int] = None,
    group_id: Optional[int] = None,
    customer_id: Optional[int] = None,
    host_account_id: Optional[int] = None,
    ui_source: Optional[str] = None,
    product_id: Optional[int] = None,
    product_type_id: Optional[int] = None,
    product_name: Optional[str] = None,
    product_type_name: Optional[str] = None,
    user_role_id: Optional[int] = None,
):
    """
    Run ``sdk_AddUserPropertyData`` with the given value and scope.

    :param property_value: stringified before being sent as ``PropertyValue``
    :param product_name: looked up through ``Product.get_by_name`` to obtain
        the product id, only when ``product_id`` is not given
    :param product_type_name: looked up through ``ProductType.get_by_name`` to
        obtain the product type id, only when ``product_type_id`` is not given
    :return: the ``call_hq`` result
    """
    # Resolve names to ids only when the caller did not supply the id.
    if product_name and not product_id:
        product_lookup = Product(self.logger, hq_credentials=self.hq_credentials)
        product_row = await product_lookup.get_by_name(product_name)
        product_id = product_row.ProductID.pyval
    if product_type_name and not product_type_id:
        product_type_lookup = ProductType(
            self.logger, hq_credentials=self.hq_credentials
        )
        product_type_row = await product_type_lookup.get_by_name(
            product_type_name
        )
        product_type_id = product_type_row.ProductTypeID.pyval

    sql_params = []
    for query_param in (
        Param(user_id, D_TYPES.Int, "UserID"),
        Param(property_name, D_TYPES.VarChar, "PropertyName"),
        Param(str(property_value), D_TYPES.VarChar, "PropertyValue"),
        Param(ui_source, D_TYPES.VarChar, "UISource"),
        Param(group_id, D_TYPES.Int, "GroupID"),
        Param(customer_id, D_TYPES.Int, "CustomerID"),
        Param(host_account_id, D_TYPES.Int, "HostAccountID"),
        Param(product_id, D_TYPES.SmallInt, "ProductID"),
        Param(product_type_id, D_TYPES.SmallInt, "ProductTypeID"),
        Param(user_role_id, D_TYPES.Int, "UserRoleID"),
    ):
        query_param.add_to_param_list(sql_params)

    return await self.call_hq(
        "sdk_AddUserPropertyData",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
async def update(
    self,
    property_name: str,
    property_value: str | int | bool,
    user_id: Optional[int] = None,
    group_id: Optional[int] = None,
    customer_id: Optional[int] = None,
    host_account_id: Optional[int] = None,
    ui_source: Optional[str] = None,
    product_id: Optional[int] = None,
    product_type_id: Optional[int] = None,
    product_name: Optional[str] = None,
    product_type_name: Optional[str] = None,
    user_role_id: Optional[int] = None,
):
    """
    Run ``sdk_UpdateUserPropertyData`` with the given value and scope.

    :param property_value: stringified before being sent as ``PropertyValue``
    :param product_name: looked up through ``Product.get_by_name`` to obtain
        the product id, only when ``product_id`` is not given
    :param product_type_name: looked up through ``ProductType.get_by_name`` to
        obtain the product type id, only when ``product_type_id`` is not given
    :return: the ``call_hq`` result
    """
    # Resolve names to ids only when the caller did not supply the id.
    if product_name and not product_id:
        product_lookup = Product(self.logger, hq_credentials=self.hq_credentials)
        product_row = await product_lookup.get_by_name(product_name)
        product_id = product_row.ProductID.pyval
    if product_type_name and not product_type_id:
        product_type_lookup = ProductType(
            self.logger, hq_credentials=self.hq_credentials
        )
        product_type_row = await product_type_lookup.get_by_name(
            product_type_name
        )
        product_type_id = product_type_row.ProductTypeID.pyval

    sql_params = []
    for query_param in (
        Param(user_id, D_TYPES.Int, "UserID"),
        Param(property_name, D_TYPES.VarChar, "PropertyName"),
        Param(str(property_value), D_TYPES.VarChar, "PropertyValue"),
        Param(ui_source, D_TYPES.VarChar, "UISource"),
        Param(group_id, D_TYPES.Int, "GroupID"),
        Param(customer_id, D_TYPES.Int, "CustomerID"),
        Param(host_account_id, D_TYPES.Int, "HostAccountID"),
        Param(product_id, D_TYPES.SmallInt, "ProductID"),
        Param(product_type_id, D_TYPES.SmallInt, "ProductTypeID"),
        Param(user_role_id, D_TYPES.Int, "UserRoleID"),
    ):
        query_param.add_to_param_list(sql_params)

    return await self.call_hq(
        "sdk_UpdateUserPropertyData",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
async def update_by_id(
    self, user_property_data_id: int, property_name: str, property_value: str
):
    """
    Run ``sdk_UpdateUserPropertyDataById`` for one row.

    :param user_property_data_id: sent as ``UserPropertyDataId``; must convert
        to an integer of at least 1
    :param property_name: sent as ``PropertyName``
    :param property_value: stringified before being sent as ``PropertyValue``
    :raises AssertionError: when the id converts to an integer below 1
    """
    # NOTE(review): assert statements are skipped under ``python -O``, so this
    # guard is not enforced in optimized mode.
    assert int(user_property_data_id) >= 1, (
        "User Property Data ID must be an integer greater than 0"
    )

    sql_params = []
    for query_param in (
        Param(user_property_data_id, D_TYPES.Int, "UserPropertyDataId"),
        Param(property_name, D_TYPES.VarChar, "PropertyName"),
        Param(str(property_value), D_TYPES.VarChar, "PropertyValue"),
    ):
        query_param.add_to_param_list(sql_params)

    return await self.call_hq(
        "sdk_UpdateUserPropertyDataById",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
[docs] async def create_or_update( self, property_name: str, property_value: str | int | bool, user_id: Optional[int] = None, group_id: Optional[int] = None, customer_id: Optional[int] = None, host_account_id: Optional[int] = None, ui_source: Optional[str] = None, product_id: Optional[int] = None, product_type_id: Optional[int] = None, product_name: Optional[str] = None, product_type_name: Optional[str] = None, user_role_id: Optional[int] = None, ): """ There may be times when we want to error if a row already exists (create only). This method exists in case you just want a row to be there after you're done calling the function, regardless of the initial state of the DB """ # This already exists logic is complicated, because even Strict mode against the DB only checks to see if there is # a row in the DB that could count here, even if it's inherited. This specifically asks if there's already a row # that matches exactly what we're looking for @dataclass class ParamToAttrMap: param: str | int | None attr_name: str matching_rows = await self.get( property_name, user_id=user_id, group_id=group_id, customer_id=customer_id, host_account_id=host_account_id, ui_source=ui_source, product_id=product_id, product_type_id=product_type_id, user_role_id=user_role_id, strict=True, ) already_exists = False if matching_rows: if not any(( user_id, group_id, customer_id, host_account_id, ui_source, product_id, product_type_id, product_name, product_type_name, )): already_exists = True else: mappings = ( ParamToAttrMap(user_id, "UserID"), ParamToAttrMap(group_id, "GroupID"), ParamToAttrMap(customer_id, "CustomerID"), ParamToAttrMap(host_account_id, "HostAccountID"), ParamToAttrMap(ui_source, "UISourceID"), ParamToAttrMap(product_id, "ProductID"), ParamToAttrMap(product_type_id, "ProductTypeID"), ParamToAttrMap(user_role_id, "UserRoleID"), ) for row in matching_rows: if already_exists: break for mapping in mappings: if mapping.param is not None: if mapping.param == getattr(row, 
mapping.attr_name): already_exists = True break if not already_exists: return await self.create( property_name, property_value, user_id=user_id, group_id=group_id, customer_id=customer_id, host_account_id=host_account_id, ui_source=ui_source, product_id=product_id, product_type_id=product_type_id, product_name=product_name, product_type_name=product_type_name, user_role_id=user_role_id, ) else: return await self.update( property_name, property_value, user_id=user_id, group_id=group_id, customer_id=customer_id, host_account_id=host_account_id, ui_source=ui_source, product_id=product_id, product_type_id=product_type_id, product_name=product_name, product_type_name=product_type_name, user_role_id=user_role_id, )
async def delete(
    self,
    property_name: str,
    user_id: Optional[int] = None,
    group_id: Optional[int] = None,
    customer_id: Optional[int] = None,
    host_account_id: Optional[int] = None,
    ui_source: Optional[str] = None,
    user_role_id: Optional[int] = None,
    **kwargs,
):
    """
    Run ``sdk_RemoveUserPropertyData`` and audit what was removed.

    When the HQ response reports success and rows came back, an audit record
    describing those rows is created. Extra keyword arguments are accepted
    and ignored. Nothing is returned.
    """
    sql_params = []
    for query_param in (
        Param(user_id, D_TYPES.Int, "UserID"),
        Param(property_name, D_TYPES.VarChar, "PropertyName"),
        Param(ui_source, D_TYPES.VarChar, "UISource"),
        Param(group_id, D_TYPES.Int, "GroupID"),
        Param(customer_id, D_TYPES.Int, "CustomerID"),
        Param(host_account_id, D_TYPES.Int, "HostAccountID"),
        Param(user_role_id, D_TYPES.Int, "UserRoleID"),
    ):
        query_param.add_to_param_list(sql_params)

    removed = await self.call_hq(
        "sdk_RemoveUserPropertyData",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
    if not (self.hq_response.success and removed):
        return

    auditor = AuditRecord(self.logger, hq_credentials=self.hq_credentials)
    deleted_rows = [row.__dict__ for row in removed]
    details = f"Removed UserPropertyData {deleted_rows}"
    # Fall back to an all-zero session id when the response has no q2token.
    session_id = (
        self.hq_response.headers.get("q2token") or "000000000000000000000000"
    )
    await auditor.create(details, session_id)
async def delete_all_property_data(self, property_name: str):
    """
    Run ``sdk_RemoveAllUserPropertyData`` for a property name.

    :param property_name: sent as ``PropertyName``
    :return: the ``call_hq`` result
    """
    sql_params = []
    name_param = Param(property_name, D_TYPES.VarChar, "PropertyName")
    name_param.add_to_param_list(sql_params)
    return await self.call_hq(
        "sdk_RemoveAllUserPropertyData",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
async def delete_default_value(self, short_name: str, is_form: bool):
    """
    Run ``sdk_RemoveSDKNavDefaultValue``.

    :param short_name: sent as ``short_name``
    :param is_form: sent as the ``is_form`` bit
    :return: the ``call_hq`` result
    """
    sql_params = []
    for query_param in (
        Param(short_name, D_TYPES.VarChar, "short_name"),
        Param(is_form, D_TYPES.Bit, "is_form"),
    ):
        query_param.add_to_param_list(sql_params)
    return await self.call_hq(
        "sdk_RemoveSDKNavDefaultValue",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )