Source code for q2_sdk.hq.db.hq_session_cache

from __future__ import annotations
from argparse import _SubParsersAction
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from lxml.objectify import Element, IntElement, StringElement, fromstring
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)


@dataclass
class HqSessionCacheRow(RepresentationRowBase):
    """Typed row built by ``HqSessionCache.get`` for each response record.

    Every field defaults to a string label; presumably these are the HQ
    column names the fields map to (confirm against
    ``RepresentationRowBase``).
    """

    # NOTE(review): the label here is "UserID" rather than
    # "HqSessionCacheID"; confirm this is intentional and not a
    # copy/paste leftover from another row class.
    HqSessionCacheID: IntElement = "UserID"
    Workstation: StringElement = "Workstation"
    ExpirationDate: datetime = "ExpirationDate"
    # ``HqSessionCache.get`` stores the response's ``SessionData`` value here
    # unchanged; it does not call ``SessionData.from_xml`` itself.
    SessionData: SessionData = "SessionData"
@dataclass
class SessionData:
    """Selected fields extracted from a session XML document.

    Instances are normally created through :meth:`from_xml`, which also keeps
    the whole parsed tree in ``raw``.
    """

    # NOTE(review): ``from_xml`` stores the objectify child nodes as-is, so at
    # runtime these are presumably lxml objectify wrappers rather than plain
    # ``str``/``int`` values -- confirm before relying on exact types.
    raw: Element
    SessionId: str
    GroupId: int
    CustomerId: int
    UserId: int
    UserLogonId: int

    @staticmethod
    def from_xml(inp: str | bytes) -> SessionData:
        """Parse serialized session XML into a :class:`SessionData`.

        :param inp: XML text. It is handed straight to
            ``lxml.objectify.fromstring``, which parses serialized XML, so it
            is annotated as text rather than as an element.
        :return: ``SessionData`` holding the parsed root in ``raw``, the
            root's ``SessionID`` child, and the ``GroupId``, ``CustomerId``,
            ``UserId`` and ``UserLogonId`` children of ``LogonDetails``.
        """
        raw = fromstring(inp)
        # Note the casing difference: the XML node is ``SessionID`` while the
        # dataclass field is ``SessionId``.
        return SessionData(
            raw=raw,
            SessionId=raw.SessionID,
            GroupId=raw.LogonDetails.GroupId,
            CustomerId=raw.LogonDetails.CustomerId,
            UserId=raw.LogonDetails.UserId,
            UserLogonId=raw.LogonDetails.UserLogonId,
        )
class HqSessionCache(DbObject):
    """Database object that looks up HQ session cache rows by workstation."""

    NAME = "HqSessionCache"
    REPRESENTATION_ROW_CLASS = HqSessionCacheRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the ``get_hq_session_cache`` sub-command on ``parser``.

        The sub-command takes one positional ``workstation`` argument and is
        wired to :meth:`get` with ``serialize_for_cli=True``.

        :param parser: Sub-parsers action to which the sub-command is added.
        """
        get_parser = parser.add_parser("get_hq_session_cache")
        cli_get = partial(self.get, serialize_for_cli=True)
        get_parser.set_defaults(parser="get", func=cli_get)
        get_parser.add_argument("workstation", help="Filter by workstation")

    async def get(
        self, workstation: str, serialize_for_cli=False
    ) -> list[HqSessionCacheRow]:
        """Fetch session cache records for ``workstation``.

        Calls the ``sdk_GetHqSessionCacheByWorkstation`` stored procedure
        with ``workstation`` as its single VarChar parameter.

        :param workstation: Value passed as the ``workstation`` SQL parameter.
        :param serialize_for_cli: When true, return the result of
            ``self.serialize_for_cli`` restricted to the ``Workstation`` and
            ``ExpirationDate`` columns instead of row objects.
        :return: One :class:`HqSessionCacheRow` per response record, or the
            CLI serialization when ``serialize_for_cli`` is set.
        """
        workstation_param = ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.VarChar,
            "workstation",
            workstation,
        )
        sql_parameters = ExecuteStoredProcedure.SqlParameters(
            [workstation_param]
        )
        response = await self.call_hq(
            "sdk_GetHqSessionCacheByWorkstation",
            sql_parameters=sql_parameters,
        )

        if serialize_for_cli:
            columns = ["Workstation", "ExpirationDate"]
            return self.serialize_for_cli(response, columns)

        # SessionData is forwarded exactly as it appears on the record; it is
        # not parsed into a ``SessionData`` instance here.
        return [
            HqSessionCacheRow(
                record.HqSessionCacheID,
                record.Workstation,
                record.ExpirationDate,
                record.SessionData,
            )
            for record in response
        ]