Source code for q2_sdk.hq.db.user_property_category

from argparse import _SubParsersAction
from functools import partial
from typing import List, AnyStr
from lxml.objectify import IntElement, StringElement
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase

D_TYPES = ExecuteStoredProcedure.DataType


class UserPropertyCategoryRow(RepresentationRowBase):
    # Row representation for a user property category record.
    #
    # Each attribute follows the pattern
    #   object name: type hinting = "column name in the db response"
    # so the annotation gives the lxml element type and the string value names
    # the column that the attribute is read from.
    CategoryID: IntElement = "CategoryID"
    ShortName: StringElement = "ShortName"
    Description: StringElement = "Description"
class UserPropertyCategory(DbObject):
    """Create and look up user property categories via HQ stored procedures.

    Every operation is an async method and is also registered as a CLI
    subcommand by :meth:`add_arguments`. Rows are represented by
    ``UserPropertyCategoryRow`` (see ``REPRESENTATION_ROW_CLASS``).
    """

    # GET_BY_NAME_KEY = "column in the db response"
    NAME = "UserPropertyCategory"
    REPRESENTATION_ROW_CLASS = UserPropertyCategoryRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register this object's CLI subcommands on ``parser``.

        Three subcommands are added:

        * ``create_user_property_category`` with positional ``shortname`` and
          ``description`` arguments, dispatching to :meth:`create`.
        * ``get_user_property_category`` with no arguments, dispatching to
          :meth:`get`.
        * ``get_user_property_category_by_shortname`` with a positional
          ``shortname`` argument, dispatching to
          :meth:`get_category_by_shortname`.

        For each one the ``parser`` default is set to the subcommand name and
        the ``func`` default to the target method pre-bound with
        ``serialize_for_cli=True``.

        :param parser: Sub-parsers action to which the subcommands are added.
        """
        create_cmd = parser.add_parser("create_user_property_category")
        create_cmd.set_defaults(
            parser="create_user_property_category",
            func=partial(self.create, serialize_for_cli=True),
        )
        create_cmd.add_argument(
            "shortname", type=str, help="Short name for user property category"
        )
        create_cmd.add_argument(
            "description", type=str, help="Description for user property category"
        )

        list_cmd = parser.add_parser("get_user_property_category")
        list_cmd.set_defaults(
            parser="get_user_property_category",
            func=partial(self.get, serialize_for_cli=True),
        )

        lookup_cmd = parser.add_parser("get_user_property_category_by_shortname")
        lookup_cmd.set_defaults(
            parser="get_user_property_category_by_shortname",
            func=partial(self.get_category_by_shortname, serialize_for_cli=True),
        )
        lookup_cmd.add_argument(
            "shortname", type=str, help="Short name for user property category"
        )

    async def create(
        self, shortname: str, description: str, serialize_for_cli=False
    ) -> List[UserPropertyCategoryRow]:
        """Add a user property category by calling ``sdk_AddUserPropertyCategory``.

        :param shortname: Short name of the new category; must be a ``str``.
        :param description: Description of the new category; must be a ``str``.
        :param serialize_for_cli: When true, the HQ response is passed through
            ``self.serialize_for_cli`` and that result is returned instead.
        :return: The response from ``self.call_hq`` (or its CLI-serialized form).
        :raises AssertionError: If ``shortname`` or ``description`` is not a
            ``str``.
        """
        # Raise explicitly rather than using `assert`: assert statements are
        # removed when Python runs with -O, which would let non-string values
        # through to the stored procedure parameters. AssertionError is kept so
        # callers see the same exception type and message as before.
        if not isinstance(shortname, str):
            raise AssertionError("Please supply a valid shortname")
        if not isinstance(description, str):
            raise AssertionError("Please supply a valid description")

        sql_params = []
        stored_proc_params = (
            Param(shortname, D_TYPES.VarChar, "shortname"),
            Param(description, D_TYPES.VarChar, "description"),
        )
        for param in stored_proc_params:
            param.add_to_param_list(sql_params)

        response = await self.call_hq(
            "sdk_AddUserPropertyCategory",
            ExecuteStoredProcedure.SqlParameters(sql_params),
        )

        if serialize_for_cli:
            response = self.serialize_for_cli(response)
        return response

    async def get(self, serialize_for_cli=False):
        """Fetch user property categories by calling ``sdk_GetUserPropertyCategories``.

        :param serialize_for_cli: When true, the HQ response is passed through
            ``self.serialize_for_cli`` and that result is returned instead.
        :return: The response from ``self.call_hq`` (or its CLI-serialized form).
        """
        response = await self.call_hq("sdk_GetUserPropertyCategories")

        if serialize_for_cli:
            response = self.serialize_for_cli(response)
        return response

    async def get_category_by_shortname(
        self, shortname: str, serialize_for_cli=False
    ) -> List[UserPropertyCategoryRow]:
        """Look up a user property category by its ``ShortName`` column.

        Delegates to ``self.get_by_name`` with ``get_by_name_key="ShortName"``
        and :meth:`get` as the fetch function.

        :param shortname: Short name to search for.
        :param serialize_for_cli: When true, the lookup result is wrapped in a
            one-element list and passed through ``self.serialize_for_cli``.
        :return: The result of ``self.get_by_name`` (or its CLI-serialized form).
        """
        # NOTE(review): the return annotation declares a list, yet the CLI
        # branch wraps the result in a list before serializing, which suggests
        # get_by_name yields a single row - confirm against DbObject.get_by_name.
        response = await self.get_by_name(
            shortname, get_by_name_key="ShortName", get_func=self.get
        )

        if serialize_for_cli:
            return self.serialize_for_cli([response])
        return response