Source code for q2_sdk.hq.db.group

from argparse import _SubParsersAction
from functools import partial
from typing import List

from lxml.objectify import IntElement, StringElement, BoolElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.tools.decorators import dev_only
from .db_object import DbObject, DEFAULT
from .representation_row_base import RepresentationRowBase


class GroupRow(RepresentationRowBase):
    """Row representation used by ``Group`` (its ``REPRESENTATION_ROW_CLASS``).

    Every attribute is annotated with an lxml objectify element type and is
    assigned the string name of the field it stands for.
    """

    # Identification
    GroupID: IntElement = "GroupID"
    GroupDesc: StringElement = "GroupDesc"

    # Profile / UI references
    DashboardProfileID: IntElement = "DashboardProfileID"
    SecAlertProfileID: IntElement = "SecAlertProfileID"
    DefaultUiSelectionID: IntElement = "DefaultUiSelectionID"

    # Boolean flags
    IsCommercial: BoolElement = "IsCommercial"
    IsTreasury: BoolElement = "IsTreasury"

    PolicyID: IntElement = "PolicyID"
    # The Python attribute is "Edv..." while the mapped field name is "EDV...".
    EdvProfileID: IntElement = "EDVProfileID"
class CentralGroupRow(RepresentationRowBase):
    """Row representation passed to ``call_hq`` for ``sdk_GetCentralGroups``.

    Only the group identifier and its description are declared.
    """

    GroupID: IntElement = "GroupID"
    GroupDesc: StringElement = "GroupDesc"
class Group(DbObject):
    """Accessor for group records, backed by the ``sdk_*Group*`` procedures.

    Each public coroutine forwards to ``self.call_hq`` with the name of a
    stored procedure; ``add_arguments`` wires the same operations up as CLI
    subcommands.
    """

    GET_BY_NAME_KEY = "GroupDesc"
    NAME = "Group"
    REPRESENTATION_ROW_CLASS = GroupRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the group-related CLI subcommands on ``parser``.

        Subcommands added: ``get_groups``, ``get_central_groups``,
        ``add_group`` and ``remove_group``. Each one stores its own name under
        the ``parser`` default and the coroutine to run under ``func``.

        :param parser: Sub-parser collection to attach the subcommands to
        """
        subparser = parser.add_parser("get_groups")
        subparser.set_defaults(parser="get_groups")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument(
            "--show-deleted", action="store_true", help="Show Deleted Groups"
        )

        subparser = parser.add_parser("get_central_groups")
        subparser.set_defaults(parser="get_central_groups")
        subparser.set_defaults(func=partial(self.get_central, serialize_for_cli=True))

        subparser = parser.add_parser("add_group")
        subparser.set_defaults(parser="add_group")
        subparser.set_defaults(func=partial(self.create))
        subparser.add_argument("group_name", help="Name of the new group")

        subparser = parser.add_parser("remove_group")
        subparser.set_defaults(parser="remove_group")
        subparser.set_defaults(func=partial(self.delete))
        # This help text used to be copy-pasted from add_group ("new group").
        subparser.add_argument("group_name", help="Name of the group to remove")

    async def get(self, show_deleted=False, serialize_for_cli=False) -> List[GroupRow]:
        """Return the groups reported by ``sdk_GetGroups``.

        :param show_deleted: If False, rows containing a ``DeletedDate``
            element are dropped from the result
        :param serialize_for_cli: If True, the rows are passed through
            ``self.serialize_for_cli`` with the ``GroupID`` and ``GroupDesc``
            columns (plus ``DeletedDate`` when ``show_deleted`` is set)
        """
        response = await self.call_hq("sdk_GetGroups")

        if not show_deleted:
            response = [x for x in response if x.find("./DeletedDate") is None]

        if serialize_for_cli:
            columns = ["GroupID", "GroupDesc"]
            if show_deleted:
                columns.append("DeletedDate")
            response = self.serialize_for_cli(response, columns)
        return response

    async def get_by_id(self, group_id) -> GroupRow:
        """Return the group whose ``GroupID`` matches ``group_id``.

        Deleted groups are included in the search. The comparison is made on
        the string form of ``group_id``, so ints and numeric strings both work.

        :param group_id: Identifier of the group to look up
        :raises DatabaseDataError: If no group has the given ``GroupID``
        """
        response = await self.get(show_deleted=True)
        for record in response:
            if record.findtext("GroupID") == str(group_id):
                return record

        # Searched through all groups and cannot find group record.
        raise DatabaseDataError(
            f"No Group with GroupID '{group_id}' is present in the database"
        )

    async def get_central(self, serialize_for_cli=False) -> List[CentralGroupRow]:
        """Return the rows reported by ``sdk_GetCentralGroups``.

        :param serialize_for_cli: If True, the rows are passed through
            ``self.serialize_for_cli`` with the ``GroupID`` and ``GroupDesc``
            columns
        """
        response = await self.call_hq(
            "sdk_GetCentralGroups", representation_class_override=CentralGroupRow
        )

        if serialize_for_cli:
            columns = ["GroupID", "GroupDesc"]
            response = self.serialize_for_cli(response, columns)
        return response

    async def create(self, group_name):
        """Call ``sdk_AddGroup`` with ``group_name`` and return its result.

        :param group_name: Name of the group to add
        """
        return await self.call_hq(
            "sdk_AddGroup",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "group_name",
                    group_name,
                )
            ]),
        )

    async def update_group(
        self,
        group_id,
        sec_alert_profile_id=DEFAULT,
        is_commercial=DEFAULT,
        edv_profile_id=DEFAULT,
    ):
        """Call ``sdk_UpdateGroup`` for the group identified by ``group_id``.

        Any argument left at ``DEFAULT`` is replaced by the value currently
        stored on the group's row, so only explicitly supplied fields change.

        :param group_id: Identifier of the group to update (coerced to int)
        :param sec_alert_profile_id: New value, or ``DEFAULT`` to keep the
            row's ``SecAlertProfileID``
        :param is_commercial: New value, or ``DEFAULT`` to keep the row's
            ``IsCommercial``
        :param edv_profile_id: New value, or ``DEFAULT`` to keep the row's
            ``EdvProfileID``
        :raises DatabaseDataError: If ``group_id`` does not match any group
        """
        # The current row is read through a separate Group instance built with
        # ret_table_obj=None rather than through self.
        # NOTE(review): presumably so self's ret_table_obj setting cannot
        # change the shape of the row used below - confirm against DbObject.
        group_row = await Group(
            self.logger, self.hq_credentials, ret_table_obj=None
        ).get_by_id(group_id)

        return await self.call_hq(
            "sdk_UpdateGroup",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "group_id",
                    int(group_id),
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "sec_alert_profile_id",
                    int(group_row.SecAlertProfileID)
                    if sec_alert_profile_id == DEFAULT
                    else sec_alert_profile_id,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Bit,
                    "is_commercial",
                    bool(group_row.IsCommercial)
                    if is_commercial == DEFAULT
                    else is_commercial,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "edv_profile_id",
                    int(group_row.EdvProfileID)
                    if edv_profile_id == DEFAULT
                    else edv_profile_id,
                ),
            ]),
        )

    @dev_only
    async def delete(self, group_name):
        """Call ``sdk_RemoveGroup`` with ``group_name`` and return its result.

        Note: this only works in the dev environment

        :param group_name: Name of the group to remove
        """
        return await self.call_hq(
            "sdk_RemoveGroup",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "group_name",
                    group_name,
                )
            ]),
        )