Source code for q2_sdk.hq.db.product

import asyncio
from argparse import _SubParsersAction
from enum import Enum
from functools import partial
from typing import List, Optional
from lxml.objectify import IntElement, StringElement, BoolElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.tools.decorators import dev_only
from q2_sdk.tools.utils import to_bool

from .db_object import DbObject, DEFAULT
from .host_tran_code_group import HostTranCodeGroup
from .product_type import ProductType
from .representation_row_base import RepresentationRowBase
from .third_party_data import ThirdPartyData
from .third_party_data_element import ThirdPartyDataElement
from .vendor import Vendor
from ...core.cli import textui, colored


[docs] class ThirdPartyDataElementProperty(Enum): SSO_NEW_WINDOW = "UiConfig.ssoNewWindow" SSO_ACCOUNT_SPECIFIC = "UiConfig.ssoAccountSpecific"
[docs] class ProductRow(RepresentationRowBase): ProductID: IntElement = "ProductID" HostProductCode: StringElement = "HostProductCode" HostProductTypeCode: StringElement = "HostProductTypeCode" ProductName: StringElement = "ProductName" ProductTypeName: StringElement = "ProductTypeName" TranCodeGroupID: IntElement = "TranCodeGroupID" ProductTypeID: IntElement = "ProductTypeID" VoiceFile: StringElement = "VoiceFile" ProductNickName: StringElement = "ProductNickName" HydraProductCode: StringElement = "HydraProductCode" VendorID: IntElement = "VendorID" AllowOpen: BoolElement = "AllowOpen" AllowClose: BoolElement = "AllowClose"
[docs] class Product(DbObject): GET_BY_NAME_KEY = "ProductName" NAME = "Product" REPRESENTATION_ROW_CLASS = ProductRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_products") subparser.set_defaults(parser="get_products") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser = parser.add_parser("add_product") subparser.set_defaults(parser="add_product") subparser.set_defaults(func=partial(self.create)) subparser.add_argument("product_name") subparser.add_argument("host_product_code") subparser.add_argument("tran_code_group_name") subparser.add_argument("product_type_name") subparser.add_argument("hydra_product_code") subparser.add_argument("--product_nick_name") subparser.add_argument("--voice-file") subparser.add_argument("--vendor-name", type=str) subparser.add_argument("--allow-open", action="store_true") subparser.add_argument("--allow-close", action="store_true") subparser = parser.add_parser("update_product") subparser.set_defaults(parser="update_product") subparser.set_defaults(func=partial(self.update)) subparser.add_argument("product_id", type=int) subparser.add_argument("--name", default=DEFAULT) subparser.add_argument("--host_product_code", default=DEFAULT) subparser.add_argument("--tran_code_group_name", default=DEFAULT) subparser.add_argument("--hydra_product_code", default=DEFAULT) subparser.add_argument("--product_type_id", default=DEFAULT) subparser.add_argument("--product_nick_name", default=DEFAULT) subparser.add_argument("--voice-file", default=DEFAULT) subparser.add_argument("--vendor-name", type=str, default=DEFAULT) subparser.add_argument("--allow-open", action="store_true", default=DEFAULT) subparser.add_argument("--allow-close", action="store_true", default=DEFAULT) subparser = parser.add_parser("remove_product") subparser.set_defaults(parser="remove_product") subparser.set_defaults(func=partial(self.delete)) subparser.add_argument("product_id") subparser = parser.add_parser("link_vendor_to_product") subparser.set_defaults(parser="link_vendor_to_product") subparser.set_defaults(func=partial(self.link_vendor_to_product)) subparser.add_argument("vendor_name", help="Q2_Vendors.VendorName") subparser.add_argument("product_id", help="Q2_Product.ProductID") subparser.add_argument("user_id", help="Q2_User.UserID")
[docs] async def get(self, serialize_for_cli=False) -> List[ProductRow]: response = await self.call_hq("sdk_GetProducts") if serialize_for_cli: response = self.serialize_for_cli( response, [ "ProductID", "ProductTypeID", "ProductName", "HostProductCode", "HostProductTypeCode", "ProductTypeName", "HydraProductCode", ], ) return response
[docs] async def get_list_by_name( self, product_name, product_type_name=None, host_product_code=None ) -> List[ProductRow]: full_response = await self.get() filtered_response = [ x for x in full_response if x.ProductName.text == product_name ] if product_type_name: filtered_response = [ x for x in filtered_response if x.ProductTypeName.text == product_type_name ] if host_product_code: filtered_response = [ x for x in filtered_response if x.HostProductCode.text == host_product_code ] return filtered_response
async def _get_vendor_id(self, vendor_name) -> int: db_obj = Vendor(self.logger, self.hq_credentials) try: vendor = await db_obj.get_by_name(vendor_name) except DatabaseDataError: return None return vendor.VendorID.pyval async def _get_host_tran_code_group_id(self, tran_code_group_name) -> int: db_obj = HostTranCodeGroup(self.logger, self.hq_credentials) response = await db_obj.get() tran_code_group_id = None for row in response: if row.Description.text == tran_code_group_name: tran_code_group_id = row.TranCodeGroupID.pyval break if not tran_code_group_id: raise DatabaseDataError( f"No HostTranCodeGroup with name {tran_code_group_name} exists" ) return tran_code_group_id async def _get_product_type_id(self, product_type_name): db_obj = ProductType(self.logger, self.hq_credentials) response = await db_obj.get_by_name(product_type_name) if response is None: raise DatabaseDataError( f"No ProductType with name {product_type_name} exists" ) return response.ProductTypeID.pyval
[docs] async def create( self, product_name: str, host_product_code: str, tran_code_group_name: str, product_type_name: str, hydra_product_code: str, product_nick_name: str | None = None, voice_file: str = None, vendor_name: str = None, product_type_id: Optional[int] = None, allow_open=False, allow_close=False, ): """ product_type_id may be passed in to disambiguate duplicate product_type_name entries """ tran_code_group_id = await self._get_host_tran_code_group_id( tran_code_group_name ) vendor_id = await self._get_vendor_id(vendor_name) if product_type_id is None: product_type_id = await self._get_product_type_id(product_type_name) parameters = [ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "product_name", product_name ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "host_product_code", host_product_code, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "tran_code_group_id", tran_code_group_id, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.SmallInt, "product_type_id", product_type_id, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Char, "hydra_product_code", hydra_product_code, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "product_nick_name", product_nick_name, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "voice_file", voice_file ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "vendor_id", vendor_id ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Bit, "allow_open", bool(int(allow_open)) ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Bit, "allow_close", bool(int(allow_close)), ), ] await self.call_hq( "sdk_AddProduct", sql_parameters=ExecuteStoredProcedure.SqlParameters(parameters), )
[docs] async def update( self, product_id: int, name: str = DEFAULT, host_product_code: str = DEFAULT, tran_code_group_name: int = DEFAULT, hydra_product_code: str = DEFAULT, product_type_id: int = DEFAULT, product_nick_name: str = DEFAULT, voice_file: str = DEFAULT, vendor_name: str = DEFAULT, allow_open: bool = DEFAULT, allow_close: bool = DEFAULT, ): """ update all values of an existing product for the given product_id. """ matching = [x for x in await self.get() if int(x.ProductID) == product_id] product = ProductRow() if not matching: raise DatabaseDataError(f"No product with id {product_id}") match = matching[0] product.ProductID = product_id product.ProductName = ( name if name is not DEFAULT else match.findtext("ProductName") ) product.HostProductCode = ( host_product_code if host_product_code is not DEFAULT else match.findtext("HostProductCode") ) product.HydraProductCode = ( hydra_product_code if hydra_product_code is not DEFAULT else match.findtext("HydraProductCode") ) product.ProductTypeID = ( product_type_id if product_type_id is not DEFAULT else match.findtext("ProductTypeID") ) product.ProductNickName = ( product_nick_name if product_nick_name is not DEFAULT else match.findtext("ProductNickName") ) product.VoiceFile = ( voice_file if voice_file is not DEFAULT else match.findtext("VoiceFile") ) product.VendorID = ( await self._get_vendor_id(vendor_name) if vendor_name is not DEFAULT else match.findtext("VendorID") ) product.AllowOpen = ( allow_open if allow_open is not DEFAULT else to_bool(match.findtext("AllowOpen")) ) product.AllowClose = ( allow_close if allow_close is not DEFAULT else to_bool(match.findtext("AllowClose")) ) if tran_code_group_name is not DEFAULT: product.TranCodeGroupID = await self._get_host_tran_code_group_id( tran_code_group_name ) else: product.TranCodeGroupID = match.findtext("TranCodeGroupID") parameters = [ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "product_id", product.ProductID ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "product_name", product.ProductName, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "host_product_code", product.HostProductCode, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "tran_code_group_id", int(product.TranCodeGroupID), ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.SmallInt, "product_type_id", int(product.ProductTypeID), ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Char, "hydra_product_code", product.HydraProductCode, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "product_nick_name", product.ProductNickName, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "voice_file", product.VoiceFile ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "vendor_id", int(product.VendorID) if product.VendorID else None, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Bit, "allow_open", bool(int(product.AllowOpen)), ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Bit, "allow_close", bool(int(product.AllowClose)), ), ] await self.call_hq( "sdk_UpdateProduct", sql_parameters=ExecuteStoredProcedure.SqlParameters(parameters), )
[docs] @dev_only async def delete(self, product_id: int): """Note: this only works in the dev environment""" await self.call_hq( "sdk_RemoveProduct", sql_parameters=ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "product_id", product_id ) ]), )