Source code for q2_sdk.hq.db.admin_user_property_data_element

from enum import Enum
from argparse import _SubParsersAction
from functools import partial

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.tools.decorators import dev_only
from q2_sdk.hq.db.user_property_data_element import UserPropertyDataElement
from q2_sdk.hq.models.hq_params.stored_procedure import Param

# Shorthand for the stored-procedure parameter data types (e.g. D_TYPES.VarChar).
D_TYPES = ExecuteStoredProcedure.DataType


class UserPropertyDataType(Enum):
    """Three-letter data type codes accepted for a property data element.

    The member *value* (not the member name) is what gets sent as the
    ``property_data_type`` stored-procedure parameter.
    """

    Boolean = "BOO"
    String = "STR"
    Bit = "BIT"
    Integer = "INT"
    Double = "DBL"
class AdminUserPropertyDataElement(UserPropertyDataElement):
    """Access to rows of ``Q2_AdminUserPropertyDataElements``.

    Registers three CLI sub-commands (get / add / remove) and implements each
    one as a call to the matching ``sdk_*`` stored procedure via
    ``self.call_hq``.
    """

    def add_arguments(self, parser: _SubParsersAction):
        """Register the get/add/remove sub-commands on ``parser``.

        :param parser: Sub-parsers action that the new sub-commands are
            attached to.
        """
        # --- get -----------------------------------------------------------
        subparser = parser.add_parser("get_admin_user_property_data_elements")
        subparser.set_defaults(parser="get_admin_user_property_data_elements")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument(
            "property_name_prefix",
            help="Wildcard match against Q2_AdminUserPropertyDataElements.PropertyName",
        )
        subparser.add_argument(
            "--no-trunc", action="store_true", help="Do not truncate PropertyLongName"
        )

        # --- add -----------------------------------------------------------
        subparser = parser.add_parser("add_admin_user_property_data_element")
        subparser.set_defaults(parser="add_admin_user_property_data_element")
        subparser.set_defaults(func=partial(self.create))
        subparser.add_argument(
            "property_name", help="Q2_AdminUserPropertyDataElements.PropertyName"
        )
        subparser.add_argument(
            "property_long_name",
            help="Q2_AdminUserPropertyDataElements.PropertyLongName",
        )
        # argparse %-formats the help text when rendering usage, so ``help``
        # must be a string; passing the Enum class itself breaks ``--help``.
        data_type_codes = ", ".join(member.value for member in UserPropertyDataType)
        subparser.add_argument(
            "data_type",
            help=f"Property data type code, one of: {data_type_codes}",
        )
        subparser.add_argument(
            "--is-group-property",
            help="Q2_AdminUserPropertyDataElements.IsGroupProperty",
            default=True,
        )
        subparser.add_argument(
            "--is-user-property",
            help="Q2_AdminUserPropertyDataElements.IsUserProperty",
            default=False,
        )

        # --- remove --------------------------------------------------------
        subparser = parser.add_parser("remove_admin_user_property_data_element")
        subparser.set_defaults(parser="remove_admin_user_property_data_element")
        subparser.set_defaults(func=partial(self.delete))
        subparser.add_argument(
            "property_name", help="Q2_AdminUserPropertyDataElements.PropertyName"
        )

    async def get(
        self, property_name_prefix: str, no_trunc=False, serialize_for_cli=False
    ):
        """Call ``sdk_GetAdminUserPropertyDataElements`` for a name prefix.

        :param property_name_prefix: Sent as the ``property_name_prefix``
            VarChar parameter of the stored procedure.
        :param no_trunc: When true, the serializer is asked not to truncate.
            Only consulted when ``serialize_for_cli`` is set.
        :param serialize_for_cli: When true, the response is passed through
            ``self._serialize_response`` before being returned.
        :return: The ``call_hq`` response, or its serialized form when
            ``serialize_for_cli`` is set.
        """
        truncate = not no_trunc

        params = []
        Param(
            property_name_prefix, D_TYPES.VarChar, "property_name_prefix"
        ).add_to_param_list(params)

        response = await self.call_hq(
            "sdk_GetAdminUserPropertyDataElements",
            ExecuteStoredProcedure.SqlParameters(params),
        )

        if serialize_for_cli:
            response = self._serialize_response(
                ["IsGroupProperty", "IsUserProperty", "VersionAdded"],
                truncate,
                response,
            )
        return response

    async def create(
        self,
        property_name,
        property_long_name,
        data_type=UserPropertyDataType.String,
        is_group_property=True,
        is_user_property=False,
    ):
        """Call ``sdk_AddAdminUserPropertyDataElement`` to add a data element.

        :param property_name: Sent as the ``property_name`` VarChar parameter.
        :param property_long_name: Sent as the ``property_long_name`` VarChar
            parameter.
        :param data_type: Either a :class:`UserPropertyDataType` member (its
            value is sent) or an already-encoded value such as ``"STR"``,
            which is sent unchanged (this is what the CLI supplies).
        :param is_group_property: Sent as the ``is_group_property`` Bit
            parameter after conversion with ``str()``.
        :param is_user_property: Sent as the ``is_user_property`` Bit
            parameter after conversion with ``str()``.
        :return: The ``call_hq`` response.
        """
        # Enum members are unwrapped to their code; anything else passes through.
        if isinstance(data_type, UserPropertyDataType):
            data_type = data_type.value

        hq_resp = await self.call_hq(
            "sdk_AddAdminUserPropertyDataElement",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "property_name",
                    property_name,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "property_long_name",
                    property_long_name,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "property_data_type",
                    data_type,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Bit,
                    "is_group_property",
                    str(is_group_property),
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Bit,
                    "is_user_property",
                    str(is_user_property),
                ),
            ]),
        )
        return hq_resp

    @dev_only
    async def delete(self, property_name):
        """Note: this only works in the dev environment

        Calls ``sdk_RemoveAdminUserPropertyDataElement`` with
        ``property_name`` as its single VarChar parameter.

        :param property_name: Sent as the ``property_name`` parameter.
        :return: The ``call_hq`` response.
        """
        return await self.call_hq(
            "sdk_RemoveAdminUserPropertyDataElement",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "property_name",
                    property_name,
                )
            ]),
        )