import uuid
from argparse import _SubParsersAction
from datetime import datetime
from functools import partial
from typing import List, Optional
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.hq_api.q2_api import CreateAuditRecord
from q2_sdk.hq.hq_api.wedge_online_banking import (
CreateAuditRecord as CreateAuditRecordWOB,
)
from q2_sdk.hq.table_row import TableRow
from .db_object import DbObject
[docs]
class AuditRecordRow(TableRow):
    """
    Typed row representation for audit records.

    Used by ``AuditRecord`` as its ``REPRESENTATION_ROW_CLASS``. Each
    annotation below declares one attribute of a row and its expected type;
    presumably these mirror the columns returned by the audit stored
    procedures -- TODO confirm against the database schema.
    """

    # NOTE(review): AuditRecord._serialize_response displays a "Details"
    # field that is not declared here -- confirm whether it should be listed.
    # NOTE(review): AuditRecord.get compares ``item.AuditID.text`` to a string,
    # so runtime values may be element-like objects rather than plain ints.
    AuditAction: str
    AuditID: int
    AdminUserLogonID: int
    Workstation: str
    UISourceID: int
    SessionId: int
    ActionID: int
    CustomerID: int
    UserID: int
    UserLogonID: int
    HostAccountID: int
    TransactionID: int
    ErrorReturnCode: int
    ExceptionMessage: str
    ClientAddress: str
    HydraID: int
    HydraIDName: str
    EndDateTime: datetime
    DateTime: datetime
