from argparse import _SubParsersAction
from functools import partial
from typing import List, Optional

from lxml.objectify import IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)

from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
class HostTransactionHistoryDataRow(RepresentationRowBase):
    """Row representation for ``HostTransactionHistoryData`` responses.

    Every attribute follows the same pattern: the attribute name is the
    object name, the annotation is the lxml element type used as a type
    hint, and the assigned string is the column name in the db response.
    """

    # Columns hinted as integer elements.
    HostTransactionHistoryDataID: IntElement = "HostTransactionHistoryDataID"
    TransactionID: IntElement = "TransactionID"
    HostTransactionHistoryDataElementID: IntElement = (
        "HostTransactionHistoryDataElementID"
    )

    # Columns hinted as string elements.
    DataValue: StringElement = "DataValue"
    ShortName: StringElement = "ShortName"
class HostTransactionHistoryData(DbObject):
    """DbObject for the ``HostTransactionHistoryData`` stored procedures.

    Rows are read with ``sdk_GetHostTransactionHistoryData``, written with
    ``sdk_AddUpdateHostTransactionHistoryData`` and removed with
    ``sdk_RemoveHostTransactionHistoryData``. Every call is issued through
    ``self.call_hq``.
    """

    # GET_BY_NAME_KEY = "column in the db response"
    NAME = "HostTransactionHistoryData"
    REPRESENTATION_ROW_CLASS = HostTransactionHistoryDataRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the ``get_host_transaction_history_data`` CLI subcommand.

        :param parser: Subparsers action the subcommand is added to.
        """
        subparser = parser.add_parser("get_host_transaction_history_data")
        # The CLI entry point is ``get`` with CLI serialization switched on.
        subparser.set_defaults(
            parser="get",
            func=partial(self.get, serialize_for_cli=True),
        )
        subparser.add_argument(
            "transaction_id", type=int, help="Q2_HostTransactionHistory.TransactionID"
        )

    async def get(
        self, transaction_id: int, serialize_for_cli: bool = False
    ) -> List[HostTransactionHistoryDataRow]:
        """Fetch the history data recorded for one transaction.

        :param transaction_id: Sent as the ``transaction_id`` parameter of
            ``sdk_GetHostTransactionHistoryData``.
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` with a fixed column list and that
            result is returned instead of the raw response.
        :return: The ``call_hq`` response, or its CLI serialization.
        """
        response = await self.call_hq(
            "sdk_GetHostTransactionHistoryData",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int,
                    "transaction_id",
                    transaction_id,
                )
            ]),
        )
        if serialize_for_cli:
            columns = [
                "HostTransactionHistoryDataID",
                "TransactionID",
                "HostTransactionHistoryDataElementID",
                "DataValue",
            ]
            response = self.serialize_for_cli(response, columns)
        return response

    async def create(
        self,
        transaction_id: int,
        data_element: str,
        data_value: str,
        data_type: Optional[str] = None,
    ):
        """Add a data value for a transaction via ``call_add_update``.

        :param transaction_id: Transaction the value belongs to.
        :param data_element: Sent as ``history_data_element``.
        :param data_value: Sent as ``data_value``.
        :param data_type: Optional; forwarded to ``call_add_update``.
        :return: Whatever ``call_add_update`` returns.
        """
        return await self.call_add_update(
            transaction_id, data_element, data_value, data_type=data_type
        )

    async def update(self, transaction_id: int, data_element: str, data_value: str):
        """Update a data value for a transaction via ``call_add_update``.

        Unlike ``create``, no ``data_type`` is forwarded.

        :param transaction_id: Transaction the value belongs to.
        :param data_element: Sent as ``history_data_element``.
        :param data_value: Sent as ``data_value``.
        :return: Whatever ``call_add_update`` returns.
        """
        return await self.call_add_update(transaction_id, data_element, data_value)

    async def call_add_update(
        self,
        transaction_id: int,
        data_element: str,
        data_value: str,
        data_type: Optional[str] = None,
    ):
        """Call ``sdk_AddUpdateHostTransactionHistoryData``.

        :param transaction_id: Sent as the Int parameter ``transaction_id``.
        :param data_element: Sent as the VarChar parameter
            ``history_data_element``.
        :param data_value: Sent as the VarChar parameter ``data_value``.
        :param data_type: Sent as the VarChar parameter ``data_type`` only
            when truthy.
        :return: The ``call_hq`` response.
        """
        parameters = [
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int, "transaction_id", transaction_id
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar,
                "history_data_element",
                data_element,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar, "data_value", data_value
            ),
        ]
        # Truthiness check: both None and "" leave data_type out of the call.
        if data_type:
            parameters.append(
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar, "data_type", data_type
                )
            )
        response = await self.call_hq(
            "sdk_AddUpdateHostTransactionHistoryData",
            ExecuteStoredProcedure.SqlParameters(parameters),
        )
        return response

    async def delete(self, transaction_id: int, data_element: str):
        """Call ``sdk_RemoveHostTransactionHistoryData``.

        :param transaction_id: Sent as the Int parameter ``transaction_id``.
        :param data_element: Sent as the VarChar parameter
            ``history_data_element``.
        :return: The ``call_hq`` response.
        """
        parameters = [
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int, "transaction_id", transaction_id
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar,
                "history_data_element",
                data_element,
            ),
        ]
        response = await self.call_hq(
            "sdk_RemoveHostTransactionHistoryData",
            ExecuteStoredProcedure.SqlParameters(parameters),
        )
        return response