import uuid
from datetime import datetime
from functools import partial
from typing import List, Optional
from argparse import _SubParsersAction
from lxml.objectify import StringElement, IntElement
from q2_sdk.hq.hq_api.q2_api import CreateAuditRecord
from q2_sdk.hq.hq_api.wedge_online_banking import (
CreateAuditRecord as CreateAuditRecordWOB,
)
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
class AuditRecordRowBase(RepresentationRowBase):
    """
    Typed field declarations for one audit record row.

    Every field is annotated with the element type it is expected to hold
    and is assigned a string equal to its own name.
    """

    # NOTE(review): the string defaults are presumably the column names that
    # RepresentationRowBase maps onto the returned row -- confirm in the base.
    # NOTE(review): AuditRecord._serialize_response displays a "Details" field
    # that is not declared here -- confirm whether it should be added.

    # Which action was audited, and the record's own identifier.
    AuditAction: StringElement = "AuditAction"
    AuditID: IntElement = "AuditID"

    # Where the action originated.
    AdminUserLogonID: IntElement = "AdminUserLogonID"
    Workstation: StringElement = "Workstation"
    UISourceID: IntElement = "UISourceID"
    SessionId: IntElement = "SessionId"
    ActionID: IntElement = "ActionID"

    # Entities the record refers to.
    CustomerID: IntElement = "CustomerID"
    UserID: IntElement = "UserID"
    UserLogonID: IntElement = "UserLogonID"
    HostAccountID: IntElement = "HostAccountID"
    TransactionID: IntElement = "TransactionID"

    # Outcome of the audited action.
    ErrorReturnCode: IntElement = "ErrorReturnCode"
    ExceptionMessage: StringElement = "ExceptionMessage"

    # Client / environment information.
    ClientAddress: StringElement = "ClientAddress"
    HydraID: IntElement = "HydraID"
    HydraIDName: StringElement = "HydraIDName"

    # Timestamps.
    EndDateTime: datetime = "EndDateTime"
    DateTime: datetime = "DateTime"
