from argparse import _SubParsersAction
from functools import partial
from typing import List, Optional
from lxml.objectify import StringElement, IntElement, BoolElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.tools.decorators import dev_only
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
[docs]
class AuditActionRow(RepresentationRowBase):
ActionID: IntElement = "ActionID"
ShortName: StringElement = "ShortName"
Description: StringElement = "Description"
CategoryID: IntElement = "CategoryID"
LogThisAction: BoolElement = "LogThisAction"
ShowInCentral: BoolElement = "ShowInCentral"
SyslogThisAction: BoolElement = "SyslogThisAction"
IsRiskAction: BoolElement = "IsRiskAction"
XslTransform: StringElement = "XslTransform"
AuditThisAction: BoolElement = "AuditThisAction"
DynNotContextBitflag: BoolElement = "DynNotContextBitflag"
EnablePolicyWedgeCall: BoolElement = "EnablePolicyWedgeCall"
PolicyWedgeCallAdditinalDataStoredProc: StringElement = (
"PolicyWedgeCallAdditinalDataStoredProc"
)
EnableUserStatusCheck: BoolElement = "EnableUserStatusCheck"
BeforeActionWedgeAddressID: IntElement = "BeforeActionWedgeAddressID"
AfterActionWedgeAddressID: IntElement = "AfterActionWedgeAddressID"
ActionName: StringElement = "ActionName"
[docs]
class AuditAction(DbObject):
"""
Allows for specific actions that need to be audited to be defined and used in the Q2_Audit table
"""
REPRESENTATION_ROW_CLASS = AuditActionRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_audit_actions")
subparser.set_defaults(parser="get_audit_actions")
subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
subparser.add_argument(
"--no-trunc", action="store_true", help="Do not truncate Description"
)
subparser.add_argument("-s", "--short-name", help="Q2_AuditAction.ShortName")
subparser.add_argument(
"-w",
"--wedge-address-type-name",
help="Q2_AuditAction.WedgeAddressTypeName",
)
subparser = parser.add_parser("add_audit_action")
subparser.set_defaults(parser="add_audit_action")
subparser.set_defaults(func=partial(self.create))
subparser.add_argument("short_name", help="Q2_AuditAction.ShortName")
subparser.add_argument("description", help="Q2_AuditAction.Description")
subparser.add_argument("category", help="Q2_AuditCategory.ShortName")
subparser = parser.add_parser(
"update_audit_after_action",
help="Sets the Q2_AuditAction.AfterActionWedgeAddressID column",
)
subparser.set_defaults(parser="update_audit_after_action")
subparser.set_defaults(func=partial(self.update_after_action_wa))
subparser.add_argument("short_name", help="Q2_AuditAction.ShortName")
subparser.add_argument(
"-i",
"--wedge_address_id",
type=int,
help="Q2_WedgeAddress.WedgeAddressID (Leave None to unset)",
)
subparser = parser.add_parser("remove_audit_action")
subparser.set_defaults(parser="remove_audit_action")
subparser.set_defaults(func=partial(self.delete))
subparser.add_argument("short_name", help="Q2_AuditAction.ShortName")
[docs]
async def get(
self,
serialize_for_cli=False,
no_trunc=False,
short_name=None,
wedge_address_type_name="",
) -> List[AuditActionRow]:
assert not (short_name and wedge_address_type_name), (
"short_name and wedge_address_type_name are mutually exclusive. Only one should be specified at a time"
)
truncate = not no_trunc
fields_to_truncate = []
if truncate:
fields_to_truncate = ["Description"]
parameters = ExecuteStoredProcedure.SqlParameters([])
if short_name:
parameters = ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"short_name",
short_name,
)
])
elif wedge_address_type_name:
parameters = ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wa_type_name",
wedge_address_type_name,
)
])
response = await self.call_hq("sdk_GetAuditActions", parameters)
if serialize_for_cli:
response = self.serialize_for_cli(
response,
[
"ActionID",
"ActionName",
"Description",
"CategoryName",
"BeforeActionWedgeAddressID",
"AfterActionWedgeAddressID",
],
fields_to_truncate=fields_to_truncate,
)
return response
[docs]
async def get_by_name(self, name, **kwargs) -> AuditActionRow:
results = await self.get(short_name=name)
return results[0]
[docs]
async def get_by_aaa_wa_type_name(
self, wedge_address_type_name, serialize_for_cli=False, no_trunc=False
) -> List[AuditActionRow]:
truncate = not no_trunc
fields_to_truncate = []
if truncate:
fields_to_truncate = ["Description"]
parameters = ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wa_type_name",
wedge_address_type_name,
)
])
response = await self.call_hq("sdk_GetAuditActions", parameters)
if serialize_for_cli:
response = self.serialize_for_cli(
response,
["ActionID", "ActionName", "Description", "CategoryName"],
fields_to_truncate=fields_to_truncate,
)
return response
[docs]
async def create(self, short_name, description, category) -> AuditActionRow:
results = await self.call_hq(
"sdk_AddAuditAction",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"short_name",
short_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"description",
description,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"category_name",
category,
),
]),
)
return results[0]
[docs]
async def update_after_action_wa(
self, short_name: str, wedge_address_id: Optional[int] = None
):
return await self.call_hq(
"sdk_SetAuditActionAfterWedgeID",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"short_name",
short_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wedge_address_id",
wedge_address_id,
),
]),
)
[docs]
@dev_only
async def delete(self, short_name):
"""Note: this only works in the dev environment"""
return await self.call_hq(
"sdk_RemoveAuditAction",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"short_name",
short_name,
)
]),
)