Source code for q2_sdk.core.install_steps.data_feed

from typing import Unpack

from q2_sdk.core.install_steps.base import (
    InstallStep,
    InstallStepAttribute,
    InstallStepArguments,
)
from q2_sdk.hq.db.data_feed import DataFeed as DataFeedDbObj


class DataFeed(InstallStep):
    """Interacts with Q2_DataFeed table.

    ``install`` creates an entry through ``DataFeedDbObj.create`` and
    ``uninstall`` removes it through ``DataFeedDbObj.delete``, keyed by the
    short name supplied at construction time.
    """

    def __init__(
        self,
        short_name,
        description,
        wedge_path_name,
        **kwargs: Unpack[InstallStepArguments],
    ):
        """Store the data feed settings as install step attributes.

        Args:
            short_name: Passed first to ``create`` and used as the only
                argument to ``delete``.
            description: Passed second to ``create``.
            wedge_path_name: Passed third to ``create``.
            **kwargs: Forwarded unchanged to ``InstallStep.__init__``.
        """
        super().__init__(**kwargs)
        # Each value is wrapped so it is later read back through ``.value``.
        self.short_name = InstallStepAttribute(short_name)
        self.description = InstallStepAttribute(description)
        self.wedge_path_name = InstallStepAttribute(wedge_path_name)

    async def install(self):
        """Run the base install step, then create the data feed entry."""
        await super().install()
        # NOTE(review): ``logger`` and ``hq_credentials`` are presumably set
        # by ``InstallStep.__init__`` -- confirm against the base class.
        data_feed_db = DataFeedDbObj(
            self.logger, hq_credentials=self.hq_credentials
        )
        await data_feed_db.create(
            self.short_name.value,
            self.description.value,
            self.wedge_path_name.value,
        )

    async def uninstall(self):
        """Delete the data feed entry identified by ``short_name``."""
        data_feed_db = DataFeedDbObj(
            self.logger, hq_credentials=self.hq_credentials
        )
        await data_feed_db.delete(self.short_name.value)