Source code for q2_sdk.hq.db.third_party_data_element

from argparse import _SubParsersAction
from functools import partial

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.tools.decorators import dev_only

from .db_object import DbObject


[docs] class ThirdPartyDataElement(DbObject): GET_BY_NAME_KEY = "Name" NAME = "ThirdPartyDataElement"
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_third_party_data_elements") subparser.set_defaults(parser="get_third_party_data_elements") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser = parser.add_parser("add_third_party_data_element") subparser.set_defaults(parser="add_third_party_data_element") subparser.set_defaults(func=partial(self.create)) subparser.add_argument("name", help="Q2_ThirdPartyDataElements.ShortName") subparser.add_argument( "description", help="Q2_ThirdPartyDataElements.Description" ) subparser = parser.add_parser("add_or_update_third_party_data_element") subparser.set_defaults(parser="add_or_update_third_party_data_element") subparser.set_defaults(func=partial(self.add_or_update)) subparser.add_argument("name", help="Q2_ThirdPartyDataElements.ShortName") subparser.add_argument( "description", help="Q2_ThirdPartyDataElements.Description" ) subparser = parser.add_parser("remove_third_party_data_element") subparser.set_defaults(parser="remove_third_party_data_element") subparser.set_defaults(func=partial(self.delete)) subparser.add_argument("name", help="Q2_ThirdPartyDataElements.ShortName")
[docs] async def get(self, serialize_for_cli=False): response = await self.call_hq("sdk_GetThirdPartyDataElements") if serialize_for_cli: response = self.serialize_for_cli( response, ["DataID", "Name", "Description"] ) return response
[docs] async def create(self, name, description): return await self.call_hq( "sdk_AddThirdPartyDataElement", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "name", name ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "description", description, ), ]), )
[docs] async def add_or_update(self, name, description): return await self.call_hq( "sdk_AddOrUpdateThirdPartyDataElement", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "name", name ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "description", description, ), ]), )
[docs] @dev_only async def delete(self, name): """Note: this only works in the dev environment""" return await self.call_hq( "sdk_RemoveThirdPartyDataElement", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "name", name ) ]), )