import asyncio
from argparse import _SubParsersAction
from enum import Enum
from functools import partial
from typing import List, Optional
from lxml.objectify import IntElement, StringElement, BoolElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.tools.decorators import dev_only
from q2_sdk.tools.utils import to_bool
from .db_object import DbObject, DEFAULT
from .host_tran_code_group import HostTranCodeGroup
from .product_type import ProductType
from .representation_row_base import RepresentationRowBase
from .third_party_data import ThirdPartyData
from .third_party_data_element import ThirdPartyDataElement
from .vendor import Vendor
from ...core.cli import textui, colored
[docs]
class ThirdPartyDataElementProperty(Enum):
    """Property names used for the SSO-related third party data elements.

    Each member's value is the literal property-name string handed to the
    ThirdPartyDataElement / ThirdPartyData helpers.
    """

    # Paired in this module with the description "Open in a new window".
    SSO_NEW_WINDOW = "UiConfig.ssoNewWindow"
    # Paired in this module with the description
    # "Send account number on SSO account tile".
    SSO_ACCOUNT_SPECIFIC = "UiConfig.ssoAccountSpecific"
[docs]
class ProductRow(RepresentationRowBase):
    """Row representation for a product as returned by ``Product.get``.

    Every attribute is annotated with the lxml objectify element type the
    column is read as, and is assigned a string equal to its own name.
    """

    # Identity and naming
    ProductID: IntElement = "ProductID"
    HostProductCode: StringElement = "HostProductCode"
    HostProductTypeCode: StringElement = "HostProductTypeCode"
    ProductName: StringElement = "ProductName"
    ProductTypeName: StringElement = "ProductTypeName"

    # References to related records
    TranCodeGroupID: IntElement = "TranCodeGroupID"
    ProductTypeID: IntElement = "ProductTypeID"

    # Presentation details
    VoiceFile: StringElement = "VoiceFile"
    ProductNickName: StringElement = "ProductNickName"
    HydraProductCode: StringElement = "HydraProductCode"

    # Vendor link and open/close permissions
    VendorID: IntElement = "VendorID"
    AllowOpen: BoolElement = "AllowOpen"
    AllowClose: BoolElement = "AllowClose"
