Source code for q2_sdk.hq.db.data_feed

import json
from argparse import _SubParsersAction
from dataclasses import dataclass, fields
from functools import partial
from typing import List, Optional

from lxml.objectify import BoolElement, IntElement, StringElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.models.hq_params.stored_procedure import Param

from .base import InstallerObj
from .representation_row_base import BaseFormRow

# Short alias for ExecuteStoredProcedure.DataType; used below to tag each
# stored-procedure parameter with its SQL type (VarChar, Bit, Xml).
D_TYPES = ExecuteStoredProcedure.DataType


@dataclass
class UpdateDataFeed:
    """Values to send to the ``sdk_UpdateDataFeed`` stored procedure.

    ``shortName`` is required. Every other field is optional, and a field
    left as ``None`` is omitted from the parameter list produced by
    :meth:`build_params`.
    """

    shortName: str
    description: Optional[str] = None
    enabled: Optional[bool] = None
    wedgePathName: Optional[str] = None
    wedgeJsonData: Optional[dict] = None
    url: Optional[str] = None
    config: Optional[dict] = None

    def build_params(self):
        """Return the stored-procedure parameter list for all populated fields.

        Fields are visited in declaration order; a field whose value is
        ``None`` contributes nothing.

        :raises DatabaseDataError: if ``wedgeJsonData`` is non-empty but lacks
            an ``sdk`` or ``company`` key (raised by :meth:`param_to_type`).
        """
        collected = []
        for spec in fields(self):
            if getattr(self, spec.name) is None:
                continue
            sql_type, converted = self.param_to_type(spec.name)
            Param(converted, sql_type, spec.name).add_to_param_list(collected)
        return collected

    def param_to_type(self, field_name):
        """Map a field name to ``(sql_type, value)`` for the stored procedure.

        :param field_name: Name of one of this dataclass's fields.
        :return: Tuple of the SQL data type and the value to send. Dict-valued
            fields are JSON-encoded. An unrecognised name yields
            ``(None, None)``.
        :raises DatabaseDataError: if ``wedgeJsonData`` is non-empty but lacks
            an ``sdk`` or ``company`` key.
        """
        # Plain string columns are passed through unchanged.
        if field_name in ("shortName", "description", "wedgePathName", "url"):
            return D_TYPES.VarChar, getattr(self, field_name)

        if field_name == "enabled":
            return D_TYPES.Bit, self.enabled

        if field_name == "wedgeJsonData":
            payload = self.wedgeJsonData
            # An empty/None payload skips validation; otherwise both keys
            # must be present.
            if payload and not ("sdk" in payload and "company" in payload):
                raise DatabaseDataError(
                    'wedgeJsonData needs to contain an sdk and company value ({"sdk": "Q2", "company": "Q2"})'
                )
            return D_TYPES.VarChar, json.dumps(payload)

        if field_name == "config":
            # NOTE(review): the value is JSON-encoded but tagged as Xml;
            # confirm this is what the stored procedure expects.
            return D_TYPES.Xml, json.dumps(self.config)

        return None, None
class DataFeedRow(BaseFormRow):
    """Row representation used for data feed query results.

    Each attribute is annotated with an lxml objectify element type and is
    assigned a string equal to its own name.
    """

    # NOTE(review): presumably BaseFormRow uses these strings as the
    # element/column names of the HQ response; confirm against the base class.
    DataFeedID: IntElement = "DataFeedID"
    ShortName: StringElement = "ShortName"
    Description: StringElement = "Description"
    Enabled: BoolElement = "Enabled"
    WedgePathName: StringElement = "WedgePathName"
    WedgeAddressID: IntElement = "WedgeAddressID"
    WedgeJsonData: StringElement = "WedgeJsonData"