[docs]
class AuditRecord(DbObject):
    """
    Allows for safe queries on the Q2_Audit table
    """

    REPRESENTATION_ROW_CLASS = AuditRecordRow

    def add_arguments(self, parser: _SubParsersAction):
        """
        Register the audit CLI subcommands on ``parser``.

        Three subcommands are added, each with a ``func`` default that is the
        matching coroutine method pre-bound with ``serialize_for_cli=True``:

        - ``get_audit_records``      -> :meth:`get`
        - ``get_audit_session``      -> :meth:`get_audit_session`
        - ``get_audit_record_by_id`` -> :meth:`get_by_id`

        :param parser: Sub-parsers action the subcommands are attached to
        """
        subparser = parser.add_parser("get_audit_records")
        subparser.set_defaults(parser="get_audit_records")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument("audit_action_name", help="Q2_AuditAction.ShortName")
        subparser.add_argument(
            "-d",
            "--days_ago",
            default=0,
            help="Days in the past to search (0 is today only)",
        )
        subparser.add_argument(
            "-c", "--record_count", default=100, help="Number of records to return"
        )
        subparser.add_argument(
            "-i",
            "--audit_id",
            default=None,
            help="Search for the specified audit record within the returned list",
        )

        subparser = parser.add_parser("get_audit_session")
        subparser.set_defaults(parser="get_audit_session")
        subparser.set_defaults(
            func=partial(self.get_audit_session, serialize_for_cli=True)
        )
        subparser.add_argument("session_id", help="self.online_session.session_id")

        subparser = parser.add_parser("get_audit_record_by_id")
        subparser.set_defaults(parser="get_audit_record_by_id")
        # Bind serialize_for_cli inside the partial, exactly like the two
        # subcommands above. It was previously passed to set_defaults() as a
        # separate namespace default, leaving the partial itself unbound.
        subparser.set_defaults(func=partial(self.get_by_id, serialize_for_cli=True))
        subparser.add_argument("audit_id", help="Q2_AuditAction.AuditID")

    def _serialize_response(self, result):
        """
        Format ``result`` for command-line display.

        Delegates to ``self.serialize_for_cli``, showing only the
        AuditAction, DateTime, Details and HydraIDName fields and asking for
        Details to be truncated.
        """
        return self.serialize_for_cli(
            result,
            fields_to_display=["AuditAction", "DateTime", "Details", "HydraIDName"],
            fields_to_truncate=["Details"],
        )

    async def get_by_id(
        self, audit_id: int, serialize_for_cli=False
    ) -> Optional[AuditRecordRow]:
        """
        Fetch a single audit record by its id.

        :param audit_id: Id sent to the stored procedure as a BigInt parameter
        :param serialize_for_cli: When True, return the CLI-formatted form of
            the whole response instead of a row object
        :return: The first row of the response, or ``None`` when the response
            is empty. With ``serialize_for_cli`` the serialized response is
            returned instead.
        """
        response = await self.call_hq(
            "sdk_GetAuditRecords",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.BigInt, "audit_id", audit_id
                )
            ]),
        )
        if serialize_for_cli:
            return self._serialize_response(response)
        return response[0] if response else None

    async def get_audit_session(
        self, session_id: str, serialize_for_cli=False
    ) -> List[AuditRecordRow]:
        """
        Searches for audit records by session ids from the last three days

        :param session_id: Session id sent to the stored procedure as a
            VarChar parameter
        :param serialize_for_cli: When True, return the CLI-formatted form of
            the response instead of the rows themselves
        """
        response = await self.call_hq(
            "sdk_GetAuditSessionRecords",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "session_id",
                    session_id,
                )
            ]),
        )
        if serialize_for_cli:
            response = self._serialize_response(response)
        return response

    async def get(
        self,
        audit_action_name,
        days_ago=0,
        record_count=100,
        serialize_for_cli=False,
        audit_id: Optional[int] = None,
        **kwargs,
    ) -> List[AuditRecordRow]:
        """
        Fetch audit records for an audit action.

        :param audit_action_name: Q2_AuditAction.ShortName to search for
        :param days_ago: Days in the past to search (0 is today only)
        :param record_count: Number of records to return
        :param serialize_for_cli: When True, return the CLI-formatted form of
            the result instead of the rows themselves
        :param audit_id: When truthy, narrow the returned list to the first
            record whose ``AuditID.text`` equals ``str(audit_id)``; the list
            is empty if no record matches
        :param kwargs: Forwarded unchanged to ``self.call_hq``
        """
        response = await self.call_hq(
            "sdk_GetAuditRecords",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "audit_action_name",
                    audit_action_name,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "days_ago", days_ago
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "record_count",
                    record_count,
                ),
            ]),
            **kwargs,
        )

        if audit_id:
            # The filtering happens client-side: only the records already
            # returned by the stored procedure are searched.
            wanted_id = str(audit_id)
            result = []
            for item in response:
                if item.AuditID.text == wanted_id:
                    result.append(item)
                    break
        else:
            result = response

        if serialize_for_cli:
            result = self._serialize_response(result)
        return result

    async def create(
        self,
        audit_details: str,
        session_id: str,
        workstation_id: Optional[str] = None,
        exception_message: Optional[str] = None,
        customer_id: Optional[int] = -1,
        user_id: Optional[int] = -1,
        user_logon_id: Optional[int] = -1,
        host_account_id: Optional[int] = -1,
        audit_action_short_name="SDK",
    ):
        """
        Create an audit record through HQ.

        The request always uses ``ui_source="API"``, ``transaction_id=-1`` and
        an empty ``client_address``. When ``self.hq_credentials.auth_token``
        is truthy the ``CreateAuditRecordWOB`` endpoint is used (with
        ``admin_user_logon_id=None``); otherwise ``CreateAuditRecord`` is.

        :param audit_details: Details text stored on the record
        :param session_id: Session id stored on the record
        :param workstation_id: Workstation value; a random UUID4 string is
            generated when omitted
        :param exception_message: Exception text; when truthy the record's
            error return code is -1, otherwise 0
        :param customer_id: Customer id for the record (default -1)
        :param user_id: User id for the record (default -1)
        :param user_logon_id: User logon id for the record (default -1)
        :param host_account_id: Host account id for the record (default -1)
        :param audit_action_short_name: Audit action short name (default "SDK")
        """
        if workstation_id is None:
            workstation_id = str(uuid.uuid4())

        error_return_code = -1 if exception_message else 0

        parameter_dictionary = {
            "workstation": workstation_id,
            "ui_source": "API",
            "session_id": session_id,
            "audit_action_short_name": audit_action_short_name,
            "customer_id": customer_id,
            "user_id": user_id,
            "user_logon_id": user_logon_id,
            "host_account_id": host_account_id,
            "transaction_id": -1,
            "client_address": "",
            "audit_details": audit_details,
            "error_return_code": error_return_code,
            "exception_message": exception_message,
            "hq_credentials": self.hq_credentials,
        }

        # Pick the endpoint first so the params object is built exactly once
        # (it used to be built for CreateAuditRecord and then thrown away
        # whenever an auth token was present).
        if self.hq_credentials.auth_token:
            hq_object = CreateAuditRecordWOB
            parameter_dictionary["admin_user_logon_id"] = None
        else:
            hq_object = CreateAuditRecord

        parameters = hq_object.ParamsObj(self.logger, **parameter_dictionary)
        await hq_object.execute(parameters)