Source code for q2_sdk.hq.db.audit_record

import uuid
from argparse import _SubParsersAction
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from functools import partial
from typing import List, Optional, Union

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.hq_api.q2_api import CreateAuditRecord
from q2_sdk.hq.hq_api.wedge_online_banking import (
    CreateAuditRecord as CreateAuditRecordWOB,
)
from q2_sdk.hq.table_row import TableRow

from .db_object import DbObject
from ...core.exceptions import MissingDataError

# Module-level shorthands for the helper types nested on ExecuteStoredProcedure;
# used below when assembling stored-procedure parameter lists.
SqlParameters = ExecuteStoredProcedure.SqlParameters
SqlParam = ExecuteStoredProcedure.SqlParam
DataType = ExecuteStoredProcedure.DataType


class AuditRecordRow(
    TableRow,
    rename_fields={
        "ErrorReturnCode": "ErrorReturnText",
        "ErrorReturnCode1": "ErrorReturnCode",
    },
):
    """
    Typed row for a single audit record.

    ``rename_fields`` maps the incoming ``ErrorReturnCode`` field to
    ``ErrorReturnText`` and the incoming ``ErrorReturnCode1`` field to
    ``ErrorReturnCode``; the annotations below use the post-rename names.
    """

    # Audit entry identity
    AuditAction: str
    AuditID: int
    AuditUUID: str

    # Admin user / origin of the action
    AdminUserLogonID: int
    AdminUserLoginName: str
    Workstation: str
    UISourceID: int
    SessionId: str
    ActionID: int

    # Entities the record refers to
    CustomerID: int
    UserID: int
    UserLogonID: int
    HostAccountID: int
    TransactionID: int

    # Outcome (see rename_fields in the class header)
    ErrorReturnText: str
    ErrorReturnCode: int
    ExceptionMessage: str

    ClientAddress: str
    HydraID: int
    HydraIDName: str

    # Timestamps
    EndDateTime: datetime
    DateTime: datetime
class AuditRecordRowBase(AuditRecordRow):
    """
    Deprecated

    This class only exists for backwards compatibility and adds nothing to
    AuditRecordRow. It will be removed in a future version of the SDK.
    """
class AuditFilterType(Enum):
    """
    Names of the optional filters that can accompany an audit lookup by user.

    The member values are the strings accepted on the command line
    (``userlogon`` or ``uisource``).
    """

    USERLOGON = "userlogon"
    UISOURCE = "uisource"
@dataclass
class AuditFilter:
    """
    Optional filter (type plus value) applied to an audit lookup.

    :raises MissingDataError: if ``filter_type`` is set but ``filter_value``
        is falsy (``None``, ``""``, ``0``)
    """

    filter_type: Optional[AuditFilterType] = None
    filter_value: Union[str, int] = None

    def __post_init__(self):
        # Without a filter type there is nothing to validate.
        if self.filter_type is None:
            return
        if not self.filter_value:
            raise MissingDataError("Filter value is missing")
class AuditRecord(DbObject):
    """
    Allows for safe queries on the Q2_Audit table
    """

    # Row type associated with this object's results.
    # NOTE(review): presumably read by DbObject when building rows - confirm.
    REPRESENTATION_ROW_CLASS = AuditRecordRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_audit_records") subparser.set_defaults(parser="get_audit_records") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser.add_argument("audit_action_name", help="Q2_AuditAction.ShortName") subparser.add_argument( "-d", "--days_ago", default=0, help="Days in the past to search (0 is today only)", ) subparser.add_argument( "-c", "--record_count", default=100, help="Number of records to return" ) subparser.add_argument( "-i", "--audit_id", default=None, help="Search for the specified audit record within the returned list", ) subparser = parser.add_parser("get_audit_session") subparser.set_defaults(parser="get_audit_session") subparser.set_defaults( func=partial(self.get_audit_session, serialize_for_cli=True) ) subparser.add_argument("session_id", help="self.online_session.session_id") subparser = parser.add_parser("get_audit_record_by_id") subparser.set_defaults(parser="get_audit_record_by_id") subparser.set_defaults(func=partial(self.get_by_id), serialize_for_cli=True) subparser.add_argument("audit_id", help="Q2_AuditAction.AuditID") subparser = parser.add_parser("get_audit_by_user") subparser.set_defaults(parser="get_audit_by_user") subparser.set_defaults( func=partial(self._get_by_user_id_cli, serialize_for_cli=True) ) subparser.add_argument("user_id", type=int, help="Q2_User.UserID") subparser.add_argument( "-s", "--start_date", default=None, help="Start date in ISO format (defaults to 3 days ago)", ) subparser.add_argument( "-e", "--end_date", default=None, help="End date in ISO format (defaults to now)", ) subparser.add_argument( "-t", "--filter-type", default=None, help="filter type ('userlogon' or 'uisource')", ) subparser.add_argument( "-v", "--filter-value", default=None, help="filter value (value of 'userlogon' or 'uisource')", ) subparser.add_argument( "-r", "--record-count", default=None, help="Number of records to be returned", )
