from lxml.objectify import E
from lxml.etree import tostring
from dataclasses import dataclass, field
from enum import IntFlag, auto
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from .db_object import DbObject
# Short alias for the stored-procedure parameter data types (used below as ``D_TYPES.Xml``).
D_TYPES = ExecuteStoredProcedure.DataType
# Enum names are gathered from the diag_LogonEvent table/view
class BitFlagDetails(IntFlag):
    """Individual bits that can be set in a logon event's bit flag value.

    Members are single-bit flags, so they can be tested against a raw
    integer with ``&``.
    """

    # ``auto()`` hands out successive powers of two (1, 2, 4, ...), so the
    # declaration order below fixes each member's bit position. Append new
    # members at the end; inserting one would shift every later value.
    LoggedOn = auto()
    AccountListPopulated = auto()
    MultiChannelAuthenticated = auto()
    ForgotPasswordSuccess = auto()
    LogonTokenMFA = auto()
    LogonTokenEnabled = auto()
    RoamingAllowed = auto()
    BrowserRegistrationAllowed = auto()
    PasswordWasNew = auto()
    PasswordWasExpired = auto()
    CsrAssistSession = auto()
    Treasury = auto()
    HasAccountListBeenReturned = auto()
@dataclass
class LogonEventRow:
    """A single logon event record with its bit flags decoded.

    ``logon_details`` is always rebuilt from ``bitflag`` right after
    construction, so any value passed in for it is discarded.
    """

    event_id: int
    http_headers_id: int
    audit_id: int
    bitflag: int
    logon_details: list[BitFlagDetails] = field(default_factory=list)

    def __post_init__(self):
        """Populate ``logon_details`` with every flag set in ``bitflag``."""
        # Coerce to a plain int for the bit test only, so numeric text or
        # int-like wrapper objects decode the same way as a real int.
        # ``self.bitflag`` itself is left exactly as the caller supplied it.
        mask = int(self.bitflag)
        self.logon_details = [flag for flag in BitFlagDetails if flag & mask]
class LogonEvent(DbObject):
    """Looks up logon events through the ``sdk_GetLogonEvent`` stored procedure."""

    # GET_BY_NAME_KEY = "column in the db response"
    NAME = "LogonEvent"

    async def get(self, audit_ids: list[int]) -> list[LogonEventRow]:
        """
        Retrieves logon details from a list of logon audit ids

        :param list[int] audit_ids: A list of logon audit ids
        :return: One ``LogonEventRow`` for each row in the stored procedure response

        NOTE(review): the id columns are forwarded as each node's ``.text`` and
        ``BitFlags`` as the node itself, while ``LogonEventRow`` annotates all of
        them as ``int`` -- confirm whether callers expect them to be cast.
        """
        # All ids travel in a single XML parameter shaped like:
        # <request><a id="1" /><a id="2" /></request>
        audit_nodes = [E.a(id=str(x)) for x in audit_ids]
        request_node = E.request(*audit_nodes)
        # encoding="unicode" serialises straight to ``str`` (no XML declaration),
        # avoiding a bytes round trip.
        request_string = tostring(request_node, encoding="unicode")

        params = []
        Param(request_string, D_TYPES.Xml, "audit_ids").add_to_param_list(params)
        db_results = await self.call_hq(
            "sdk_GetLogonEvent", ExecuteStoredProcedure.SqlParameters(params)
        )
        return [
            LogonEventRow(
                x.LogonEventID.text,
                x.HttpHeaderAuditID.text,
                x.AuditID.text,
                x.BitFlags,
            )
            for x in db_results
        ]