Source code for q2_sdk.hq.db.form

import logging
from dataclasses import dataclass
import json
import re
from argparse import _SubParsersAction
import asyncio
from collections import namedtuple
from functools import partial
from typing import Optional

from lxml.objectify import StringElement, IntElement, BoolElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.models.installers.form_params import (
    InstallParams,
    DbPlanRequirements,
    SendAccountDetailsOption,
)
from q2_sdk.models.installers.central_params import (
    InstallParams as CentralInstallParams,
)
from q2_sdk.core.cli.textui import query, puts_err
from .base import InstallerObj
from .representation_row_base import BaseFormRow
from .wedge_address import WedgeAddress
from ..hq_api.q2_api import GetHqVersion
from ..models.hq_credentials import HqCredentials
from ...models.version import Version

# Bundle of the three UI text identifiers, in positional order:
# demo, display, html.
UiTextIds = namedtuple("UiTextIds", ["demo", "display", "html"])


class FormRow(BaseFormRow):
    """
    Row shape used for online forms (see ``Form.REPRESENTATION_ROW_CLASS``).

    Every attribute defaults to a string equal to its own name; the
    annotation names the lxml objectify element type.
    NOTE(review): how ``BaseFormRow`` consumes these declarations is not
    visible from this module -- confirm there before relying on it.
    """

    AccountHydraProductCodes: StringElement = "AccountHydraProductCodes"
    AccountRightsBitFlag: IntElement = "AccountRightsBitFlag"
    AccountRightsIsExplicit: StringElement = "AccountRightsIsExplicit"
    Advanced: StringElement = "Advanced"
    Config: StringElement = "Config"
    FormID: IntElement = "FormID"
    CallWedgeBeforeRender: BoolElement = "CallWedgeBeforeRender"
    DemoHtmlUiTextElementID: IntElement = "DemoHtmlUiTextElementID"
    DisplayNameUiTextElementID: IntElement = "DisplayNameUiTextElementID"
    DisplayNameUiTextElementShortName: StringElement = (
        "DisplayNameUiTextElementShortName"
    )
    HtmlDataUiTextElementID: IntElement = "HtmlDataUiTextElementID"
    MessageRecipientGroupID: IntElement = "MessageRecipientGroupID"
    NewWindow: BoolElement = "NewWindow"
    ShortName: StringElement = "ShortName"
    Url: StringElement = "Url"
    WedgeAddressID: IntElement = "WedgeAddressID"
    WedgePathName: StringElement = "WedgePathName"
    PropertyID: IntElement = "PropertyID"
class CentralFormRow(BaseFormRow):
    """
    Row shape used for Central forms (return type of ``Form.get_central``).

    Every attribute defaults to a string equal to its own name; the
    annotation names the lxml objectify element type.
    NOTE(review): how ``BaseFormRow`` consumes these declarations is not
    visible from this module -- confirm there before relying on it.
    """

    FormID: IntElement = "FormID"
    ShortName: StringElement = "ShortName"
    Url: StringElement = "Url"
    Config: StringElement = "Config"
    Description: StringElement = "Description"
    WedgePathName: StringElement = "WedgePathName"
    CallWedgeBeforeRender: BoolElement = "CallWedgeBeforeRender"
    HtmlData: StringElement = "HtmlData"
    FormParentID: IntElement = "FormParentID"
    WedgeAddressTypeName: StringElement = "WedgeAddressTypeName"
    PropertyID: IntElement = "PropertyID"
[docs] @dataclass class GroupsWithContext: property_id: str groups: list[str] property_name: str | None property_long_name: str | None
async def _get_form_groups_with_context(
    logger: logging.Logger, hq_credentials: HqCredentials, property_id: str
) -> GroupsWithContext:
    """
    Collect the group IDs whose value for ``property_id`` is ``"True"``,
    together with the property's short and long names.

    :param logger: Logger handed to the underlying db objects
    :param hq_credentials: Credentials handed to the underlying db objects
    :param property_id: Property ID to look up (converted with ``int()``
        for the data element lookup)
    :return: ``GroupsWithContext`` for the given property
    """
    # Imported here rather than at module level, as in the original code.
    from .user_property_data import UserPropertyData
    from .user_property_data_element import UserPropertyDataElement

    upd_obj = UserPropertyData(logger, hq_credentials, ret_table_obj=True)
    upde_obj = UserPropertyDataElement(logger, hq_credentials, ret_table_obj=True)

    data_rows = await upd_obj.get_by_group_with_id(property_id)
    element_rows = await upde_obj.get_by_id(int(property_id))
    long_name = element_rows[0].findtext("PropertyLongName")

    enabled_groups = []
    short_name = None
    for data_row in data_rows:
        if data_row.GroupID and data_row.PropertyValue == "True":
            enabled_groups.append(data_row.GroupID.text)
        # NOTE(review): the original indentation of this statement was lost;
        # it is read for every row here -- confirm it was not meant to be
        # limited to rows that matched the condition above.
        short_name = data_row.findtext("PropertyName")

    return GroupsWithContext(property_id, enabled_groups, short_name, long_name)


