from argparse import _SubParsersAction
from functools import partial
from typing import List, Optional
from lxml.objectify import IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from .audit_record import AuditRecord
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
[docs]
class ReverseLookupRow(RepresentationRowBase):
reverseLookupKeyID: IntElement = "reverseLookupKeyID"
reverseLookupKeyName: StringElement = "reverseLookupKeyName"
UserID_: IntElement = "UserID_"
CustomerID_: IntElement = "CustomerID_"
lookupValue: StringElement = "lookupValue"
[docs]
class ReverseLookup(DbObject):
NAME = "ReverseLookup"
REPRESENTATION_ROW_CLASS = ReverseLookupRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_reverseLookup")
subparser.set_defaults(parser="get_reverseLookup")
subparser.add_argument("key_name")
subparser.add_argument("-u", "--user-id", help="Q2_User.UserID")
subparser.add_argument("-c", "--customer_id", help="Q2_User.CustomerID")
subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
subparser = parser.add_parser("get_reverseLookup_by_key_value")
subparser.set_defaults(parser="get_reverseLookup_by_key_value")
subparser.add_argument("key_name")
subparser.add_argument("key_value")
subparser.set_defaults(func=partial(self.get_by_value, serialize_for_cli=True))
subparser = parser.add_parser("add_reverseLookup")
subparser.set_defaults(parser="add_reverseLookup")
subparser.add_argument("customer_id", type=int)
subparser.add_argument("user_id", type=int)
subparser.add_argument("key_name", type=str)
subparser.add_argument("key_value", type=str)
subparser.set_defaults(func=partial(self.add_update))
subparser = parser.add_parser("update_reverseLookup")
subparser.set_defaults(parser="update_reverseLookup")
subparser.add_argument("customer_id", type=int)
subparser.add_argument("user_id", type=int)
subparser.add_argument("key_name", type=str)
subparser.add_argument("key_value", type=str)
subparser.set_defaults(func=partial(self.add_update))
[docs]
async def get(
self,
key_name: str,
user_id: Optional[int] = None,
customer_id: Optional[int] = None,
no_trunc=False,
serialize_for_cli=False,
) -> List[ReverseLookupRow]:
truncate = not no_trunc
response = await self.call_hq(
"sdk_ReverseLookupGet",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"keyName",
str(key_name),
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "userID", user_id
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "customerID", customer_id
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "keyValue", None
),
]),
)
fields_to_truncate = []
if serialize_for_cli:
if truncate:
fields_to_truncate = ["lookupValue"]
response = self.serialize_for_cli(
response,
["UserID_", "CustomerID_", "lookupValue", "reverseLookupKeyName"],
fields_to_truncate=fields_to_truncate,
)
return response
[docs]
async def get_by_value(
self, key_name: str, key_value: str, no_trunc=False, serialize_for_cli=False
) -> List[ReverseLookupRow]:
truncate = not no_trunc
response = await self.call_hq(
"sdk_ReverseLookupGet",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "keyName", key_name
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "keyValue", key_value
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "userID", None
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "customerID", None
),
]),
)
fields_to_truncate = []
if serialize_for_cli:
if truncate:
fields_to_truncate = ["lookupValue"]
response = self.serialize_for_cli(
response,
["UserID_", "CustomerID_", "lookupValue", "reverseLookupKeyName"],
fields_to_truncate=fields_to_truncate,
)
return response
[docs]
async def add_update(
self, customer_id: int, user_id: str, key_name: str, key_value: str
):
sql_params = [
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "keyName", key_name
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "customerID", customer_id
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "userID", user_id
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "keyValue", key_value
),
]
return await self.call_hq(
"sdk_ReverseLookupAddUpdate",
ExecuteStoredProcedure.SqlParameters(sql_params),
)
[docs]
async def delete(
self,
key_name: str,
customer_id: int,
user_id: int,
key_value: str,
online_session: OnlineSession,
online_user: OnlineUser,
):
result = await self.call_hq(
"sdk_ReverseKeylookupDelete",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "keyName", key_name
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "customerID", customer_id
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "userID", user_id
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "keyValue", key_value
),
]),
)
if self.hq_response.success:
audit = AuditRecord(self.logger, self.hq_credentials)
await audit.create(
f"reverseKeylookup deleted. customer_id: {customer_id} key_name: {key_name}",
online_session.session_id,
workstation_id=online_session.workstation,
customer_id=online_user.customer_id,
user_id=online_user.user_id,
user_logon_id=online_user.user_logon_id,
)
return result