Source code for q2_sdk.hq.db.edv_profile

from argparse import _SubParsersAction
from functools import partial
from typing import List
from lxml.objectify import IntElement, StringElement, BoolElement
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase


class EdvProfileRow(RepresentationRowBase):
    """Row representation for one EdvProfile record in a db response.

    Every attribute is annotated with the lxml objectify element type of the
    value and is assigned the name of the matching column in the db response.
    """

    # attribute name: element type = "column name in the db response"
    EdvProfileID: IntElement = "EdvProfileID"
    Description: StringElement = "Description"
    TokenLifetimeInMinutes: IntElement = "TokenLifetimeInMinutes"
    IsRealtime: BoolElement = "IsRealtime"
    RequireTokenIfNotSuspect: BoolElement = "RequireTokenIfNotSuspect"
    DoNotAllowIfSuspectLogin: BoolElement = "DoNotAllowIfSuspectLogin"
    AuditActionShortName: StringElement = "AuditActionShortName"
class EdvProfile(DbObject):
    """DbObject exposing the EdvProfile HQ calls.

    Wraps ``sdk_GetEdvProfiles``, ``sdk_GetEdvProfile``, ``sdk_AddEdvProfile``
    and ``sdk_UpdateEdvProfile`` (all dispatched through ``self.call_hq``) and
    registers the two read operations as CLI sub-commands.
    """

    # GET_BY_NAME_KEY = "column in the db response"
    NAME = "EdvProfile"
    REPRESENTATION_ROW_CLASS = EdvProfileRow

    # Columns handed to ``serialize_for_cli``; shared by ``get`` and
    # ``get_by_id`` so the two CLI outputs cannot drift apart.
    _EDV_PROFILE_CLI_COLUMNS = (
        "EdvProfileID",
        "Description",
        "TokenLifetimeInMinutes",
        "IsRealtime",
        "RequireTokenIfNotSuspect",
        "DoNotAllowIfSuspectLogin",
        "AuditActionShortName",
    )

    def add_arguments(self, parser: _SubParsersAction):
        """Register the EdvProfile sub-commands on ``parser``.

        Two sub-commands are added, each bound with ``serialize_for_cli=True``:

        * ``get_edv_profile_by_profile_id`` -- takes one positional
          ``edv_profile_id`` and dispatches to :meth:`get_by_id`.
        * ``get_edv_profiles`` -- takes no arguments and dispatches to
          :meth:`get`.

        :param parser: sub-parsers action the commands are attached to
        """
        subparser = parser.add_parser("get_edv_profile_by_profile_id")
        subparser.set_defaults(parser="get_edv_profile_by_profile_id")
        subparser.set_defaults(func=partial(self.get_by_id, serialize_for_cli=True))
        subparser.add_argument("edv_profile_id", help="Q2_EdvProfile.EdvProfileID")

        subparser = parser.add_parser("get_edv_profiles")
        subparser.set_defaults(parser="get_edv_profiles")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))

    def _edv_profile_cli_view(self, response, serialize_for_cli):
        """Return ``response`` untouched, or serialized for the CLI if asked.

        :param response: value returned by ``self.call_hq``
        :param serialize_for_cli: truthy to run ``self.serialize_for_cli``
        """
        if not serialize_for_cli:
            return response
        # Pass a fresh list on every call, as the former inline literals did.
        return self.serialize_for_cli(
            response, list(self._EDV_PROFILE_CLI_COLUMNS)
        )

    async def get(self, serialize_for_cli=False) -> List[EdvProfileRow]:
        """Call ``sdk_GetEdvProfiles`` with no parameters.

        :param serialize_for_cli: when truthy, the response is passed through
            ``self.serialize_for_cli`` before being returned
        :return: the ``call_hq`` response, or its CLI serialization
        """
        response = await self.call_hq("sdk_GetEdvProfiles")
        return self._edv_profile_cli_view(response, serialize_for_cli)

    async def get_by_id(
        self, edv_profile_id: int, serialize_for_cli=False
    ) -> List[EdvProfileRow]:
        """Call ``sdk_GetEdvProfile`` for a single profile id.

        :param edv_profile_id: profile id; coerced with ``int()`` so string
            values (e.g. from the command line) are accepted
        :param serialize_for_cli: when truthy, the response is passed through
            ``self.serialize_for_cli`` before being returned
        :return: the ``call_hq`` response, or its CLI serialization
        :raises ValueError: if ``edv_profile_id`` cannot be converted to int
        """
        sql_params = ExecuteStoredProcedure.SqlParameters([
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int,
                "edv_profile_id",
                int(edv_profile_id),
            )
        ])
        response = await self.call_hq("sdk_GetEdvProfile", sql_params)
        return self._edv_profile_cli_view(response, serialize_for_cli)

    async def add_edv_profile(
        self,
        description: str = "EDV Profile (Realtime)",
        token_life_in_minutes: int = 5,
        is_realtime: bool = True,
    ):
        """Call ``sdk_AddEdvProfile`` with the given profile settings.

        :param description: sent as the VarChar ``description`` parameter
        :param token_life_in_minutes: sent as the Int ``token_life_in_minutes``
            parameter
        :param is_realtime: sent as the Bit ``is_realtime`` parameter
        :return: the ``call_hq`` response
        """
        # NOTE(review): token_life_in_minutes is forwarded as given, without
        # the int() coercion applied to edv_profile_id elsewhere -- confirm
        # whether that is intentional.
        sql_params = ExecuteStoredProcedure.SqlParameters([
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar,
                "description",
                description,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int,
                "token_life_in_minutes",
                token_life_in_minutes,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Bit,
                "is_realtime",
                is_realtime,
            ),
        ])
        return await self.call_hq("sdk_AddEdvProfile", sql_params)

    async def update_edv_profile(self, edv_profile_id: int, is_realtime: bool):
        """Call ``sdk_UpdateEdvProfile`` for one profile.

        :param edv_profile_id: profile id; coerced with ``int()``
        :param is_realtime: sent as the Bit ``is_realtime`` parameter
        :return: the ``call_hq`` response
        :raises ValueError: if ``edv_profile_id`` cannot be converted to int
        """
        sql_params = ExecuteStoredProcedure.SqlParameters([
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int,
                "edv_profile_id",
                int(edv_profile_id),
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Bit,
                "is_realtime",
                is_realtime,
            ),
        ])
        return await self.call_hq("sdk_UpdateEdvProfile", sql_params)