# for backwards compatibility
async def get_form_groups(
    logger: logging.Logger, hq_credentials: HqCredentials, property_id: str
) -> list[str]:
    """
    Return only the group IDs for ``property_id``.

    Thin wrapper around ``_get_form_groups_with_context`` that drops the
    property name information.
    """
    context = await _get_form_groups_with_context(logger, hq_credentials, property_id)
    return context.groups
async def add_groups_to_forms(
    logger: logging.Logger, hq_credentials: HqCredentials, forms: list[BaseFormRow]
) -> list[BaseFormRow]:
    """
    Annotate each form that has a ``PropertyID`` with group and property info.

    For every such form, ``PropertyName`` and ``PropertyLongName`` are set,
    and ``Groups`` is set to a comma-joined string of group IDs when groups
    were found and either the HQ version is at least 4.5.0.6000 or the form
    has no ``Groups`` attribute yet.

    :param logger: Logger passed through to the HQ/db calls
    :param hq_credentials: Credentials passed through to the HQ/db calls
    :param forms: Form rows to annotate (modified in place)
    :return: The same ``forms`` list
    """
    try:
        hq_response = await GetHqVersion.execute(
            GetHqVersion.ParamsObj(logger, hq_credentials)
        )
        hq_version = Version(hq_response.result_node.get("HqVersion"))
    except AttributeError:
        # Version could not be read from the response; treated as unknown.
        hq_version = None

    # De-duplicate so each property is only looked up once.
    property_ids_to_fetch = set()
    for form in forms:
        property_id = form.findtext("PropertyID")
        if property_id:
            property_ids_to_fetch.add(property_id)

    # Run the lookups concurrently, at most chunk_size at a time.
    chunk_size = 50
    responses: list[GroupsWithContext] = []
    property_ids = list(property_ids_to_fetch)
    for start in range(0, len(property_ids), chunk_size):
        tasks = [
            _get_form_groups_with_context(logger, hq_credentials, x)
            for x in property_ids[start : start + chunk_size]
        ]
        responses.extend(await asyncio.gather(*tasks))

    groups_by_property_id = {
        response.property_id: response for response in responses
    }

    for form in forms:
        property_id = form.findtext("PropertyID")
        if not property_id:
            continue

        groups = []
        property_name = None
        # Reset per form: previously this was only bound when a response was
        # found, so a miss raised UnboundLocalError or reused the value left
        # over from the previous form.
        property_long_name = None
        response = groups_by_property_id.get(property_id)
        if response:
            groups = response.groups
            property_name = response.property_name
            property_long_name = response.property_long_name

        if groups and (
            (hq_version and hq_version >= "4.5.0.6000")
            or not hasattr(form, "Groups")
        ):
            setattr(form, "Groups", ",".join(groups))
        # NOTE(review): the original indentation was lost; the two property
        # name attributes are set for every form with a PropertyID here --
        # confirm they were not meant to sit under the Groups condition.
        setattr(form, "PropertyName", property_name)
        setattr(form, "PropertyLongName", property_long_name)
    return forms
# for backwards compatibility
async def add_groups_to_form(
    logger: logging.Logger, hq_credentials: HqCredentials, form: BaseFormRow
):
    """
    Single-form variant of ``add_groups_to_forms``.

    The form is annotated in place; nothing is returned.
    """
    single_form_batch = [form]
    await add_groups_to_forms(logger, hq_credentials, single_form_batch)
