import json
from argparse import _SubParsersAction
from dataclasses import dataclass, fields
from functools import partial
from typing import List, Optional
from lxml.objectify import BoolElement, IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from .base import InstallerObj
from .representation_row_base import BaseFormRow
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
@dataclass
class UpdateDataFeed:
shortName: str
description: Optional[str] = None
enabled: Optional[bool] = None
wedgePathName: Optional[str] = None
wedgeJsonData: Optional[dict] = None
url: Optional[str] = None
config: Optional[dict] = None
[docs]
def build_params(self):
params = []
for field in fields(self):
name = field.name
value = getattr(self, name)
if value is not None:
stored_proc_type, adjusted_value = self.param_to_type(name)
Param(adjusted_value, stored_proc_type, name).add_to_param_list(params)
return params
[docs]
def param_to_type(self, field_name):
sql_type = None
value = None
match field_name:
case "shortName":
sql_type = D_TYPES.VarChar
value = self.shortName
case "description":
sql_type = D_TYPES.VarChar
value = self.description
case "enabled":
sql_type = D_TYPES.Bit
value = self.enabled
case "wedgePathName":
sql_type = D_TYPES.VarChar
value = self.wedgePathName
case "wedgeJsonData":
if self.wedgeJsonData and (
"sdk" not in self.wedgeJsonData
or "company" not in self.wedgeJsonData
):
raise DatabaseDataError(
'wedgeJsonData needs to contain an sdk and company value ({"sdk": "Q2", "company": "Q2"})'
)
sql_type = D_TYPES.VarChar
value = json.dumps(self.wedgeJsonData)
case "url":
sql_type = D_TYPES.VarChar
value = self.url
case "config":
sql_type = D_TYPES.Xml
value = json.dumps(self.config)
return sql_type, value
[docs]
class DataFeedRow(BaseFormRow):
DataFeedID: IntElement = "DataFeedID"
ShortName: StringElement = "ShortName"
Description: StringElement = "Description"
Enabled: BoolElement = "Enabled"
WedgePathName: StringElement = "WedgePathName"
WedgeAddressID: IntElement = "WedgeAddressID"
WedgeJsonData: StringElement = "WedgeJsonData"
[docs]
class DataFeed(InstallerObj):
GET_BY_NAME_KEY = "ShortName"
NAME = "DataFeed"
REPRESENTATION_ROW_CLASS = DataFeedRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_data_feeds")
subparser.set_defaults(parser="get_data_feeds")
subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
subparser.add_argument(
"-s",
"--short_name",
help="Limit results to a single entry for more details",
)
subparser = parser.add_parser("add_data_feed")
subparser.set_defaults(parser="add_data_feed")
subparser.set_defaults(func=partial(self.create))
subparser.add_argument("short_name", help="Q2_DataFeed.ShortName")
subparser.add_argument("description", help="Q2_DataFeed.Description")
subparser.add_argument("wedge_path_name", help="Q2_Form.WedgePathName")
subparser = parser.add_parser("remove_data_feed")
subparser.set_defaults(parser="remove_data_feed")
subparser.set_defaults(func=partial(self.delete))
subparser.add_argument("short_name", help="Q2_DataFeed.ShortName")
subparser.add_argument(
"-w",
"--leave_wedge_address",
dest="remove_wedge_address",
action="store_false",
help="Only remove the data feed entry, and not the wedge address entry",
)
[docs]
async def get(
self,
serialize_for_cli=False,
short_name: Optional[str] = None,
data_feed_id: Optional[int] = None,
) -> List[DataFeedRow]:
response = await self.call_hq("sdk_GetDataFeeds")
if short_name:
response = [x for x in response if x.ShortName == short_name]
elif data_feed_id:
response = [x for x in response if x.DataFeedID.text == str(data_feed_id)]
for row in response:
row.IsCustomerCreated = self._is_customer_created(row)
if serialize_for_cli:
if not response:
return ""
column = [
"DataFeedID",
"ShortName",
"Description",
"Enabled",
"WedgePathName",
"WedgeAddressID",
"IsCustomerCreated",
]
if short_name:
column.extend([
"WedgeAddressID",
"WedgeAddressTypeName",
"Url",
"SendAccountList",
"SendAccountDetails",
])
response = sorted(response, key=lambda x: x.DataFeedID)
response = self.serialize_for_cli(response, column)
return response
[docs]
async def create(
self,
short_name,
description,
wedge_path_name,
wedge_json_config: Optional[dict] = None,
is_customer_created: bool = False,
company: Optional[str] = None,
):
if wedge_json_config is None:
wedge_json_config = {"sdk": "Q2"}
if is_customer_created:
wedge_json_config = {"sdk": "Customer"}
else:
author = "Q2"
if is_customer_created:
author = "Customer"
wedge_json_config["sdk"] = author
if company is not None:
wedge_json_config["company"] = company
wedge_json_config = json.dumps(wedge_json_config)
return await self.call_hq(
"sdk_AddDataFeed",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"short_name",
short_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"description",
description,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wedge_path_name",
wedge_path_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wedge_json_configs",
wedge_json_config,
),
]),
)
[docs]
async def update(self, update_parameters: UpdateDataFeed):
sql_parameters = update_parameters.build_params()
return await self.call_hq(
"sdk_UpdateDataFeed",
ExecuteStoredProcedure.SqlParameters(sql_parameters),
)
[docs]
async def delete(self, short_name, remove_wedge_address=True):
parameters = [
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "short_name", short_name
)
]
if remove_wedge_address:
parameters.append(
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Bit, "remove_wedge_address", True
)
)
return await self.call_hq(
"sdk_RemoveDataFeed", ExecuteStoredProcedure.SqlParameters(parameters)
)