Source code for q2_sdk.hq.db.hq_session_cache

from __future__ import annotations

from argparse import _SubParsersAction
from dataclasses import dataclass
from datetime import datetime
from functools import partial

from lxml.objectify import Element, fromstring

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.table_row import TableRow

from .db_object import DbObject


@dataclass
class SessionData:
    """Selected fields of a session document alongside the parsed tree.

    ``raw`` holds the full objectified document; the other attributes are
    taken from it by :meth:`from_xml`.
    """

    raw: Element
    SessionId: str
    GroupId: int
    CustomerId: int
    UserId: int
    UserLogonId: int

    @staticmethod
    def from_xml(inp: str | bytes) -> SessionData:
        """Build a ``SessionData`` from serialized session XML.

        :param inp: XML markup. It is handed to ``lxml.objectify.fromstring``,
            which parses text rather than an already-built element.
        :return: Instance whose ``raw`` is the parsed root and whose other
            fields are read from ``SessionID`` and the ``LogonDetails`` child.
        """
        raw = fromstring(inp)
        logon_details = raw.LogonDetails
        # Values are passed through exactly as objectify exposes them; no
        # explicit str()/int() coercion is applied here.
        return SessionData(
            raw,
            raw.SessionID,
            logon_details.GroupId,
            logon_details.CustomerId,
            logon_details.UserId,
            logon_details.UserLogonId,
        )
@dataclass
class HqSessionCacheRow:
    """One HqSessionCache record in the shape ``HqSessionCache.get`` returns.

    Field order matters: instances are built positionally.
    """

    # Identifier of the cache row.
    HqSessionCacheID: int
    # Workstation the row belongs to (the value ``get`` filters on).
    Workstation: str
    # Expiration timestamp stored with the row.
    ExpirationDate: datetime
    # Session payload stored with the row.
    SessionData: SessionData
class _HqSessionCacheRow(TableRow):
    """``TableRow`` description of an HqSessionCache record.

    Declares the same four columns as ``HqSessionCacheRow`` and is the class
    assigned to ``HqSessionCache.REPRESENTATION_ROW_CLASS``.
    """

    HqSessionCacheID: int
    Workstation: str
    ExpirationDate: datetime
    SessionData: SessionData
class HqSessionCache(DbObject):
    """Look up HqSessionCache rows by workstation, from code or the CLI."""

    NAME = "HqSessionCache"
    REPRESENTATION_ROW_CLASS = _HqSessionCacheRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the ``get_hq_session_cache`` subcommand.

        :param parser: Subparsers action the command is attached to.
        """
        get_parser = parser.add_parser("get_hq_session_cache")
        # The CLI entry point is ``get`` pre-bound to its serialized output.
        cli_get = partial(self.get, serialize_for_cli=True)
        get_parser.set_defaults(parser="get", func=cli_get)
        get_parser.add_argument("workstation", help="Filter by workstation")

    async def get(
        self, workstation: str, serialize_for_cli=False
    ) -> list[HqSessionCacheRow]:
        """Fetch the session cache rows for one workstation.

        Calls the ``sdk_GetHqSessionCacheByWorkstation`` stored procedure
        with ``workstation`` as its single VarChar parameter.

        :param workstation: Workstation value passed to the stored procedure.
        :param serialize_for_cli: When true, return the result of
            ``self.serialize_for_cli`` over the ``Workstation`` and
            ``ExpirationDate`` columns instead of dataclass rows.
        :return: One ``HqSessionCacheRow`` per response row, unless
            ``serialize_for_cli`` is set.
        """
        workstation_param = ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.VarChar,
            "workstation",
            workstation,
        )
        sql_parameters = ExecuteStoredProcedure.SqlParameters([workstation_param])
        hq_rows = await self.call_hq(
            "sdk_GetHqSessionCacheByWorkstation",
            sql_parameters=sql_parameters,
        )

        if serialize_for_cli:
            # NOTE(review): this branch returns whatever serialize_for_cli
            # yields, which the declared return annotation does not cover.
            return self.serialize_for_cli(hq_rows, ["Workstation", "ExpirationDate"])

        # NOTE(review): SessionData is forwarded exactly as found on the
        # response row; SessionData.from_xml is not applied here. Confirm
        # callers expect the unparsed value.
        return [
            HqSessionCacheRow(
                hq_row.HqSessionCacheID,
                hq_row.Workstation,
                hq_row.ExpirationDate,
                hq_row.SessionData,
            )
            for hq_row in hq_rows
        ]