class DataFeed(InstallerObj):
    """CLI and programmatic access to data feed entries via HQ stored procedures.

    Exposes ``get``/``create``/``update``/``delete`` coroutines and registers
    the matching ``get_data_feeds``, ``add_data_feed`` and
    ``remove_data_feed`` CLI sub-commands.
    """

    GET_BY_NAME_KEY = "ShortName"
    NAME = "DataFeed"
    REPRESENTATION_ROW_CLASS = DataFeedRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the data feed sub-commands on ``parser``.

        :param parser: Sub-parser collection to which the ``get_data_feeds``,
            ``add_data_feed`` and ``remove_data_feed`` commands are added.
        """
        subparser = parser.add_parser("get_data_feeds")
        subparser.set_defaults(parser="get_data_feeds")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument(
            "-s",
            "--short_name",
            help="Limit results to a single entry for more details",
        )

        subparser = parser.add_parser("add_data_feed")
        subparser.set_defaults(parser="add_data_feed")
        subparser.set_defaults(func=partial(self.create))
        subparser.add_argument("short_name", help="Q2_DataFeed.ShortName")
        subparser.add_argument("description", help="Q2_DataFeed.Description")
        subparser.add_argument("wedge_path_name", help="Q2_Form.WedgePathName")

        subparser = parser.add_parser("remove_data_feed")
        subparser.set_defaults(parser="remove_data_feed")
        subparser.set_defaults(func=partial(self.delete))
        subparser.add_argument("short_name", help="Q2_DataFeed.ShortName")
        # store_false: passing -w turns remove_wedge_address off.
        subparser.add_argument(
            "-w",
            "--leave_wedge_address",
            dest="remove_wedge_address",
            action="store_false",
            help="Only remove the data feed entry, and not the wedge address entry",
        )

    async def get(
        self,
        serialize_for_cli=False,
        short_name: Optional[str] = None,
        data_feed_id: Optional[int] = None,
    ) -> List[DataFeedRow]:
        """Fetch data feeds, optionally filtered and/or rendered for the CLI.

        :param serialize_for_cli: When true, return the CLI rendering produced
            by ``self.serialize_for_cli`` instead of the row list (an empty
            string when there are no rows).
        :param short_name: Keep only rows whose ``ShortName`` matches. Takes
            precedence over ``data_feed_id``. In CLI mode it also adds the
            detail columns.
        :param data_feed_id: Keep only rows whose ``DataFeedID`` matches;
            ignored when ``short_name`` is given.
        :return: The matching rows, or the CLI rendering when
            ``serialize_for_cli`` is true.
        """
        response = await self.call_hq("sdk_GetDataFeeds")
        if short_name:
            response = [x for x in response if x.ShortName == short_name]
        elif data_feed_id:
            response = [x for x in response if x.DataFeedID.text == str(data_feed_id)]

        for row in response:
            row.IsCustomerCreated = self._is_customer_created(row)

        if serialize_for_cli:
            if not response:
                return ""
            columns = [
                "DataFeedID",
                "ShortName",
                "Description",
                "Enabled",
                "WedgePathName",
                "WedgeAddressID",
                "IsCustomerCreated",
            ]
            if short_name:
                # Extra detail columns for a single-entry lookup.
                # "WedgeAddressID" is already in the base list, so it is not
                # repeated here.
                columns.extend([
                    "WedgeAddressTypeName",
                    "Url",
                    "SendAccountList",
                    "SendAccountDetails",
                ])
            response = sorted(response, key=lambda x: x.DataFeedID)
            response = self.serialize_for_cli(response, columns)
        return response

    async def create(
        self,
        short_name,
        description,
        wedge_path_name,
        wedge_json_config: Optional[dict] = None,
        is_customer_created: bool = False,
        company: Optional[str] = None,
    ):
        """Add a data feed by calling the ``sdk_AddDataFeed`` stored procedure.

        :param short_name: Sent as the ``short_name`` parameter.
        :param description: Sent as the ``description`` parameter.
        :param wedge_path_name: Sent as the ``wedge_path_name`` parameter.
        :param wedge_json_config: Optional base configuration. It is copied,
            never modified in place; its ``sdk`` key is always overwritten.
        :param is_customer_created: Sets ``sdk`` to ``"Customer"`` instead of
            ``"Q2"``.
        :param company: When not ``None``, stored under the ``company`` key.
        :return: Whatever ``self.call_hq`` returns for the stored procedure.
        """
        # Copy so the caller's dict is not mutated by the assignments below.
        config = dict(wedge_json_config) if wedge_json_config is not None else {}
        config["sdk"] = "Customer" if is_customer_created else "Q2"
        if company is not None:
            config["company"] = company
        serialized_config = json.dumps(config)

        return await self.call_hq(
            "sdk_AddDataFeed",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "short_name",
                    short_name,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "description",
                    description,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "wedge_path_name",
                    wedge_path_name,
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "wedge_json_configs",
                    serialized_config,
                ),
            ]),
        )

    async def update(self, update_parameters: UpdateDataFeed):
        """Update a data feed by calling ``sdk_UpdateDataFeed``.

        :param update_parameters: Values to change; only its non-``None``
            fields are sent (see ``UpdateDataFeed.build_params``).
        :return: Whatever ``self.call_hq`` returns for the stored procedure.
        """
        sql_parameters = update_parameters.build_params()
        return await self.call_hq(
            "sdk_UpdateDataFeed",
            ExecuteStoredProcedure.SqlParameters(sql_parameters),
        )

    async def delete(self, short_name, remove_wedge_address=True):
        """Remove a data feed by calling ``sdk_RemoveDataFeed``.

        :param short_name: Sent as the ``short_name`` parameter.
        :param remove_wedge_address: When true, a ``remove_wedge_address``
            bit parameter set to ``True`` is also sent; when false the
            parameter is omitted entirely.
        :return: Whatever ``self.call_hq`` returns for the stored procedure.
        """
        parameters = [
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar, "short_name", short_name
            )
        ]
        if remove_wedge_address:
            parameters.append(
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Bit, "remove_wedge_address", True
                )
            )
        return await self.call_hq(
            "sdk_RemoveDataFeed", ExecuteStoredProcedure.SqlParameters(parameters)
        )