def _serialize_response(self, result): return self.serialize_for_cli( result, fields_to_display=["AuditAction", "DateTime", "Details", "HydraIDName"], fields_to_truncate=["Details"], )
[docs] async def get_by_id(self, audit_id: int, serialize_for_cli=False) -> AuditRecordRow: response = await self.call_hq( "sdk_GetAuditRecords", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.BigInt, "audit_id", audit_id ) ]), force_q2_api=True, ) if serialize_for_cli: response = self._serialize_response(response) else: response = response[0] if response else None return response
[docs] async def get_audit_session( self, session_id: str, serialize_for_cli=False ) -> list[AuditRecordRow]: """ Searches for audit records by session ids from the last three days """ response = await self.call_hq( "sdk_GetAuditSessionRecords", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "session_id", session_id, ) ]), force_q2_api=True, ) if serialize_for_cli: response = self._serialize_response(response) return response
[docs] async def get( self, audit_action_name, days_ago=0, record_count=100, serialize_for_cli=False, audit_id: Optional[int] = None, **kwargs, ) -> List[AuditRecordRow]: response = await self.call_hq( "sdk_GetAuditRecords", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "audit_action_name", audit_action_name, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "days_ago", days_ago ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "record_count", record_count, ), ]), force_q2_api=True, **kwargs, ) result = [] if audit_id: for item in response: if item.AuditID.text == str(audit_id): result.append(item) break else: result = response if serialize_for_cli: result = self._serialize_response(result) return result
[docs] async def create( self, audit_details: str, session_id: str, workstation_id: Optional[str] = None, exception_message: Optional[str] = None, customer_id: Optional[int] = -1, user_id: Optional[int] = -1, user_logon_id: Optional[int] = -1, host_account_id: Optional[int] = -1, transaction_id: Optional[int] = -1, ui_source: str = "API", audit_action_short_name="SDK", client_address: Optional[str] = None, error_return_code: Optional[int] = None, ): if workstation_id is None: workstation_id = str(uuid.uuid4()) if client_address is None: client_address = "" if error_return_code is None: # Default behavior: -1 for errors, 0 for success if exception_message: error_return_code = -1 else: error_return_code = 0 elif error_return_code > 0: # Positive error codes become null in DB, which is not desired raise ValueError( f"error_return_code must be <= 0 to be stored in database. " f"Received: {error_return_code}. Use a negative value instead." ) hq_object = CreateAuditRecord parameter_dictionary = { "workstation": workstation_id, "ui_source": ui_source, "session_id": session_id, "audit_action_short_name": audit_action_short_name, "customer_id": customer_id, "user_id": user_id, "user_logon_id": user_logon_id, "host_account_id": host_account_id, "transaction_id": transaction_id, "client_address": client_address, "audit_details": audit_details, "error_return_code": error_return_code, "exception_message": exception_message, "hq_credentials": self.hq_credentials, } parameters = hq_object.ParamsObj(self.logger, **parameter_dictionary) if self.hq_credentials.auth_token: hq_object = CreateAuditRecordWOB parameter_dictionary["admin_user_logon_id"] = None parameters = hq_object.ParamsObj(self.logger, **parameter_dictionary) await hq_object.execute(parameters)
async def _get_by_user_id_cli( self, user_id: int, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, filter_type: Optional[str] = None, filter_value: Union[str, int] = None, record_count=100, serialize_for_cli: bool = False, ) -> List[AuditRecordRow]: audit_filter = None if filter_type is not None: audit_filter = AuditFilter( filter_type=AuditFilterType(filter_type), filter_value=filter_value, ) return await self.get_by_user_id( user_id=user_id, start_date=start_date, end_date=end_date, audit_filter=audit_filter, record_count=record_count, serialize_for_cli=serialize_for_cli, )
[docs] def get_data_type(self, filter_type): if filter_type.value == "userlogon": return ExecuteStoredProcedure.DataType.Int return ExecuteStoredProcedure.DataType.VarChar
[docs] async def get_by_user_id( self, user_id: int, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, audit_filter: Optional[AuditFilter] = None, record_count=100, serialize_for_cli: bool = False, ) -> List[AuditRecordRow]: """ Searches for audit records by user ID within a date range. :param user_id: The Q2_User.UserID to search for :param start_date: Start of date range in ISO format (defaults to 3 days ago) :param end_date: End of date range in ISO format (defaults to now) :param audit_filter: AuditFilter with filter_type and filter_value :param record_count: Number of records to be returned, defaults to 100 :param serialize_for_cli: If True, returns formatted string for CLI output :return: List of AuditRecordRow objects """ sql_params = [] start_date = start_date or (datetime.now() - timedelta(days=3)) end_date = end_date or datetime.now() sql_params.append(SqlParam(DataType.Int, "user_id", user_id)) if audit_filter is not None and audit_filter.filter_type is not None: sql_params.append( SqlParam(DataType.VarChar, "where_type", audit_filter.filter_type.value) ) sql_params.append( SqlParam( self.get_data_type(audit_filter.filter_type), "where_val", audit_filter.filter_value, ) ) sql_params.append( SqlParam(DataType.DateTime, "start_date", start_date.isoformat()) ) sql_params.append(SqlParam(DataType.DateTime, "end_date", end_date.isoformat())) sql_params.append(SqlParam(DataType.Int, "record_count", record_count)) sql_parameters = SqlParameters(sql_params) response = await self.call_hq( "sdk_GetAuditRecordsByUser", sql_parameters=sql_parameters, force_q2_api=True, ) if serialize_for_cli: response = self._serialize_response(response) return response