Source code for q2_sdk.hq.db.form
import logging
from dataclasses import dataclass
import json
import re
from argparse import _SubParsersAction
import asyncio
from collections import namedtuple
from functools import partial
from typing import Optional
from lxml.objectify import StringElement, IntElement, BoolElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.models.installers.form_params import (
InstallParams,
DbPlanRequirements,
SendAccountDetailsOption,
)
from q2_sdk.models.installers.central_params import (
InstallParams as CentralInstallParams,
)
from q2_sdk.core.cli.textui import query, puts_err
from .base import InstallerObj
from .representation_row_base import BaseFormRow
from .wedge_address import WedgeAddress
from ..hq_api.q2_api import GetHqVersion
from ..models.hq_credentials import HqCredentials
from ...models.version import Version
UiTextIds = namedtuple("UiTextIds", "demo display html")
[docs]
class FormRow(BaseFormRow):
AccountHydraProductCodes: StringElement = "AccountHydraProductCodes"
AccountRightsBitFlag: IntElement = "AccountRightsBitFlag"
AccountRightsIsExplicit: StringElement = "AccountRightsIsExplicit"
Advanced: StringElement = "Advanced"
Config: StringElement = "Config"
FormID: IntElement = "FormID"
CallWedgeBeforeRender: BoolElement = "CallWedgeBeforeRender"
DemoHtmlUiTextElementID: IntElement = "DemoHtmlUiTextElementID"
DisplayNameUiTextElementID: IntElement = "DisplayNameUiTextElementID"
DisplayNameUiTextElementShortName: StringElement = (
"DisplayNameUiTextElementShortName"
)
HtmlDataUiTextElementID: IntElement = "HtmlDataUiTextElementID"
MessageRecipientGroupID: IntElement = "MessageRecipientGroupID"
NewWindow: BoolElement = "NewWindow"
ShortName: StringElement = "ShortName"
Url: StringElement = "Url"
WedgeAddressID: IntElement = "WedgeAddressID"
WedgePathName: StringElement = "WedgePathName"
PropertyID: IntElement = "PropertyID"
[docs]
class CentralFormRow(BaseFormRow):
FormID: IntElement = "FormID"
ShortName: StringElement = "ShortName"
Url: StringElement = "Url"
Config: StringElement = "Config"
Description: StringElement = "Description"
WedgePathName: StringElement = "WedgePathName"
CallWedgeBeforeRender: BoolElement = "CallWedgeBeforeRender"
HtmlData: StringElement = "HtmlData"
FormParentID: IntElement = "FormParentID"
WedgeAddressTypeName: StringElement = "WedgeAddressTypeName"
PropertyID: IntElement = "PropertyID"
[docs]
@dataclass
class GroupsWithContext:
property_id: str
groups: list[str]
property_name: str | None
property_long_name: str | None
async def _get_form_groups_with_context(
logger: logging.Logger, hq_credentials: HqCredentials, property_id: str
) -> GroupsWithContext:
from .user_property_data import UserPropertyData
from .user_property_data_element import UserPropertyDataElement
groups = []
property_name = None
upd_obj = UserPropertyData(logger, hq_credentials, ret_table_obj=True)
upde_obj = UserPropertyDataElement(logger, hq_credentials, ret_table_obj=True)
upd_row = await upd_obj.get_by_group_with_id(property_id)
upde_row = await upde_obj.get_by_id(int(property_id))
property_long_name = upde_row[0].findtext("PropertyLongName")
for row in upd_row:
if row.GroupID and row.PropertyValue == "True":
groups.append(row.GroupID.text)
property_name = row.findtext("PropertyName")
return GroupsWithContext(property_id, groups, property_name, property_long_name)
# for backwards compatibility
[docs]
async def get_form_groups(
logger: logging.Logger, hq_credentials: HqCredentials, property_id: str
) -> list[str]:
return (
await _get_form_groups_with_context(logger, hq_credentials, property_id)
).groups
[docs]
async def add_groups_to_forms(
logger: logging.Logger, hq_credentials: HqCredentials, forms: list[BaseFormRow]
) -> list[BaseFormRow]:
property_ids_to_fetch = set()
try:
hq_response = await GetHqVersion.execute(
GetHqVersion.ParamsObj(logger, hq_credentials)
)
hq_version = Version(hq_response.result_node.get("HqVersion"))
except AttributeError:
hq_version = None
for form in forms:
property_id = form.findtext("PropertyID")
if property_id:
property_ids_to_fetch.add(property_id)
chunk_size = 50
responses: list[GroupsWithContext] = []
property_ids_to_fetch = list(property_ids_to_fetch)
for i in range(0, len(property_ids_to_fetch), chunk_size):
ids_to_run = property_ids_to_fetch[i : i + chunk_size]
tasks = [
_get_form_groups_with_context(logger, hq_credentials, x) for x in ids_to_run
]
responses.extend(await asyncio.gather(*tasks))
groups_by_property_id = {}
for response in responses:
groups_by_property_id[response.property_id] = response
for form in forms:
property_id = form.findtext("PropertyID")
if property_id:
groups = []
property_name = None
response = groups_by_property_id.get(property_id)
if response:
groups = response.groups
property_name = response.property_name
property_long_name = response.property_long_name
if groups and (
(hq_version and hq_version >= "4.5.0.6000")
or not hasattr(form, "Groups")
):
setattr(form, "Groups", ",".join(groups))
setattr(form, "PropertyName", property_name)
setattr(form, "PropertyLongName", property_long_name)
return forms
# for backwards compatibility
[docs]
async def add_groups_to_form(
logger: logging.Logger, hq_credentials: HqCredentials, form: BaseFormRow
):
await add_groups_to_forms(logger, hq_credentials, [form])
[docs]
class Form(InstallerObj):
"""
Allows for operations on Online form and Central forms.
"""
GET_BY_NAME_KEY = "ShortName"
NAME = "Form"
REPRESENTATION_ROW_CLASS = FormRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_forms")
subparser.set_defaults(parser="get_forms")
subparser.set_defaults(parser="get_forms")
subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
subparser.add_argument("-f", "--form_id", help="Limit results to a single form")
subparser.add_argument(
"-g",
"--with-groups",
action="store_true",
help="Limit results to a single form",
)
subparser = parser.add_parser("get_central_forms")
subparser.set_defaults(parser="get_central_forms")
subparser.set_defaults(func=partial(self.get_central, serialize_for_cli=True))
subparser.add_argument("-f", "--form_id", help="Limit results to a single form")
subparser = parser.add_parser("remove_form")
subparser.set_defaults(parser="remove_form")
subparser.set_defaults(func=partial(self.delete))
subparser.add_argument("short_name", help="Q2_Form.ShortName")
subparser.add_argument("skip_wedge_address", help="Q2_Form.ShortName")
subparser = parser.add_parser("update_form")
subparser.set_defaults(parser="update_form")
subparser.set_defaults(func=partial(self._update_from_cli))
subparser.add_argument("short_name", help="Q2_Form.ShortName")
subparser = parser.add_parser("update_central_form")
subparser.set_defaults(parser="update_central_form")
subparser.set_defaults(func=partial(self._update_central_from_cli))
subparser.add_argument("short_name", help="Q2_Form.ShortName")
[docs]
async def get(
self, serialize_for_cli=False, form_id: Optional[int] = None, with_groups=False
) -> list[FormRow]:
"""
:param serialize_for_cli: Used when running from the command line
:param form_id: Q2_Form.FormID
:param with_groups: Include group associations (can be costly for large payloads)
"""
response = await self.call_hq("sdk_GetInstalledForms")
detailed = False
if form_id:
detailed = True
form_id = str(form_id)
response = [x for x in response if x.findtext("FormID") == form_id]
if with_groups:
response = await add_groups_to_forms(
self.logger, self.hq_credentials, response
)
for row in response:
row.IsCustomerCreated = self._is_customer_created(row)
if serialize_for_cli:
response = await self._serialize_get(
response, with_groups, detailed=detailed
)
return response
[docs]
async def get_central(
self, serialize_for_cli=False, form_id: Optional[int] = None
) -> list[CentralFormRow]:
response = await self.call_hq("sdk_GetInstalledCentralForms")
for row in response:
row.IsCustomerCreated = self._is_customer_created(row)
detailed = False
if form_id:
detailed = True
form_id = str(form_id)
response = [x for x in response if x.findtext("FormID") == form_id]
if serialize_for_cli:
response = self._serialize_central_get(response, detailed=detailed)
return response
async def _serialize_get(self, response, with_groups: bool, detailed=False):
if detailed:
form = response[0]
has_perms = "False"
if form.findtext("HasUserProperties"):
has_perms = "True"
config = "None"
if form.find("Config"):
config = json.dumps(
json.loads(form.Config.text), sort_keys=True, indent=4
)
account_rights = self._get_account_rights(form)
form_groups = form.findtext("Groups")
response_list = [
"FormID:\t{}".format(form.findtext("FormID")),
"WedgeAddressID:\t{}".format(form.findtext("WedgeAddressID")),
"Groups:\t{}".format(form_groups),
"ShortName:\t{}".format(form.findtext("ShortName")),
"CustomName:\t{}".format(form.findtext("CustomName", "None")),
"WedgePathName:\t{}".format(form.findtext("WedgePathName")),
"AccountRights:\t{}".format(account_rights),
"SendAccountList:\t{}".format(form.findtext("SendAccountList")),
"SendAccountDetails:\t{}".format(form.findtext("SendAccountDetails")),
"WedgePayloadStoredProc:\t{}".format(
form.findtext("WedgePayloadStoredProc", "None")
),
"Navigation Node ID:\t{}".format(
form.findtext("NavigationNodeID", "None")
),
"HasUserProperties:\t{}".format(has_perms),
"URL:\t{}".format(form.findtext("Url")),
"IsCustomerCreated:\t{}".format(form.findtext("IsCustomerCreated")),
"Config:\t\n{}".format(config),
]
if with_groups:
response = "\n".join(response_list)
else:
response_list.remove("Groups:\t{}".format(form_groups))
response = "\n".join(response_list)
else:
columns = [
"FormID",
"ShortName",
"CustomName",
"NavigationNodeID",
"Groups",
"HasUserProperties",
"IsCustomerCreated",
]
if with_groups is False:
columns.remove("Groups")
response = self.serialize_for_cli(response, columns)
return response
@staticmethod
def _get_account_rights(form):
"""
AccountRightsBitFlag exists on both WedgeAddress and Form tables
in more recent database versions. This function makes us compatible
either way.
"""
account_rights = form.findtext("AccountRightsBitFlag1")
if not account_rights:
account_rights = form.findtext("AccountRightsBitFlag")
return account_rights
def _serialize_central_get(self, response, detailed=False):
if detailed:
form = response[0]
config = "None"
if form.find("Config"):
config = json.dumps(
json.loads(form.Config.text), sort_keys=True, indent=4
)
response = "\n".join([
"FormID:\t{}".format(form.findtext("FormID")),
"WedgeAddressID:\t{}".format(form.findtext("WedgeAddressID")),
"Groups:\t{}".format(form.findtext("Groups")),
"ShortName:\t{}".format(form.findtext("ShortName")),
"CustomName:\t{}".format(form.findtext("NavigationName", "None")),
"WedgePathName:\t{}".format(form.findtext("WedgePathName")),
"AccountRights:\t{}".format(form.findtext("AccountRightsBitFlag")),
"SendAccountList:\t{}".format(form.findtext("SendAccountList")),
"SendAccountDetails:\t{}".format(form.findtext("SendAccountDetails")),
"WedgePayloadStoredProc:\t{}".format(
form.findtext("WedgePayloadStoredProc", "None")
),
"Navigation Node ID:\t{}".format(form.findtext("NavigationID", "None")),
"URL:\t{}".format(form.findtext("Url")),
"IsCustomerCreated\t{}".format(form.findtext("IsCustomerCreated")),
"Config:\t\n{}".format(config),
])
else:
columns = [
"FormID",
"ShortName",
"NavigationName",
"NavigationID",
"Groups",
"IsCustomerCreated",
]
response = self.serialize_for_cli(response, columns)
return response
[docs]
async def create(self, install_params: InstallParams) -> FormRow:
if install_params.short_name:
assert Form._is_shortname_valid(install_params.short_name), (
"short_name must only contain alpha-numeric characters, ., _, or -."
)
await install_params.add_ui_text_elements(self.logger)
result = await self.call_hq(
"sdk_AddForm", _get_form_install_parameters(install_params)
)
return result[0]
[docs]
async def create_central(
self, install_params: CentralInstallParams
) -> CentralFormRow:
if install_params.short_name:
assert Form._is_shortname_valid(install_params.short_name), (
"short_name must only contain alpha-numeric characters, ., _, or -."
)
result = await self.call_hq(
"sdk_AddCentralForm", _get_central_form_install_parameters(install_params)
)
return result[0]
[docs]
async def get_by_name(self, name, **kwargs) -> FormRow:
serialize_for_cli = kwargs.get("serialize_for_cli", False)
kwargs.pop("serialize_for_cli", None)
all_response = await self.get(**kwargs)
response = None
# Because of the join against Nav tables, one form can show up multiple
# times when searching by name. We just grab the first one for this call
for row in all_response:
if row.ShortName == name:
response = row
break
if response is None:
raise DatabaseDataError(f"No Form with ShortName: {name}")
if serialize_for_cli:
response = await self._serialize_get([response], True, detailed=True)
return response
[docs]
async def delete(self, short_name: str, skip_wedge_address=False):
return await self.call_hq(
"sdk_RemoveForm",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"short_name",
short_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Bit,
"skip_wedge_address",
skip_wedge_address,
),
]),
)
[docs]
async def delete_central(self, short_name: str):
return await self.call_hq(
"sdk_RemoveCentralForm",
ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"short_name",
short_name,
)
]),
)
async def _update_from_cli(self, short_name: str):
existing = await self.get_by_name(short_name)
url = query("URL: ", existing.Url.text)
config = existing.Config.text
config = json.loads(config)
account_rights_bit_flag = existing.AccountRightsBitFlag.text
await self.update(short_name, url, config, account_rights_bit_flag)
[docs]
async def update(
self,
short_name: str,
url: str,
config: dict,
account_rights_bit_flag: int,
db_plan_requirements: Optional[DbPlanRequirements] = None,
property_id: Optional[int] = None,
):
json_config = json.dumps(config)
if db_plan_requirements is not None:
account_rights_bit_flag = db_plan_requirements.account_rights_bit_flag
if short_name:
assert Form._is_shortname_valid(short_name), (
"short_name must only contain alpha-numeric characters, ., _, or -."
)
if not db_plan_requirements:
wedge_address_obj = WedgeAddress(
self.logger, self.hq_credentials, ret_table_obj=True
)
wedge_address_row = await wedge_address_obj.get(short_name=short_name)
sql_parameters = [
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "form_short_name", short_name
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "url", url
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "config", json_config
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"account_rights_bit_flag",
account_rights_bit_flag,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Bit,
"send_account_list",
db_plan_requirements.send_account_list
if db_plan_requirements
else wedge_address_row[0].SendAccountList.pyval,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"account_details_enum",
SendAccountDetailsOption.convert_input(
db_plan_requirements.send_account_details
if db_plan_requirements
else wedge_address_row[0].SendAccountDetails.pyval
),
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wedge_payload_stored_proc",
db_plan_requirements.wedge_payload_stored_proc
if db_plan_requirements
else None,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int, "property_id", property_id
),
]
return await self.call_hq(
"sdk_UpdateInstalledForm",
ExecuteStoredProcedure.SqlParameters(sql_parameters),
)
async def _update_central_from_cli(self, short_name: str):
installed = await self.get_central()
for form in installed:
if form.WedgeAddressTypeName.text == short_name:
existing = form
break
else:
puts_err("Cannot find central form to update")
exit(-1)
url = query("URL: ", existing.Url.text)
config = existing.Config.text
if config is not None:
config = json.loads(config)
await self.update_central(short_name, url, config)
[docs]
async def update_central(
self,
short_name: str,
url: str,
config: dict,
db_plan_requirements: Optional[DbPlanRequirements] = None,
):
json_config = json.dumps(config)
if short_name:
assert Form._is_shortname_valid(short_name), (
"short_name must only contain alpha-numeric characters, ., _, or -."
)
sql_parameters = [
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wedge_address_type_name",
short_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "url", url
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar, "config", json_config
),
]
if db_plan_requirements:
sql_parameters.extend([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"wedge_payload_stored_proc",
db_plan_requirements.wedge_payload_stored_proc,
)
])
return await self.call_hq(
"sdk_UpdateInstalledCentralForm",
ExecuteStoredProcedure.SqlParameters(sql_parameters),
)
@staticmethod
def _is_shortname_valid(short_name: str):
return bool(re.match(r"[-\w.]+$", short_name))
def _get_central_form_install_parameters(install_params: CentralInstallParams):
html_data = install_params.html_data if install_params.html_data else None
sql_parameters = ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"form_name",
install_params.short_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"description",
install_params.description,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.NVarChar, "html_data", html_data
),
])
return sql_parameters
def _get_form_install_parameters(install_params: InstallParams):
sql_parameters = ExecuteStoredProcedure.SqlParameters([
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"form_name",
install_params.form_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"form_short_name",
install_params.short_name,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"account_rights_bit_flags",
install_params.account_rights_bit_flags,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Bit,
"account_rights_is_explicit",
install_params.account_rights_is_explicit,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"account_hydra_product_codes",
install_params.account_hydra_product_codes,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"display_name_ui_text_element_id",
str(install_params.display_name_ui_text_id),
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"demo_html_ui_text_element_id",
str(install_params.demo_html_ui_text_id),
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"html_data_ui_text_element_id",
str(install_params.html_data_ui_text_id),
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.VarChar,
"advanced",
install_params.advanced,
),
ExecuteStoredProcedure.SqlParam(
ExecuteStoredProcedure.DataType.Int,
"property_id",
install_params.property_id,
),
])
return sql_parameters