class AuditRecord(DbObject):
    """
    Allows for safe queries on the Q2_Audit table
    """

    REPRESENTATION_ROW_CLASS = AuditRecordRowBase

    def add_arguments(self, parser: _SubParsersAction):
        """
        Register the audit-record subcommands on ``parser``.

        Three subcommands are added: ``get_audit_records``,
        ``get_audit_session`` and ``get_audit_record_by_id``. Each one stores
        its own name under the ``parser`` default and a ``func`` default that
        is the matching method with ``serialize_for_cli=True`` already bound.

        :param parser: subparsers action the subcommands are added to
        """
        records_parser = parser.add_parser("get_audit_records")
        records_parser.set_defaults(parser="get_audit_records")
        records_parser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        records_parser.add_argument(
            "audit_action_name", help="Q2_AuditAction.ShortName"
        )
        records_parser.add_argument(
            "-d",
            "--days_ago",
            default=0,
            help="Days in the past to search (0 is today only)",
        )
        records_parser.add_argument(
            "-c", "--record_count", default=100, help="Number of records to return"
        )
        records_parser.add_argument(
            "-i",
            "--audit_id",
            default=None,
            help="Search for the specified audit record within the returned list",
        )

        session_parser = parser.add_parser("get_audit_session")
        session_parser.set_defaults(parser="get_audit_session")
        session_parser.set_defaults(
            func=partial(self.get_audit_session, serialize_for_cli=True)
        )
        session_parser.add_argument(
            "session_id", help="self.online_session.session_id"
        )

        by_id_parser = parser.add_parser("get_audit_record_by_id")
        by_id_parser.set_defaults(parser="get_audit_record_by_id")
        # serialize_for_cli must be bound inside the partial, exactly like the
        # two subcommands above; passing it to set_defaults() would only add a
        # stray namespace attribute instead of configuring the call.
        by_id_parser.set_defaults(
            func=partial(self.get_by_id, serialize_for_cli=True)
        )
        by_id_parser.add_argument("audit_id", help="Q2_AuditAction.AuditID")

    def _serialize_response(self, result):
        """
        Format ``result`` for CLI output via ``self.serialize_for_cli``.

        Only AuditAction, DateTime, Details and HydraIDName are displayed, and
        Details is marked as a field to truncate.
        """
        return self.serialize_for_cli(
            result,
            fields_to_display=["AuditAction", "DateTime", "Details", "HydraIDName"],
            fields_to_truncate=["Details"],
        )

    async def get_by_id(
        self, audit_id: int, serialize_for_cli=False
    ) -> Optional[AuditRecordRowBase]:
        """
        Look up audit records by audit id.

        :param audit_id: sent to the stored procedure as the BigInt
            ``audit_id`` parameter
        :param serialize_for_cli: when true, return the whole response
            formatted by ``_serialize_response`` instead of a single row
        :return: the first row of the response, or None when the response is
            empty (or the CLI-formatted response when ``serialize_for_cli``)
        """
        response = await self.call_hq(
            "sdk_GetAuditRecords",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.BigInt, "audit_id", audit_id
                )
            ]),
        )
        if serialize_for_cli:
            return self._serialize_response(response)
        return response[0] if response else None

    async def get_audit_session(
        self, session_id: str, serialize_for_cli=False
    ) -> List[AuditRecordRowBase]:
        """
        Searches for audit records by session ids from the last three days

        :param session_id: sent to the stored procedure as the VarChar
            ``session_id`` parameter
        :param serialize_for_cli: when true, return the response formatted by
            ``_serialize_response`` instead of the raw rows
        """
        response = await self.call_hq(
            "sdk_GetAuditSessionRecords",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "session_id",
                    session_id,
                )
            ]),
        )
        if serialize_for_cli:
            response = self._serialize_response(response)
        return response

    async def get(
        self,
        audit_action_name,
        days_ago=0,
        record_count=100,
        serialize_for_cli=False,
        audit_id: Optional[int] = None,
        **kwargs,
    ) -> List[AuditRecordRowBase]:
        """
        Fetch audit records for one audit action.

        :param audit_action_name: Q2_AuditAction.ShortName
        :param days_ago: days in the past to search (0 is today only)
        :param record_count: number of records to return
        :param serialize_for_cli: when true, return the result formatted by
            ``_serialize_response`` instead of the raw rows
        :param audit_id: when truthy, the result is narrowed to the first
            returned row whose AuditID text equals ``str(audit_id)``; the
            result is an empty list if no returned row matches
        :param kwargs: forwarded unchanged to ``self.call_hq``
        """
        response = await self.call_hq(
            "sdk_GetAuditRecords",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "audit_action_name",
                    audit_action_name,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "days_ago", days_ago
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "record_count",
                    record_count,
                ),
            ]),
            **kwargs,
        )

        if audit_id:
            # The filter runs client-side over the rows already returned, so a
            # matching record outside the fetched window will not be found.
            wanted = str(audit_id)
            match = next(
                (row for row in response if row.AuditID.text == wanted), None
            )
            # Compare against None explicitly: a row's truthiness is not a
            # reliable "found" signal.
            result = [] if match is None else [match]
        else:
            result = response

        if serialize_for_cli:
            result = self._serialize_response(result)
        return result

    async def create(
        self,
        audit_details: str,
        session_id: str,
        workstation_id: Optional[str] = None,
        exception_message: Optional[str] = None,
        customer_id: Optional[int] = -1,
        user_id: Optional[int] = -1,
        user_logon_id: Optional[int] = -1,
        host_account_id: Optional[int] = -1,
        audit_action_short_name="SDK",
    ):
        """
        Create an audit record through HQ.

        The request always uses ``ui_source="API"``, ``transaction_id=-1`` and
        an empty ``client_address``. When ``self.hq_credentials.auth_token``
        is set, the wedge_online_banking variant of CreateAuditRecord is
        called and ``admin_user_logon_id=None`` is added to the parameters;
        otherwise the q2_api variant is called.

        :param audit_details: sent as ``audit_details``
        :param session_id: sent as ``session_id``
        :param workstation_id: sent as ``workstation``; a random UUID4 string
            is generated when None
        :param exception_message: sent as ``exception_message``; a truthy
            value also sets ``error_return_code`` to -1 (otherwise 0)
        :param customer_id: sent as ``customer_id``
        :param user_id: sent as ``user_id``
        :param user_logon_id: sent as ``user_logon_id``
        :param host_account_id: sent as ``host_account_id``
        :param audit_action_short_name: sent as ``audit_action_short_name``
        """
        if workstation_id is None:
            workstation_id = str(uuid.uuid4())

        error_return_code = -1 if exception_message else 0

        parameter_dictionary = {
            "workstation": workstation_id,
            "ui_source": "API",
            "session_id": session_id,
            "audit_action_short_name": audit_action_short_name,
            "customer_id": customer_id,
            "user_id": user_id,
            "user_logon_id": user_logon_id,
            "host_account_id": host_account_id,
            "transaction_id": -1,
            "client_address": "",
            "audit_details": audit_details,
            "error_return_code": error_return_code,
            "exception_message": exception_message,
            "hq_credentials": self.hq_credentials,
        }

        # Pick the endpoint first so the params object is built exactly once.
        if self.hq_credentials.auth_token:
            hq_object = CreateAuditRecordWOB
            parameter_dictionary["admin_user_logon_id"] = None
        else:
            hq_object = CreateAuditRecord

        parameters = hq_object.ParamsObj(self.logger, **parameter_dictionary)
        await hq_object.execute(parameters)