Source code for q2_sdk.hq.db.audit_record

import uuid
from datetime import datetime
from functools import partial
from typing import List, Optional
from argparse import _SubParsersAction
from lxml.objectify import StringElement, IntElement

from q2_sdk.hq.hq_api.q2_api import CreateAuditRecord
from q2_sdk.hq.hq_api.wedge_online_banking import (
    CreateAuditRecord as CreateAuditRecordWOB,
)

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)

from .db_object import DbObject
from .representation_row_base import RepresentationRowBase


[docs] class AuditRecordRowBase(RepresentationRowBase): AuditAction: StringElement = "AuditAction" AuditID: IntElement = "AuditID" AdminUserLogonID: IntElement = "AdminUserLogonID" Workstation: StringElement = "Workstation" UISourceID: IntElement = "UISourceID" SessionId: IntElement = "SessionId" ActionID: IntElement = "ActionID" CustomerID: IntElement = "CustomerID" UserID: IntElement = "UserID" UserLogonID: IntElement = "UserLogonID" HostAccountID: IntElement = "HostAccountID" TransactionID: IntElement = "TransactionID" ErrorReturnCode: IntElement = "ErrorReturnCode" ExceptionMessage: StringElement = "ExceptionMessage" ClientAddress: StringElement = "ClientAddress" HydraID: IntElement = "HydraID" HydraIDName: StringElement = "HydraIDName" EndDateTime: datetime = "EndDateTime" DateTime: datetime = "DateTime"
[docs] class AuditRecord(DbObject): """ Allows for safe queries on the Q2_Audit table """ REPRESENTATION_ROW_CLASS = AuditRecordRowBase
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_audit_records") subparser.set_defaults(parser="get_audit_records") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser.add_argument("audit_action_name", help="Q2_AuditAction.ShortName") subparser.add_argument( "-d", "--days_ago", default=0, help="Days in the past to search (0 is today only)", ) subparser.add_argument( "-c", "--record_count", default=100, help="Number of records to return" ) subparser.add_argument( "-i", "--audit_id", default=None, help="Search for the specified audit record within the returned list", ) subparser = parser.add_parser("get_audit_session") subparser.set_defaults(parser="get_audit_session") subparser.set_defaults( func=partial(self.get_audit_session, serialize_for_cli=True) ) subparser.add_argument("session_id", help="self.online_session.session_id") subparser = parser.add_parser("get_audit_record_by_id") subparser.set_defaults(parser="get_audit_record_by_id") subparser.set_defaults(func=partial(self.get_by_id), serialize_for_cli=True) subparser.add_argument("audit_id", help="Q2_AuditAction.AuditID")
def _serialize_response(self, result): return self.serialize_for_cli( result, fields_to_display=["AuditAction", "DateTime", "Details", "HydraIDName"], fields_to_truncate=["Details"], )
[docs] async def get_by_id( self, audit_id: int, serialize_for_cli=False ) -> AuditRecordRowBase: response = await self.call_hq( "sdk_GetAuditRecords", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.BigInt, "audit_id", audit_id ) ]), ) if serialize_for_cli: response = self._serialize_response(response) else: response = response[0] if response else None return response
[docs] async def get_audit_session( self, session_id: str, serialize_for_cli=False ) -> list[AuditRecordRowBase]: """ Searches for audit records by session ids from the last three days """ response = await self.call_hq( "sdk_GetAuditSessionRecords", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "session_id", session_id, ) ]), ) if serialize_for_cli: response = self._serialize_response(response) return response
[docs] async def get( self, audit_action_name, days_ago=0, record_count=100, serialize_for_cli=False, audit_id: Optional[int] = None, **kwargs, ) -> List[AuditRecordRowBase]: response = await self.call_hq( "sdk_GetAuditRecords", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "audit_action_name", audit_action_name, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "days_ago", days_ago ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "record_count", record_count, ), ]), **kwargs, ) result = [] if audit_id: for item in response: if item.AuditID.text == str(audit_id): result.append(item) break else: result = response if serialize_for_cli: result = self._serialize_response(result) return result
[docs] async def create( self, audit_details: str, session_id: str, workstation_id: Optional[str] = None, exception_message: Optional[str] = None, customer_id: Optional[int] = -1, user_id: Optional[int] = -1, user_logon_id: Optional[int] = -1, host_account_id: Optional[int] = -1, audit_action_short_name="SDK", ): if workstation_id is None: workstation_id = str(uuid.uuid4()) if exception_message: error_return_code = -1 else: error_return_code = 0 hq_object = CreateAuditRecord parameter_dictionary = { "workstation": workstation_id, "ui_source": "API", "session_id": session_id, "audit_action_short_name": audit_action_short_name, "customer_id": customer_id, "user_id": user_id, "user_logon_id": user_logon_id, "host_account_id": host_account_id, "transaction_id": -1, "client_address": "", "audit_details": audit_details, "error_return_code": error_return_code, "exception_message": exception_message, "hq_credentials": self.hq_credentials, } parameters = hq_object.ParamsObj(self.logger, **parameter_dictionary) if self.hq_credentials.auth_token: hq_object = CreateAuditRecordWOB parameter_dictionary["admin_user_logon_id"] = None parameters = hq_object.ParamsObj(self.logger, **parameter_dictionary) await hq_object.execute(parameters)