class Form(InstallerObj):
    """
    Allows for operations on Online form and Central forms.
    """

    GET_BY_NAME_KEY = "ShortName"
    NAME = "Form"
    REPRESENTATION_ROW_CLASS = FormRow

    def add_arguments(self, parser: _SubParsersAction):
        """
        Register the form-related subcommands on ``parser``.

        Each subparser records its own name under ``parser`` and the
        coroutine to run under ``func``.
        """
        subparser = parser.add_parser("get_forms")
        subparser.set_defaults(parser="get_forms")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument("-f", "--form_id", help="Limit results to a single form")
        subparser.add_argument(
            "-g",
            "--with-groups",
            action="store_true",
            help="Include group associations (can be costly for large payloads)",
        )

        subparser = parser.add_parser("get_central_forms")
        subparser.set_defaults(parser="get_central_forms")
        subparser.set_defaults(func=partial(self.get_central, serialize_for_cli=True))
        subparser.add_argument("-f", "--form_id", help="Limit results to a single form")

        subparser = parser.add_parser("remove_form")
        subparser.set_defaults(parser="remove_form")
        subparser.set_defaults(func=partial(self.delete))
        subparser.add_argument("short_name", help="Q2_Form.ShortName")
        # Kept as a positional argument for CLI compatibility; the value is
        # forwarded as the skip_wedge_address bit of sdk_RemoveForm.
        subparser.add_argument(
            "skip_wedge_address",
            help="Value for the skip_wedge_address flag passed to sdk_RemoveForm",
        )

        subparser = parser.add_parser("update_form")
        subparser.set_defaults(parser="update_form")
        subparser.set_defaults(func=partial(self._update_from_cli))
        subparser.add_argument("short_name", help="Q2_Form.ShortName")

        subparser = parser.add_parser("update_central_form")
        subparser.set_defaults(parser="update_central_form")
        subparser.set_defaults(func=partial(self._update_central_from_cli))
        subparser.add_argument("short_name", help="Q2_Form.ShortName")

    async def get(
        self, serialize_for_cli=False, form_id: Optional[int] = None, with_groups=False
    ) -> list[FormRow]:
        """
        :param serialize_for_cli: Used when running from the command line
        :param form_id: Q2_Form.FormID
        :param with_groups: Include group associations (can be costly for large payloads)
        """
        response = await self.call_hq("sdk_GetInstalledForms")

        # A specific form_id switches the CLI output to the detailed view.
        detailed = False
        if form_id:
            detailed = True
            form_id = str(form_id)
            response = [x for x in response if x.findtext("FormID") == form_id]

        if with_groups:
            response = await add_groups_to_forms(
                self.logger, self.hq_credentials, response
            )

        for row in response:
            row.IsCustomerCreated = self._is_customer_created(row)

        if serialize_for_cli:
            response = await self._serialize_get(
                response, with_groups, detailed=detailed
            )
        return response

    async def get_central(
        self, serialize_for_cli=False, form_id: Optional[int] = None
    ) -> list[CentralFormRow]:
        """
        :param serialize_for_cli: Used when running from the command line
        :param form_id: Limit results to the form with this FormID
        """
        response = await self.call_hq("sdk_GetInstalledCentralForms")
        for row in response:
            row.IsCustomerCreated = self._is_customer_created(row)

        detailed = False
        if form_id:
            detailed = True
            form_id = str(form_id)
            response = [x for x in response if x.findtext("FormID") == form_id]

        if serialize_for_cli:
            response = self._serialize_central_get(response, detailed=detailed)
        return response

    @staticmethod
    def _pretty_config(form):
        """Return the form's Config as indented, key-sorted JSON, or "None"."""
        if form.find("Config"):
            return json.dumps(json.loads(form.Config.text), sort_keys=True, indent=4)
        return "None"

    @staticmethod
    def _format_details(fields, config):
        """Render ``(label, value)`` pairs one per line, followed by the config."""
        lines = ["{}:\t{}".format(label, value) for label, value in fields]
        lines.append("Config:\t\n{}".format(config))
        return "\n".join(lines)

    async def _serialize_get(self, response, with_groups: bool, detailed=False):
        """
        Turn online form rows into CLI text.

        In detailed mode only the first row is rendered, one field per line;
        otherwise the rows are passed to ``serialize_for_cli`` with a fixed
        column list. Group information is omitted unless ``with_groups``.
        """
        if not detailed:
            columns = [
                "FormID",
                "ShortName",
                "CustomName",
                "NavigationNodeID",
                "Groups",
                "HasUserProperties",
                "IsCustomerCreated",
            ]
            if with_groups is False:
                columns.remove("Groups")
            return self.serialize_for_cli(response, columns)

        form = response[0]
        # Any non-empty HasUserProperties text is reported as "True".
        has_perms = "True" if form.findtext("HasUserProperties") else "False"
        config = self._pretty_config(form)
        account_rights = self._get_account_rights(form)

        fields = [
            ("FormID", form.findtext("FormID")),
            ("WedgeAddressID", form.findtext("WedgeAddressID")),
        ]
        if with_groups:
            fields.append(("Groups", form.findtext("Groups")))
        fields.extend([
            ("ShortName", form.findtext("ShortName")),
            ("CustomName", form.findtext("CustomName", "None")),
            ("WedgePathName", form.findtext("WedgePathName")),
            ("AccountRights", account_rights),
            ("SendAccountList", form.findtext("SendAccountList")),
            ("SendAccountDetails", form.findtext("SendAccountDetails")),
            ("WedgePayloadStoredProc", form.findtext("WedgePayloadStoredProc", "None")),
            ("Navigation Node ID", form.findtext("NavigationNodeID", "None")),
            ("HasUserProperties", has_perms),
            ("URL", form.findtext("Url")),
            ("IsCustomerCreated", form.findtext("IsCustomerCreated")),
        ])
        return self._format_details(fields, config)

    @staticmethod
    def _get_account_rights(form):
        """
        AccountRightsBitFlag exists on both WedgeAddress and Form tables
        in more recent database versions. This function makes us compatible
        either way.
        """
        account_rights = form.findtext("AccountRightsBitFlag1")
        if not account_rights:
            account_rights = form.findtext("AccountRightsBitFlag")
        return account_rights

    def _serialize_central_get(self, response, detailed=False):
        """
        Turn Central form rows into CLI text.

        In detailed mode only the first row is rendered, one field per line;
        otherwise the rows are passed to ``serialize_for_cli`` with a fixed
        column list.
        """
        if not detailed:
            columns = [
                "FormID",
                "ShortName",
                "NavigationName",
                "NavigationID",
                "Groups",
                "IsCustomerCreated",
            ]
            return self.serialize_for_cli(response, columns)

        form = response[0]
        config = self._pretty_config(form)
        fields = [
            ("FormID", form.findtext("FormID")),
            ("WedgeAddressID", form.findtext("WedgeAddressID")),
            ("Groups", form.findtext("Groups")),
            ("ShortName", form.findtext("ShortName")),
            ("CustomName", form.findtext("NavigationName", "None")),
            ("WedgePathName", form.findtext("WedgePathName")),
            ("AccountRights", form.findtext("AccountRightsBitFlag")),
            ("SendAccountList", form.findtext("SendAccountList")),
            ("SendAccountDetails", form.findtext("SendAccountDetails")),
            ("WedgePayloadStoredProc", form.findtext("WedgePayloadStoredProc", "None")),
            ("Navigation Node ID", form.findtext("NavigationID", "None")),
            ("URL", form.findtext("Url")),
            # Previously printed without the colon, unlike every other field.
            ("IsCustomerCreated", form.findtext("IsCustomerCreated")),
        ]
        return self._format_details(fields, config)

    @staticmethod
    def _assert_shortname_valid(short_name):
        """Assert that a non-empty ``short_name`` passes ``_is_shortname_valid``."""
        if short_name:
            assert Form._is_shortname_valid(short_name), (
                "short_name must only contain alpha-numeric characters, ., _, or -."
            )

    async def create(self, install_params: InstallParams) -> FormRow:
        """
        Add an online form via ``sdk_AddForm`` and return the first result row.

        :param install_params: Parameters describing the form to install
        :raises AssertionError: If ``short_name`` has invalid characters
        """
        self._assert_shortname_valid(install_params.short_name)
        await install_params.add_ui_text_elements(self.logger)
        result = await self.call_hq(
            "sdk_AddForm", _get_form_install_parameters(install_params)
        )
        return result[0]

    async def create_central(
        self, install_params: CentralInstallParams
    ) -> CentralFormRow:
        """
        Add a Central form via ``sdk_AddCentralForm`` and return the first
        result row.

        :param install_params: Parameters describing the form to install
        :raises AssertionError: If ``short_name`` has invalid characters
        """
        self._assert_shortname_valid(install_params.short_name)
        result = await self.call_hq(
            "sdk_AddCentralForm", _get_central_form_install_parameters(install_params)
        )
        return result[0]

    async def get_by_name(self, name, **kwargs) -> FormRow:
        """
        Return the first installed form whose ShortName equals ``name``.

        :param name: Q2_Form.ShortName
        :param kwargs: Forwarded to ``get``; ``serialize_for_cli`` is handled
            here and, when true, the detailed CLI text is returned instead
        :raises DatabaseDataError: If no form has that ShortName
        """
        serialize_for_cli = kwargs.pop("serialize_for_cli", False)
        all_response = await self.get(**kwargs)

        # Because of the join against Nav tables, one form can show up multiple
        # times when searching by name. We just grab the first one for this call
        response = next((row for row in all_response if row.ShortName == name), None)
        if response is None:
            raise DatabaseDataError(f"No Form with ShortName: {name}")

        if serialize_for_cli:
            response = await self._serialize_get([response], True, detailed=True)
        return response

    async def delete(self, short_name: str, skip_wedge_address=False):
        """
        Call ``sdk_RemoveForm`` for the given form.

        :param short_name: Q2_Form.ShortName
        :param skip_wedge_address: Passed to the stored procedure as a bit
        """
        sql_param = ExecuteStoredProcedure.SqlParam
        data_type = ExecuteStoredProcedure.DataType
        return await self.call_hq(
            "sdk_RemoveForm",
            ExecuteStoredProcedure.SqlParameters([
                sql_param(data_type.VarChar, "short_name", short_name),
                sql_param(data_type.Bit, "skip_wedge_address", skip_wedge_address),
            ]),
        )

    async def delete_central(self, short_name: str):
        """
        Call ``sdk_RemoveCentralForm`` for the given form.

        :param short_name: Short name passed to the stored procedure
        """
        return await self.call_hq(
            "sdk_RemoveCentralForm",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar,
                    "short_name",
                    short_name,
                )
            ]),
        )

    async def _update_from_cli(self, short_name: str):
        """Prompt for a new URL and re-save the form with its existing settings."""
        existing = await self.get_by_name(short_name)
        url = query("URL: ", existing.Url.text)
        config = json.loads(existing.Config.text)
        account_rights_bit_flag = existing.AccountRightsBitFlag.text
        await self.update(short_name, url, config, account_rights_bit_flag)

    async def update(
        self,
        short_name: str,
        url: str,
        config: dict,
        account_rights_bit_flag: int,
        db_plan_requirements: Optional[DbPlanRequirements] = None,
        property_id: Optional[int] = None,
    ):
        """
        Update an installed online form via ``sdk_UpdateInstalledForm``.

        :param short_name: Q2_Form.ShortName of the form to update
        :param url: New URL
        :param config: Config, stored as its JSON serialization
        :param account_rights_bit_flag: Used unless ``db_plan_requirements``
            is given, in which case its value takes precedence
        :param db_plan_requirements: Source of the account list/details and
            payload stored proc settings; when absent, the account settings
            are read from the form's current wedge address row
        :param property_id: Passed through to the stored procedure
        :raises AssertionError: If ``short_name`` has invalid characters
        """
        json_config = json.dumps(config)
        if db_plan_requirements is not None:
            account_rights_bit_flag = db_plan_requirements.account_rights_bit_flag

        self._assert_shortname_valid(short_name)

        if db_plan_requirements:
            send_account_list = db_plan_requirements.send_account_list
            send_account_details = db_plan_requirements.send_account_details
            wedge_payload_stored_proc = db_plan_requirements.wedge_payload_stored_proc
        else:
            # No requirements supplied: keep the currently stored settings.
            wedge_address_obj = WedgeAddress(
                self.logger, self.hq_credentials, ret_table_obj=True
            )
            wedge_address_row = await wedge_address_obj.get(short_name=short_name)
            send_account_list = wedge_address_row[0].SendAccountList.pyval
            send_account_details = wedge_address_row[0].SendAccountDetails.pyval
            wedge_payload_stored_proc = None

        sql_param = ExecuteStoredProcedure.SqlParam
        data_type = ExecuteStoredProcedure.DataType
        sql_parameters = [
            sql_param(data_type.VarChar, "form_short_name", short_name),
            sql_param(data_type.VarChar, "url", url),
            sql_param(data_type.VarChar, "config", json_config),
            sql_param(
                data_type.Int, "account_rights_bit_flag", account_rights_bit_flag
            ),
            sql_param(data_type.Bit, "send_account_list", send_account_list),
            sql_param(
                data_type.Int,
                "account_details_enum",
                SendAccountDetailsOption.convert_input(send_account_details),
            ),
            sql_param(
                data_type.VarChar,
                "wedge_payload_stored_proc",
                wedge_payload_stored_proc,
            ),
            sql_param(data_type.Int, "property_id", property_id),
        ]
        return await self.call_hq(
            "sdk_UpdateInstalledForm",
            ExecuteStoredProcedure.SqlParameters(sql_parameters),
        )

    async def _update_central_from_cli(self, short_name: str):
        """
        Prompt for a new URL and re-save the Central form whose
        WedgeAddressTypeName equals ``short_name``; exit with -1 if none.
        """
        installed = await self.get_central()
        for form in installed:
            if form.WedgeAddressTypeName.text == short_name:
                existing = form
                break
        else:
            puts_err("Cannot find central form to update")
            # Raised directly: the exit() builtin is only injected by the
            # site module and is not guaranteed to exist.
            raise SystemExit(-1)

        url = query("URL: ", existing.Url.text)
        config = existing.Config.text
        if config is not None:
            config = json.loads(config)
        await self.update_central(short_name, url, config)

    async def update_central(
        self,
        short_name: str,
        url: str,
        config: dict,
        db_plan_requirements: Optional[DbPlanRequirements] = None,
    ):
        """
        Update an installed Central form via ``sdk_UpdateInstalledCentralForm``.

        :param short_name: Sent as ``wedge_address_type_name``
        :param url: New URL
        :param config: Config, stored as its JSON serialization
        :param db_plan_requirements: When given, its
            ``wedge_payload_stored_proc`` is sent as well
        :raises AssertionError: If ``short_name`` has invalid characters
        """
        json_config = json.dumps(config)
        self._assert_shortname_valid(short_name)

        sql_param = ExecuteStoredProcedure.SqlParam
        data_type = ExecuteStoredProcedure.DataType
        sql_parameters = [
            sql_param(data_type.VarChar, "wedge_address_type_name", short_name),
            sql_param(data_type.VarChar, "url", url),
            sql_param(data_type.VarChar, "config", json_config),
        ]
        if db_plan_requirements:
            sql_parameters.append(
                sql_param(
                    data_type.VarChar,
                    "wedge_payload_stored_proc",
                    db_plan_requirements.wedge_payload_stored_proc,
                )
            )
        return await self.call_hq(
            "sdk_UpdateInstalledCentralForm",
            ExecuteStoredProcedure.SqlParameters(sql_parameters),
        )

    @staticmethod
    def _is_shortname_valid(short_name: str):
        """
        Return True when ``short_name`` consists only of word characters,
        ``.`` or ``-``.

        ``fullmatch`` is used because ``match`` with a trailing ``$`` also
        accepts a string that ends in a newline.
        """
        return bool(re.fullmatch(r"[-\w.]+", short_name))