[docs]
class Product(DbObject):
    """Database-object wrapper for products, usable from code and from the CLI.

    Reads go through the ``sdk_GetProducts`` stored procedure; writes go
    through ``sdk_AddProduct``, ``sdk_UpdateProduct`` and
    ``sdk_RemoveProduct``.
    """

    GET_BY_NAME_KEY = "ProductName"
    NAME = "Product"
    REPRESENTATION_ROW_CLASS = ProductRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the product sub-commands on ``parser``.

        Every sub-command stores its own name under ``parser`` and the
        coroutine to run under ``func``.
        """
        subparser = parser.add_parser("get_products")
        subparser.set_defaults(parser="get_products")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))

        subparser = parser.add_parser("add_product")
        subparser.set_defaults(parser="add_product")
        subparser.set_defaults(func=partial(self.create))
        subparser.add_argument("product_name")
        subparser.add_argument("host_product_code")
        subparser.add_argument("tran_code_group_name")
        subparser.add_argument("product_type_name")
        subparser.add_argument("hydra_product_code")
        subparser.add_argument("--product_nick_name")
        subparser.add_argument("--voice-file")
        subparser.add_argument("--vendor-name", type=str)
        subparser.add_argument("--allow-open", action="store_true")
        subparser.add_argument("--allow-close", action="store_true")

        # Options left out on the command line keep the DEFAULT sentinel, which
        # ``update`` reads as "keep the stored value".
        # NOTE: --allow-open / --allow-close are store_true flags, so from the
        # CLI they can only be switched on (True) or left untouched (DEFAULT).
        subparser = parser.add_parser("update_product")
        subparser.set_defaults(parser="update_product")
        subparser.set_defaults(func=partial(self.update))
        subparser.add_argument("product_id", type=int)
        subparser.add_argument("--name", default=DEFAULT)
        subparser.add_argument("--host_product_code", default=DEFAULT)
        subparser.add_argument("--tran_code_group_name", default=DEFAULT)
        subparser.add_argument("--hydra_product_code", default=DEFAULT)
        subparser.add_argument("--product_type_id", default=DEFAULT)
        subparser.add_argument("--product_nick_name", default=DEFAULT)
        subparser.add_argument("--voice-file", default=DEFAULT)
        subparser.add_argument("--vendor-name", type=str, default=DEFAULT)
        subparser.add_argument("--allow-open", action="store_true", default=DEFAULT)
        subparser.add_argument("--allow-close", action="store_true", default=DEFAULT)

        subparser = parser.add_parser("remove_product")
        subparser.set_defaults(parser="remove_product")
        subparser.set_defaults(func=partial(self.delete))
        # Parsed as an integer, like update_product's product_id, because
        # ``delete`` sends it as an Int SQL parameter.
        subparser.add_argument("product_id", type=int)

        subparser = parser.add_parser("link_vendor_to_product")
        subparser.set_defaults(parser="link_vendor_to_product")
        subparser.set_defaults(func=partial(self.link_vendor_to_product))
        subparser.add_argument("vendor_name", help="Q2_Vendors.VendorName")
        subparser.add_argument("product_id", help="Q2_Product.ProductID")
        subparser.add_argument("user_id", help="Q2_User.UserID")

    async def get(self, serialize_for_cli=False) -> List[ProductRow]:
        """Return the products reported by ``sdk_GetProducts``.

        :param serialize_for_cli: when true, the response is passed through
            ``self.serialize_for_cli`` with the column list below and that
            serialized result is returned instead of the raw rows.
        """
        response = await self.call_hq("sdk_GetProducts")
        if not serialize_for_cli:
            return response

        cli_columns = [
            "ProductID",
            "ProductTypeID",
            "ProductName",
            "HostProductCode",
            "HostProductTypeCode",
            "ProductTypeName",
            "HydraProductCode",
        ]
        return self.serialize_for_cli(response, cli_columns)

    async def get_list_by_name(
        self, product_name, product_type_name=None, host_product_code=None
    ) -> List[ProductRow]:
        """Return all products whose ``ProductName`` equals ``product_name``.

        :param product_type_name: if truthy, additionally require this
            ``ProductTypeName``.
        :param host_product_code: if truthy, additionally require this
            ``HostProductCode``.
        """

        def keep(row) -> bool:
            # Name is always checked; the optional filters only apply when set.
            if row.ProductName.text != product_name:
                return False
            if product_type_name and row.ProductTypeName.text != product_type_name:
                return False
            if host_product_code and row.HostProductCode.text != host_product_code:
                return False
            return True

        return [row for row in await self.get() if keep(row)]

    async def _get_vendor_id(self, vendor_name) -> Optional[int]:
        """Look up the VendorID for ``vendor_name``.

        Returns ``None`` when no name is given or when the vendor lookup
        raises ``DatabaseDataError``.
        """
        if vendor_name is None:
            # Nothing to look up; skip the call entirely.
            return None
        db_obj = Vendor(self.logger, self.hq_credentials)
        try:
            vendor = await db_obj.get_by_name(vendor_name)
        except DatabaseDataError:
            return None
        return vendor.VendorID.pyval

    async def _get_host_tran_code_group_id(self, tran_code_group_name) -> int:
        """Return the TranCodeGroupID whose Description is ``tran_code_group_name``.

        The first matching row wins.

        :raises DatabaseDataError: if no row matches or the match has no id.
        """
        db_obj = HostTranCodeGroup(self.logger, self.hq_credentials)
        response = await db_obj.get()
        tran_code_group_id = next(
            (
                row.TranCodeGroupID.pyval
                for row in response
                if row.Description.text == tran_code_group_name
            ),
            None,
        )
        # Compare against None rather than truthiness so an id of 0 is valid.
        if tran_code_group_id is None:
            raise DatabaseDataError(
                f"No HostTranCodeGroup with name {tran_code_group_name} exists"
            )
        return tran_code_group_id

    async def _get_product_type_id(self, product_type_name) -> int:
        """Return the ProductTypeID for ``product_type_name``.

        :raises DatabaseDataError: if the lookup returns ``None``.
        """
        db_obj = ProductType(self.logger, self.hq_credentials)
        response = await db_obj.get_by_name(product_type_name)
        if response is None:
            raise DatabaseDataError(
                f"No ProductType with name {product_type_name} exists"
            )
        return response.ProductTypeID.pyval

    async def create(
        self,
        product_name: str,
        host_product_code: str,
        tran_code_group_name: str,
        product_type_name: str,
        hydra_product_code: str,
        product_nick_name: Optional[str] = None,
        voice_file: Optional[str] = None,
        vendor_name: Optional[str] = None,
        product_type_id: Optional[int] = None,
        allow_open=False,
        allow_close=False,
    ):
        """
        Add a product through the ``sdk_AddProduct`` stored procedure.

        ``tran_code_group_name`` is resolved to its id and must exist.
        ``vendor_name`` is resolved to a vendor id; if it cannot be resolved the
        product is created with a null vendor id.

        product_type_id may be passed in to disambiguate duplicate product_type_name entries

        :raises DatabaseDataError: if the tran code group or, when
            ``product_type_id`` is not given, the product type cannot be found.
        """
        tran_code_group_id = await self._get_host_tran_code_group_id(
            tran_code_group_name
        )
        vendor_id = await self._get_vendor_id(vendor_name)
        if product_type_id is None:
            product_type_id = await self._get_product_type_id(product_type_name)

        data_type = ExecuteStoredProcedure.DataType
        sql_param = ExecuteStoredProcedure.SqlParam
        parameters = [
            sql_param(data_type.VarChar, "product_name", product_name),
            sql_param(data_type.VarChar, "host_product_code", host_product_code),
            sql_param(data_type.Int, "tran_code_group_id", tran_code_group_id),
            sql_param(data_type.SmallInt, "product_type_id", product_type_id),
            sql_param(data_type.Char, "hydra_product_code", hydra_product_code),
            sql_param(data_type.VarChar, "product_nick_name", product_nick_name),
            sql_param(data_type.VarChar, "voice_file", voice_file),
            sql_param(data_type.Int, "vendor_id", vendor_id),
            sql_param(data_type.Bit, "allow_open", bool(int(allow_open))),
            sql_param(data_type.Bit, "allow_close", bool(int(allow_close))),
        ]
        await self.call_hq(
            "sdk_AddProduct",
            sql_parameters=ExecuteStoredProcedure.SqlParameters(parameters),
        )

    async def update(
        self,
        product_id: int,
        name: str = DEFAULT,
        host_product_code: str = DEFAULT,
        tran_code_group_name: str = DEFAULT,
        hydra_product_code: str = DEFAULT,
        product_type_id: int = DEFAULT,
        product_nick_name: str = DEFAULT,
        voice_file: str = DEFAULT,
        vendor_name: str = DEFAULT,
        allow_open: bool = DEFAULT,
        allow_close: bool = DEFAULT,
    ):
        """
        Update an existing product through ``sdk_UpdateProduct``.

        Every argument left at ``DEFAULT`` keeps the value currently stored for
        the product; every other argument replaces it. A ``vendor_name`` that
        cannot be resolved results in a null vendor id being written.

        :raises DatabaseDataError: if no product has ``product_id``, or if
            ``tran_code_group_name`` is given but does not exist.
        """
        matching = [x for x in await self.get() if int(x.ProductID) == product_id]
        if not matching:
            raise DatabaseDataError(f"No product with id {product_id}")
        match = matching[0]

        def supplied_or_stored(value, column):
            """Return ``value`` unless it is DEFAULT, else ``column``'s stored text."""
            return value if value is not DEFAULT else match.findtext(column)

        product = ProductRow()
        product.ProductID = product_id
        product.ProductName = supplied_or_stored(name, "ProductName")
        product.HostProductCode = supplied_or_stored(
            host_product_code, "HostProductCode"
        )
        product.HydraProductCode = supplied_or_stored(
            hydra_product_code, "HydraProductCode"
        )
        product.ProductTypeID = supplied_or_stored(product_type_id, "ProductTypeID")
        product.ProductNickName = supplied_or_stored(
            product_nick_name, "ProductNickName"
        )
        product.VoiceFile = supplied_or_stored(voice_file, "VoiceFile")

        # Names have to be resolved to ids; stored values are already ids.
        if vendor_name is not DEFAULT:
            product.VendorID = await self._get_vendor_id(vendor_name)
        else:
            product.VendorID = match.findtext("VendorID")

        # Stored flags arrive as text and are converted with to_bool.
        if allow_open is not DEFAULT:
            product.AllowOpen = allow_open
        else:
            product.AllowOpen = to_bool(match.findtext("AllowOpen"))
        if allow_close is not DEFAULT:
            product.AllowClose = allow_close
        else:
            product.AllowClose = to_bool(match.findtext("AllowClose"))

        if tran_code_group_name is not DEFAULT:
            product.TranCodeGroupID = await self._get_host_tran_code_group_id(
                tran_code_group_name
            )
        else:
            product.TranCodeGroupID = match.findtext("TranCodeGroupID")

        data_type = ExecuteStoredProcedure.DataType
        sql_param = ExecuteStoredProcedure.SqlParam
        parameters = [
            sql_param(data_type.Int, "product_id", product.ProductID),
            sql_param(data_type.VarChar, "product_name", product.ProductName),
            sql_param(
                data_type.VarChar, "host_product_code", product.HostProductCode
            ),
            sql_param(
                data_type.Int, "tran_code_group_id", int(product.TranCodeGroupID)
            ),
            sql_param(
                data_type.SmallInt, "product_type_id", int(product.ProductTypeID)
            ),
            sql_param(data_type.Char, "hydra_product_code", product.HydraProductCode),
            sql_param(
                data_type.VarChar, "product_nick_name", product.ProductNickName
            ),
            sql_param(data_type.VarChar, "voice_file", product.VoiceFile),
            sql_param(
                data_type.Int,
                "vendor_id",
                # A missing/empty vendor id is written as NULL.
                int(product.VendorID) if product.VendorID else None,
            ),
            sql_param(data_type.Bit, "allow_open", bool(int(product.AllowOpen))),
            sql_param(data_type.Bit, "allow_close", bool(int(product.AllowClose))),
        ]
        await self.call_hq(
            "sdk_UpdateProduct",
            sql_parameters=ExecuteStoredProcedure.SqlParameters(parameters),
        )

    @dev_only
    async def delete(self, product_id: int):
        """Remove a product through ``sdk_RemoveProduct``.

        Note: this only works in the dev environment
        """
        product_id_param = ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int, "product_id", product_id
        )
        await self.call_hq(
            "sdk_RemoveProduct",
            sql_parameters=ExecuteStoredProcedure.SqlParameters([product_id_param]),
        )

    async def link_vendor_to_product(self, vendor_name, product_id, user_id):
        """Point a product at a vendor and enable the vendor's SSO settings.

        Steps, in order:

        1. update the product so its vendor is ``vendor_name``;
        2. create the two SSO third party data elements;
        3. create a third party data row set to ``True`` for each element.

        If any step raises an error whose text mentions one of the SSO property
        names, the matching third party data row is updated to ``True``
        instead. Any other error is re-raised rather than hidden.
        """
        tpd_obj = ThirdPartyData(self.logger, self.hq_credentials, ret_table_obj=True)
        tpde_obj = ThirdPartyDataElement(
            self.logger, self.hq_credentials, ret_table_obj=True
        )
        new_window = ThirdPartyDataElementProperty.SSO_NEW_WINDOW.value
        account_specific = ThirdPartyDataElementProperty.SSO_ACCOUNT_SPECIFIC.value
        try:
            await Product(self.logger, self.hq_credentials, ret_table_obj=True).update(
                int(product_id), vendor_name=str(vendor_name)
            )
            await asyncio.gather(*[
                tpde_obj.create(new_window, "Open in a new window"),
                tpde_obj.create(
                    account_specific, "Send account number on SSO account tile"
                ),
            ])
            # add tpd row with vendor_id and data_id(tpde id) if not present
            await asyncio.gather(*[
                tpd_obj.create(user_id, str(vendor_name), new_window, True),
                tpd_obj.create(user_id, str(vendor_name), account_specific, True),
            ])
            textui.puts(colored.green(f"Vendor '{vendor_name}' is now linked"))
        except Exception as e:
            # str(e) is safe even when the exception carries no args or a
            # non-string first arg, unlike indexing e.args[0].
            message = str(e)
            handled = False
            for property_name in (new_window, account_specific):
                if property_name in message:
                    await tpd_obj.update(
                        user_id, str(vendor_name), property_name, True
                    )
                    handled = True
            if not handled:
                # Unrelated failure (e.g. unknown product id): surface it.
                raise