from argparse import _SubParsersAction
from functools import partial
from datetime import datetime
from lxml.objectify import IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from .audit_record import AuditRecord
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
class UserDataRow(RepresentationRowBase):
DataID: IntElement = "DataID"
ShortName: StringElement = "ShortName"
UserID: IntElement = "UserID"
GTDataValue: StringElement = "GTDataValue"
FileID: IntElement = "FileID"
CreateDate: StringElement = "CreateDate"
DisplayName: StringElement = "DisplayName"
[docs]
class UserDataWithUserInfoRow(RepresentationRowBase):
DataID: IntElement = "DataID"
ShortName: StringElement = "ShortName"
UserID: IntElement = "UserID"
GTDataValue: StringElement = "GTDataValue"
FileID: IntElement = "FileID"
CreateDate: StringElement = "CreateDate"
DisplayName: StringElement = "DisplayName"
FirstName: StringElement = "FirstName"
LastName: StringElement = "LastName"
[docs]
class UserData(DbObject):
"""
Allows for DB storage on a per user level. Combines the Q2_UserData and Q2_UserDataElements table.
"""
GET_BY_NAME_KEY = "ShortName"
NAME = "UserData"
REPRESENTATION_ROW_CLASS = UserDataRow
RESPONSE_FIELDS = [
"DataID",
"ShortName",
"UserID",
"GTDataValue",
"FileID",
"CreateDate",
"DisplayName",
]
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_user_data")
subparser.set_defaults(parser="get_user_data")
subparser.add_argument("user_id", type=int, help="Q2_User.UserID")
subparser.add_argument("short_name", help="Q2_UserDataElement.ShortName")
subparser.add_argument(
"--no-trunc", action="store_true", help="Do not truncate GTDataValue"
)
subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
subparser = parser.add_parser("get_user_data_all")
subparser.set_defaults(parser="get_user_data_all")
subparser.add_argument("user_id", type=int, help="Q2_User.userID")
subparser.add_argument(
"--no-trunc", action="store_true", help="Do not truncate GTDataValue"
)
subparser.set_defaults(func=partial(self.get_multi, serialize_for_cli=True))
subparser = parser.add_parser("get_user_data_by_value")
subparser.set_defaults(parser="get_user_data_by_value")
subparser.add_argument("short_name", help="Q2_UserDataElement.ShortName")
subparser.add_argument("value", help="Q2_UserDataElement.GTDataValue")
subparser.add_argument(
"--no-trunc", action="store_true", help="Do not truncate GTDataValue"
)
subparser.set_defaults(func=partial(self.get_by_value, serialize_for_cli=True))
subparser = parser.add_parser("get_user_data_by_short_name")
subparser.set_defaults(parser="get_user_data_by_short_name")
subparser.add_argument("short_name", help="Q2_UserDataElement.ShortName")
subparser.add_argument(
"--no-trunc", action="store_true", help="Do not truncate GTDataValue"
)
subparser.set_defaults(
func=partial(self.get_by_short_name, serialize_for_cli=True)
)
subparser = parser.add_parser("add_user_data")
subparser.set_defaults(parser="add_user_data")
subparser.set_defaults(func=partial(self.create))
subparser.add_argument("user_id", type=int, help="Q2_UserData.UserID")
subparser.add_argument("short_name", help="Q2_UserDataElement.ShortName")
subparser.add_argument("value", help="Q2_UserData.GTDataValue")
subparser = parser.add_parser("remove_user_data")
subparser.set_defaults(parser="remove_user_data")
subparser.set_defaults(
func=partial(
self.delete, online_session=OnlineSession(), online_user=OnlineUser()
)
)
subparser.add_argument("user_id", type=int, help="Q2_UserData.UserID")
subparser.add_argument("data_id", type=int, help="Q2_UserData.DataID")
[docs]
async def get(
self, user_id: int, short_name: str, no_trunc=False, serialize_for_cli=False
) -> list[UserDataRow]:
truncate = not no_trunc
sql_params = []
Param(str(user_id), D_TYPES.Int, "userID").add_to_param_list(sql_params)
Param(short_name, D_TYPES.VarChar, "dataElement").add_to_param_list(sql_params)
response = await self.call_hq(
"Q2_GetUserData", ExecuteStoredProcedure.SqlParameters(sql_params)
)
fields_to_truncate = []
if serialize_for_cli:
if truncate:
fields_to_truncate = ["GTDataValue"]
response = self.serialize_for_cli(
response, self.RESPONSE_FIELDS, fields_to_truncate=fields_to_truncate
)
return response
[docs]
async def get_by_create_date_range(
self, short_name: str, start_date: datetime, end_date: datetime
) -> list[UserDataRow]:
"""
Get a list of user data rows filtered by short name where the CreateDate is within a date range
:param: short_name: The short name of the user data element
:param: start_date: The start date of the date range, as a datetime object
:param: end_date: The end date of the date range, as a datetime object
:return: A list of user data rows that fit the parameters
"""
sql_params = []
Param(short_name, D_TYPES.VarChar, "shortName").add_to_param_list(sql_params)
Param(
start_date.isoformat(), D_TYPES.DateTime, "fromCreateDate"
).add_to_param_list(sql_params)
Param(end_date.isoformat(), D_TYPES.DateTime, "toCreateDate").add_to_param_list(
sql_params
)
response = await self.call_hq(
"sdk_GetUserDataByShortNameCreateDate",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
return response
[docs]
async def get_by_short_name_paginated(
self,
short_name: str | list[str],
page_number: int = 1,
page_size: int = 200,
exclude_empty_values=False,
) -> list[UserDataWithUserInfoRow]:
"""
Get all the user data rows that match the short name with pagination
:param: short_name: the short name of the user data element. Can be a string or a list of strings
:param: page_number: the starting point for pagination. Defaults to 1
:param: page_size: The number of transactions to get per page
:param: exclude_empty_values: If true, null or empty string GTDataValues will be excluded from the response
"""
offset = (page_number - 1) * page_size
assert offset >= 0, "page_number must be 1 or greater"
sql_params = []
if isinstance(short_name, list):
short_name = ",".join(short_name)
if len(short_name) > 1000:
raise DatabaseDataError("Too many short names. Search would be truncated.")
Param(short_name, D_TYPES.VarChar, "shortName").add_to_param_list(sql_params)
Param(offset, D_TYPES.Int, "offset").add_to_param_list(sql_params)
Param(page_size, D_TYPES.Int, "returnCount").add_to_param_list(sql_params)
stored_proc = "sdk_GetUserDataByShortNamePaginated"
if exclude_empty_values:
stored_proc = "sdk_GetUserDataByShortNamePaginatedExcludeEmpty"
response = await self.call_hq(
stored_proc, ExecuteStoredProcedure.SqlParameters(sql_params)
)
return response
[docs]
async def get_by_short_name_count(
self, short_name: str | list[str], exclude_empty_values=False
) -> int:
"""
Get all the user data rows that match the short name with pagination
:param: short_name: the short name of the user data element. Can be a string or a list of strings
:param: exclude_empty_values: If true, null or empty string GTDataValues will be excluded from the response
"""
if isinstance(short_name, list):
short_name = ",".join(short_name)
if len(short_name) > 1000:
raise DatabaseDataError("Too many short names. Search would be truncated.")
sql_params = []
Param(short_name, D_TYPES.VarChar, "shortName").add_to_param_list(sql_params)
stored_proc = "sdk_GetUserDataByShortNameCount"
if exclude_empty_values:
stored_proc = "sdk_GetUserDataByShortNameCountExcludeEmpty"
response = await self.call_hq(
stored_proc, ExecuteStoredProcedure.SqlParameters(sql_params)
)
return response[0].RowsFound.pyval
[docs]
async def get_by_short_name(
self, short_name: str, no_trunc=False, serialize_for_cli=False
) -> list[UserDataRow]:
"""This will get all rows that have this short_name available in Q2_UserData"""
truncate = not no_trunc
sql_params = []
Param(short_name, D_TYPES.VarChar, "short_name").add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_GetUserDataByShortName",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
fields_to_truncate = []
if serialize_for_cli:
if truncate:
fields_to_truncate = ["GTDataValue"]
response = self.serialize_for_cli(
response, self.RESPONSE_FIELDS, fields_to_truncate=fields_to_truncate
)
return response
[docs]
async def get_by_value(
self, short_name: str, value: str, no_trunc=False, serialize_for_cli=False
) -> list[UserDataRow]:
truncate = not no_trunc
sql_params = []
Param(value, D_TYPES.VarChar, "value").add_to_param_list(sql_params)
Param(short_name, D_TYPES.VarChar, "dataElement").add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_GetUserDataByValue", ExecuteStoredProcedure.SqlParameters(sql_params)
)
fields_to_truncate = []
if serialize_for_cli:
if truncate:
fields_to_truncate = ["GTDataValue"]
response = self.serialize_for_cli(
response, self.RESPONSE_FIELDS, fields_to_truncate=fields_to_truncate
)
return response
[docs]
async def get_multi(
self, user_id: int, serialize_for_cli=False, no_trunc=False
) -> list[UserDataRow]:
"""
Returns all user data associated with the given user id
:param user_id: user identifier
:return: user data values for the associated user
"""
truncate = not no_trunc
sql_params = []
Param(str(user_id), D_TYPES.Int, "user_id").add_to_param_list(sql_params)
response = await self.call_hq(
"sdk_GetUserDataByID", ExecuteStoredProcedure.SqlParameters(sql_params)
)
if serialize_for_cli:
fields_to_truncate = []
if truncate:
fields_to_truncate = ["GTDataValue"]
response = self.serialize_for_cli(
response, self.RESPONSE_FIELDS, fields_to_truncate=fields_to_truncate
)
return response
[docs]
async def create(self, user_id: int, short_name: str, value: str):
sql_params = []
Param(str(user_id), D_TYPES.Int, "userID").add_to_param_list(sql_params)
Param(short_name, D_TYPES.VarChar, "dataElement").add_to_param_list(sql_params)
Param(value, D_TYPES.VarChar, "data").add_to_param_list(sql_params)
table_limit = 512
if len(str(value)) > table_limit:
self.logger.warning(
"DB Table enforces a 512 limit on value size. Data will be truncated. "
"Please consider entering a key into this table and using it to query another "
"system"
)
return await self.call_hq(
"Q2_AddUpdateUserData", ExecuteStoredProcedure.SqlParameters(sql_params)
)
[docs]
async def update(self, user_id: int, short_name: str, value: str):
return await self.create(user_id, short_name, value)
[docs]
async def delete(
self,
user_id: int,
data_id: int,
online_session: OnlineSession,
online_user: OnlineUser,
):
sql_params = []
Param(user_id, D_TYPES.Int, "user_id").add_to_param_list(sql_params)
Param(data_id, D_TYPES.Int, "data_id").add_to_param_list(sql_params)
result = await self.call_hq(
"sdk_RemoveUserData", ExecuteStoredProcedure.SqlParameters(sql_params)
)
if self.hq_response.success:
audit = AuditRecord(self.logger, self.hq_credentials)
await audit.create(
f"UserData deleted. user_id: {user_id} data_id: {data_id}",
online_session.session_id,
workstation_id=online_session.workstation,
customer_id=online_user.customer_id,
user_id=online_user.user_id,
user_logon_id=online_user.user_logon_id,
)
return result