def _get_central_form_install_parameters(install_params: CentralInstallParams):
    """
    Build the stored procedure parameters for installing a Central form.

    An empty/falsy ``html_data`` is sent as ``None``.
    """
    sql_param = ExecuteStoredProcedure.SqlParam
    data_type = ExecuteStoredProcedure.DataType

    html_data = install_params.html_data or None
    rows = [
        (data_type.VarChar, "form_name", install_params.short_name),
        (data_type.VarChar, "description", install_params.description),
        (data_type.NVarChar, "html_data", html_data),
    ]
    return ExecuteStoredProcedure.SqlParameters([
        sql_param(kind, name, value) for kind, name, value in rows
    ])


def _get_form_install_parameters(install_params: InstallParams):
    """
    Build the stored procedure parameters for installing an online form.

    The three UI text element IDs are sent as strings under the Int type,
    as in the original implementation.
    """
    sql_param = ExecuteStoredProcedure.SqlParam
    data_type = ExecuteStoredProcedure.DataType

    rows = [
        (data_type.VarChar, "form_name", install_params.form_name),
        (data_type.VarChar, "form_short_name", install_params.short_name),
        (
            data_type.Int,
            "account_rights_bit_flags",
            install_params.account_rights_bit_flags,
        ),
        (
            data_type.Bit,
            "account_rights_is_explicit",
            install_params.account_rights_is_explicit,
        ),
        (
            data_type.VarChar,
            "account_hydra_product_codes",
            install_params.account_hydra_product_codes,
        ),
        (
            data_type.Int,
            "display_name_ui_text_element_id",
            str(install_params.display_name_ui_text_id),
        ),
        (
            data_type.Int,
            "demo_html_ui_text_element_id",
            str(install_params.demo_html_ui_text_id),
        ),
        (
            data_type.Int,
            "html_data_ui_text_element_id",
            str(install_params.html_data_ui_text_id),
        ),
        (data_type.VarChar, "advanced", install_params.advanced),
        (data_type.Int, "property_id", install_params.property_id),
    ]
    return ExecuteStoredProcedure.SqlParameters([
        sql_param(kind, name, value) for kind, name, value in rows
    ])