Source code for q2_sdk.hq.db.logon_event

from lxml.objectify import E
from lxml.etree import tostring
from dataclasses import dataclass, field
from enum import IntFlag, auto
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from .db_object import DbObject

D_TYPES = ExecuteStoredProcedure.DataType


#  Enum names are gathered from the diag_LogonEvent table/view
class BitFlagDetails(IntFlag):
    """Single-bit flags that can be combined in a logon event's bit flag value.

    Every member is a distinct power of two, assigned in declaration order
    starting from the lowest bit. The values are spelled out explicitly so
    that adding or moving a member cannot silently renumber the others.
    """

    LoggedOn = 1 << 0
    AccountListPopulated = 1 << 1
    MultiChannelAuthenticated = 1 << 2
    ForgotPasswordSuccess = 1 << 3
    LogonTokenMFA = 1 << 4
    LogonTokenEnabled = 1 << 5
    RoamingAllowed = 1 << 6
    BrowserRegistrationAllowed = 1 << 7
    PasswordWasNew = 1 << 8
    PasswordWasExpired = 1 << 9
    CsrAssistSession = 1 << 10
    Treasury = 1 << 11
    HasAccountListBeenReturned = 1 << 12
@dataclass
class LogonEventRow:
    """A single logon event together with its decoded bit flags.

    ``logon_details`` is derived data: it is recomputed from ``bitflag`` in
    ``__post_init__`` and any value supplied for it at construction time is
    replaced.
    """

    event_id: int
    http_headers_id: int
    audit_id: int
    bitflag: int
    logon_details: list[BitFlagDetails] = field(default_factory=list)

    def __post_init__(self):
        """Populate ``logon_details`` with every flag set in ``bitflag``.

        :raises TypeError, ValueError: if ``bitflag`` cannot be converted
            to an integer
        """
        # The value may arrive as a numeric string or an XML element rather
        # than a plain int; normalise it for decoding only, so the stored
        # ``bitflag`` field keeps exactly what the caller passed in.
        flag_value = int(self.bitflag)
        self.logon_details = [x for x in BitFlagDetails if x & flag_value]
class LogonEvent(DbObject):
    """Reads logon event rows through the ``sdk_GetLogonEvent`` stored procedure."""

    # GET_BY_NAME_KEY = "column in the db response"
    NAME = "LogonEvent"

    async def get(self, audit_ids: list[int]) -> list[LogonEventRow]:
        """
        Retrieves logon details from a list of logon audit ids

        :param list[int] audit_ids: A list of logon audit ids
        :return: One ``LogonEventRow`` per row in the stored procedure result
        """
        # Payload sent to the stored procedure:
        # <request><a id="1" /><a id="2" /></request>
        request_xml = tostring(
            E.request(*(E.a(id=str(audit_id)) for audit_id in audit_ids)),
            encoding="utf-8",
        ).decode()

        sql_params = []
        xml_param = Param(request_xml, D_TYPES.Xml, "audit_ids")
        xml_param.add_to_param_list(sql_params)

        rows = await self.call_hq(
            "sdk_GetLogonEvent",
            ExecuteStoredProcedure.SqlParameters(sql_params),
        )

        events = []
        for row in rows:
            # NOTE(review): the three ID fields are filled from ``.text``
            # while LogonEventRow annotates them as int, and BitFlags is
            # passed as the raw node -- confirm what callers expect before
            # changing either.
            events.append(
                LogonEventRow(
                    row.LogonEventID.text,
                    row.HttpHeaderAuditID.text,
                    row.AuditID.text,
                    row.BitFlags,
                )
            )
        return events