Source code for q2_sdk.hq.db.reverse_lookup

from argparse import _SubParsersAction
from functools import partial
from typing import List, Optional
from lxml.objectify import IntElement, StringElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser

from .audit_record import AuditRecord
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase


class ReverseLookupRow(RepresentationRowBase):
    """Row representation used by ``ReverseLookup``.

    Each field is annotated with an lxml ``objectify`` element type and is
    assigned a string equal to its own attribute name.

    NOTE(review): presumably ``RepresentationRowBase`` uses these strings to
    locate the matching element in the response -- confirm against the base
    class before renaming or reordering anything here.
    """

    # Integer-typed key identifier and its string-typed name.
    reverseLookupKeyID: IntElement = "reverseLookupKeyID"
    reverseLookupKeyName: StringElement = "reverseLookupKeyName"

    # Integer-typed user / customer identifiers (note the trailing underscore
    # in both the attribute name and the string value).
    UserID_: IntElement = "UserID_"
    CustomerID_: IntElement = "CustomerID_"

    # String-typed value stored for the key.
    lookupValue: StringElement = "lookupValue"
class ReverseLookup(DbObject):
    """Database helper for reading, writing and deleting reverse-lookup rows.

    All operations are delegated to stored procedures through
    ``self.call_hq``:

    * ``sdk_ReverseLookupGet`` -- used by :meth:`get` and :meth:`get_by_value`
    * ``sdk_ReverseLookupAddUpdate`` -- used by :meth:`add_update`
    * ``sdk_ReverseKeylookupDelete`` -- used by :meth:`delete`
    """

    NAME = "ReverseLookup"
    REPRESENTATION_ROW_CLASS = ReverseLookupRow

    # Columns passed to ``serialize_for_cli`` by the read commands.  Stored as
    # a tuple and copied per call so the serializer never sees shared state.
    _CLI_COLUMNS = ("UserID_", "CustomerID_", "lookupValue", "reverseLookupKeyName")

    def add_arguments(self, parser: _SubParsersAction):
        """Register this object's CLI sub-commands on ``parser``.

        Four sub-commands are added: ``get_reverseLookup``,
        ``get_reverseLookup_by_key_value``, ``add_reverseLookup`` and
        ``update_reverseLookup``.  Each stores its own name under the
        ``parser`` default and its handler under the ``func`` default.

        :param parser: Sub-parsers action to which the commands are added.
        """
        get_cmd = parser.add_parser("get_reverseLookup")
        get_cmd.set_defaults(parser="get_reverseLookup")
        get_cmd.add_argument("key_name")
        # NOTE(review): the two optional flags use different separators
        # ("--user-id" vs "--customer_id") and are not parsed as ints, unlike
        # the add/update commands.  Left as-is so existing invocations keep
        # working -- confirm whether this is intentional.
        get_cmd.add_argument("-u", "--user-id", help="Q2_User.UserID")
        get_cmd.add_argument("-c", "--customer_id", help="Q2_User.CustomerID")
        get_cmd.set_defaults(func=partial(self.get, serialize_for_cli=True))

        by_value_cmd = parser.add_parser("get_reverseLookup_by_key_value")
        by_value_cmd.set_defaults(parser="get_reverseLookup_by_key_value")
        by_value_cmd.add_argument("key_name")
        by_value_cmd.add_argument("key_value")
        by_value_cmd.set_defaults(
            func=partial(self.get_by_value, serialize_for_cli=True)
        )

        # "add" and "update" take identical arguments and share one handler.
        for command in ("add_reverseLookup", "update_reverseLookup"):
            write_cmd = parser.add_parser(command)
            write_cmd.set_defaults(parser=command)
            write_cmd.add_argument("customer_id", type=int)
            write_cmd.add_argument("user_id", type=int)
            write_cmd.add_argument("key_name", type=str)
            write_cmd.add_argument("key_value", type=str)
            # Kept wrapped in ``partial`` (with no bound arguments) exactly as
            # before; presumably the CLI dispatcher treats every handler
            # uniformly -- confirm before simplifying.
            write_cmd.set_defaults(func=partial(self.add_update))

    def _format_for_cli(self, response, truncate: bool):
        """Run ``response`` through ``serialize_for_cli`` for the read commands.

        :param response: Result of the ``sdk_ReverseLookupGet`` call.
        :param truncate: When true, ``lookupValue`` is listed in
            ``fields_to_truncate``; otherwise that list is empty.
        :return: Whatever ``self.serialize_for_cli`` returns.
        """
        fields_to_truncate = ["lookupValue"] if truncate else []
        return self.serialize_for_cli(
            response,
            list(self._CLI_COLUMNS),
            fields_to_truncate=fields_to_truncate,
        )

    async def get(
        self,
        key_name: str,
        user_id: Optional[int] = None,
        customer_id: Optional[int] = None,
        no_trunc=False,
        serialize_for_cli=False,
    ) -> List[ReverseLookupRow]:
        """Call ``sdk_ReverseLookupGet`` for a key name, optionally by user/customer.

        ``keyValue`` is always sent as ``None`` by this method.

        :param key_name: Sent as the ``keyName`` parameter (coerced with ``str``).
        :param user_id: Sent as the ``userID`` parameter; may be ``None``.
        :param customer_id: Sent as the ``customerID`` parameter; may be ``None``.
        :param no_trunc: Only consulted when ``serialize_for_cli`` is true;
            when false, ``lookupValue`` is marked for truncation.
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` before being returned.
        :return: The ``call_hq`` response, or its CLI-serialized form.
        """
        sql_param = ExecuteStoredProcedure.SqlParam
        data_type = ExecuteStoredProcedure.DataType

        response = await self.call_hq(
            "sdk_ReverseLookupGet",
            ExecuteStoredProcedure.SqlParameters([
                sql_param(data_type.VarChar, "keyName", str(key_name)),
                sql_param(data_type.Int, "userID", user_id),
                sql_param(data_type.Int, "customerID", customer_id),
                sql_param(data_type.VarChar, "keyValue", None),
            ]),
        )

        if serialize_for_cli:
            response = self._format_for_cli(response, truncate=not no_trunc)
        return response

    async def get_by_value(
        self, key_name: str, key_value: str, no_trunc=False, serialize_for_cli=False
    ) -> List[ReverseLookupRow]:
        """Call ``sdk_ReverseLookupGet`` for a key name and key value.

        ``userID`` and ``customerID`` are always sent as ``None`` by this
        method.

        :param key_name: Sent as the ``keyName`` parameter.
        :param key_value: Sent as the ``keyValue`` parameter.
        :param no_trunc: Only consulted when ``serialize_for_cli`` is true;
            when false, ``lookupValue`` is marked for truncation.
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` before being returned.
        :return: The ``call_hq`` response, or its CLI-serialized form.
        """
        sql_param = ExecuteStoredProcedure.SqlParam
        data_type = ExecuteStoredProcedure.DataType

        response = await self.call_hq(
            "sdk_ReverseLookupGet",
            ExecuteStoredProcedure.SqlParameters([
                sql_param(data_type.VarChar, "keyName", key_name),
                sql_param(data_type.VarChar, "keyValue", key_value),
                sql_param(data_type.Int, "userID", None),
                sql_param(data_type.Int, "customerID", None),
            ]),
        )

        if serialize_for_cli:
            response = self._format_for_cli(response, truncate=not no_trunc)
        return response

    async def add_update(
        self, customer_id: int, user_id: int, key_name: str, key_value: str
    ):
        """Call ``sdk_ReverseLookupAddUpdate`` with the given key data.

        Backs both the ``add_reverseLookup`` and ``update_reverseLookup`` CLI
        commands.

        :param customer_id: Sent as the ``customerID`` Int parameter.
        :param user_id: Sent as the ``userID`` Int parameter.
        :param key_name: Sent as the ``keyName`` VarChar parameter.
        :param key_value: Sent as the ``keyValue`` VarChar parameter.
        :return: The ``call_hq`` response.
        """
        sql_param = ExecuteStoredProcedure.SqlParam
        data_type = ExecuteStoredProcedure.DataType

        sql_params = [
            sql_param(data_type.VarChar, "keyName", key_name),
            sql_param(data_type.Int, "customerID", customer_id),
            sql_param(data_type.Int, "userID", user_id),
            sql_param(data_type.VarChar, "keyValue", key_value),
        ]
        return await self.call_hq(
            "sdk_ReverseLookupAddUpdate",
            ExecuteStoredProcedure.SqlParameters(sql_params),
        )

    async def delete(
        self,
        key_name: str,
        customer_id: int,
        user_id: int,
        key_value: str,
        online_session: OnlineSession,
        online_user: OnlineUser,
    ):
        """Call ``sdk_ReverseKeylookupDelete`` and audit a successful deletion.

        When ``self.hq_response.success`` is truthy after the call, an
        ``AuditRecord`` is created.  The audit message names the target
        ``customer_id`` and ``key_name``; the acting session, workstation and
        user details are taken from ``online_session`` and ``online_user``.

        :param key_name: Sent as the ``keyName`` VarChar parameter.
        :param customer_id: Sent as the ``customerID`` Int parameter.
        :param user_id: Sent as the ``userID`` Int parameter.
        :param key_value: Sent as the ``keyValue`` VarChar parameter.
        :param online_session: Supplies ``session_id`` and ``workstation`` for
            the audit record.
        :param online_user: Supplies ``customer_id``, ``user_id`` and
            ``user_logon_id`` for the audit record.
        :return: The ``call_hq`` response.
        """
        sql_param = ExecuteStoredProcedure.SqlParam
        data_type = ExecuteStoredProcedure.DataType

        result = await self.call_hq(
            "sdk_ReverseKeylookupDelete",
            ExecuteStoredProcedure.SqlParameters([
                sql_param(data_type.VarChar, "keyName", key_name),
                sql_param(data_type.Int, "customerID", customer_id),
                sql_param(data_type.Int, "userID", user_id),
                sql_param(data_type.VarChar, "keyValue", key_value),
            ]),
        )

        # NOTE(review): presumably ``call_hq`` populates ``self.hq_response``
        # -- confirm in DbObject.
        if self.hq_response.success:
            audit = AuditRecord(self.logger, self.hq_credentials)
            await audit.create(
                f"reverseKeylookup deleted. customer_id: {customer_id} key_name: {key_name}",
                online_session.session_id,
                workstation_id=online_session.workstation,
                customer_id=online_user.customer_id,
                user_id=online_user.user_id,
                user_logon_id=online_user.user_logon_id,
            )

        return result