Source code for q2_sdk.hq.db.enrolled_entity

import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Union

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq import api_helpers
from q2_sdk.hq.db.address_type import AddressType
from q2_sdk.hq.db.disclaimer import Disclaimer
from q2_sdk.hq.db.email import Email
from q2_sdk.hq.db.linked_account import LinkedAccount
from q2_sdk.hq.db.sso_user_logon import SsoUserLogon
from q2_sdk.hq.db.user_demographic_data import PhoneResults, UserDemographicData
from q2_sdk.hq.db.user_logon import UserLogon
from q2_sdk.hq.exceptions import MissingStoredProcError
from q2_sdk.hq.hq_api.q2_api import AddCustomer, AddUser, AddUserLogon
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.models.demographic import (
    Address,
    DemographicInfo,
    Phone,
    Email as EmailDemoObj,
)
from q2_sdk.models.demographic import AddressType as AddressTypes

from .db_object import DbObject

D_TYPES = ExecuteStoredProcedure.DataType


[docs] class EnrollmentException(Exception): """Enrollment Exception handling class""" def __init__(self, message, number, detail): super().__init__() self.message = message self.number = number self.details = detail
[docs] class CustomerExistsError(EnrollmentException): """Customer already exists in Q2's database"""
[docs] class UserExistsError(EnrollmentException): """User already exists in Q2's database"""
[docs] class LoginNameExistsError(EnrollmentException): """Login Name already exists in Q2's database"""
[docs] @dataclass class CreateCustomerParams: """Parameters needed to create a customer Note: grab_customer_data can be utilized to build the cust_info parameter""" cust_info: DemographicInfo group_id: int is_company: bool host_user: str host_pwd: str
[docs] @dataclass class CreateCustomerResult: """Expected response object for create customer function""" success: bool = False reason: Optional[str] = None customer_id: Optional[str] = None
[docs] @dataclass class CreateCustomerDataParams: """Parameters needed to create customer data""" customer_id: int name: str value: str
[docs] @dataclass class CreateUserParams: """Parameters needed to create a user""" user_info: DemographicInfo customer_id: int host_user: str host_pwd: str null_primary_cif: bool = False
[docs] @dataclass class CreateUserResult: """Expected response object for create user function""" success: bool = False reason: Optional[str] = None user_id: Optional[str] = None
[docs] @dataclass class CreateUserLogonParams: """Parameters needed to create a logon""" user_id: int logon_name: str user_logon_obj: UserLogon password: Optional[str] = None require_password_reset: bool = True group_id_for_zone_check: Optional[int] = None
[docs] @dataclass class CreateUserLogonResponse: """Expected response object for create user logon function""" logon_error: str = "" password: str = "" user_logon_id: Optional[int] = None
[docs] @dataclass class GrabCustomerDataParams: """Parameters needed to create a customer demographic info object""" customer_dict: dict user_dict: dict user_demographic_obj: Optional[UserDemographicData] = None
[docs] @dataclass class GetPhoneParams: """Parameters needed to retrieve and validate phone information from a customer and user dictionary""" customer_dict: dict user_dict: dict user_demographic_obj: Optional[UserDemographicData] = None
[docs] @dataclass class CheckForExistingParams: """Parameters needed to check if customer, user, or logon exists""" login_name: str primary_cif: str ssn: str = "" tax_id: str = "" skip_customer_tax_check: bool = False skip_customer_cif_check: bool = False skip_user_check: bool = False skip_existing_customer_check: bool = False group_id: Optional[int] = None def __post_init__(self): if self.skip_existing_customer_check: self.skip_customer_tax_check = True self.skip_customer_cif_check = True
[docs] @dataclass class UserDemoObjectValues: """Parameters needed to create user demographic object""" additional_user_info: dict emails: list phone_numbers: list[Phone] addresses: list[Address] ssn: int primary_cif: int dob: Optional[datetime] = None email_is_sac_target: Optional[bool] = None email_objects: Optional[list[EmailDemoObj]] = None
[docs] @dataclass class LinkAccountsViaCifParams: """Parameters needed to link accounts via primary cif""" primary_only: bool primary_cif: str customer_id: int user_id: int
[docs] @dataclass class SsoIdentifierResponse: """Expected response object for sso enrollment function""" duplicate_exists: bool = False sso_result_error: bool = False
[docs] class EnrolledEntity(DbObject): """Allows you to create a customer, user and logon in the Q2 Platform. Additional methods have been provided to assist you with the enrollment process""" NAME = "EnrolledEntity"
[docs] async def create_customer( self, params: CreateCustomerParams ) -> CreateCustomerResult: """Creates a Customer in Q2 db and returns the id""" # attempt to create a customer. If unsuccessful, provide a reason error_message = "Customer creation failed" cust_attribute_error = CreateCustomerResult() try: create_cust = await self._create_customer(params) if not create_cust.success: self.logger.debug("%s: %s", error_message, str(create_cust.reason)) return create_cust except AttributeError: self.logger.debug(error_message) cust_attribute_error.reason = error_message return cust_attribute_error # if the customer was successfully created, provide the customer ID self.logger.debug("Customer ID: %s", str(create_cust.customer_id)) create_cust.success = True return create_cust
async def _create_customer( self, params: CreateCustomerParams ) -> CreateCustomerResult: # initialize response dataclass results = CreateCustomerResult() # attempting to create a new customer self.logger.info("Creating Customer") xml_payload = api_helpers.build_add_customer_xml( params.cust_info, params.group_id, params.is_company, host_user=params.host_user, host_pwd=params.host_pwd, allow_other_address_types=True, ) self.logger.info("customer_demo_info: %s", params.cust_info) params_obj = AddCustomer.ParamsObj( self.logger, xml_payload, hq_credentials=self.hq_credentials ) response = await AddCustomer.execute(params_obj) # if customer creation was unsuccessful provide a reason otherwise if successful, store the Customer ID if response.error_message: if "It has been marked as deleted." in response.error_message: results.reason = "Provided Group ID has been marked as deleted" elif "FK_Q2_Customer_Q2_Group" in response.error_message: results.reason = "Provided Group ID may not exist" else: self.logger.exception(response.error_message) else: results.customer_id = ( response.result_node.Data.AddCustomerResults.row.CustomerID.pyval ) results.success = True self.logger.info("Customer Id: %s", results.customer_id) return results
[docs] async def create_customer_data(self, params: CreateCustomerDataParams): """Sets CustomerData in our DB to the key|value pairs passed in in the format\ [{"Name":"DataName", "Value": "DataValue"}]""" self.logger.info("create_customer_data") sql_params = [] Param(params.name, D_TYPES.VarChar, "dataElement").add_to_param_list(sql_params) Param(str(params.customer_id), D_TYPES.Int, "customerID").add_to_param_list( sql_params ) Param(params.value, D_TYPES.VarChar, "data").add_to_param_list(sql_params) params_obj = ExecuteStoredProcedure.ParamsObj( logger=self.logger, stored_proc_short_name="Q2_AddUpdateCustomerData", sql_parameters=ExecuteStoredProcedure.SqlParameters(sql_params), hq_credentials=self.hq_credentials, ) return await ExecuteStoredProcedure.execute(params_obj)
[docs] async def create_user(self, params: CreateUserParams) -> CreateUserResult: """Creates a User in Q2 db and returns the id""" # attempt to create the user results = CreateUserResult() self.logger.info("Creating User") xml_payload = api_helpers.build_add_user_xml( params.user_info, params.customer_id, host_user=params.host_user, host_pwd=params.host_pwd, allow_other_address_types=True, null_primary_cif=params.null_primary_cif, ) params_obj = AddUser.ParamsObj( self.logger, xml_payload, hq_credentials=self.hq_credentials ) response = await AddUser.execute(params_obj) # provide reason if user creation was unsuccessful if response.error_message and "Q2_PhoneNumber" in response.error_message: results.reason = "Provided phone number is invalid" return results if not response.success: results.reason = response.error_message return results # otherwise, provide user id user_id = response.result_node.Data.AddUserResults.row.UserID.pyval results.user_id = user_id self.logger.info("User Id: %s", user_id) results.success = True return results
[docs] async def create_user_data(self, userdata: Union[dict, list[dict]], user_id: int): """Sets UserData in our DB to the key|value pairs passed in in the format {"key":"value"}""" hq_calls = [] if isinstance(userdata, list): temp = {} for each in userdata: for key, value in each.items(): temp[key] = value userdata = temp for key, value in userdata.items(): hq_calls.append( self._create_user_data(user_id=user_id, name=key, value=value) ) all_user_data = await asyncio.gather(*hq_calls) self.logger.debug("all_user_data: %s", all_user_data)
async def _create_user_data(self, user_id: int, name: str, value: str): self.logger.info("create_user_data") # The first use case of this requires we insert userdata with a UserNumber inside of the value. Let's check the value # and do a replacement if we need to. if isinstance(value, str): value = value.replace("{UserID}", str(user_id)) sql_params = [] Param(name, D_TYPES.VarChar, "dataElement").add_to_param_list(sql_params) Param(str(user_id), D_TYPES.Int, "userID").add_to_param_list(sql_params) Param(value, D_TYPES.VarChar, "data").add_to_param_list(sql_params) params_obj = ExecuteStoredProcedure.ParamsObj( logger=self.logger, stored_proc_short_name="Q2_AddUpdateUserData", sql_parameters=ExecuteStoredProcedure.SqlParameters(sql_params), hq_credentials=self.hq_credentials, ) await ExecuteStoredProcedure.execute(params_obj)
[docs] async def create_user_logon( self, params: CreateUserLogonParams ) -> CreateUserLogonResponse: """Creates a UserLogon in Q2 db and returns the password""" self.logger.info("Creating UserLogon") # if a password was not provided, generate it if not params.password: params.password = await params.user_logon_obj.generate_password( group_id=params.group_id_for_zone_check ) # creating the user logon and raising an error if unsuccessful xml_payload = api_helpers.build_add_user_logon_xml( params.user_id, params.logon_name, params.password ) params_obj = AddUserLogon.ParamsObj( self.logger, xml_payload, hq_credentials=self.hq_credentials ) response = await AddUserLogon.execute(params_obj) logon_error = False user_logon_id = None try: user_logon_id = ( response.result_node.Data.AddUserLogonResults.row.UserLogonID.pyval ) self.logger.info("UserLogon Id: %s", user_logon_id) if not params.require_password_reset: await params.user_logon_obj.set_password_status_as_accepted( params.logon_name ) except Exception: self.logger.info("UserLogon error: %s", response.error_message) logon_error = response.error_message return CreateUserLogonResponse(logon_error, params.password, user_logon_id)
[docs] async def enroll_sso_identifier( self, user_logon_id: int, sso_identifier: str ) -> SsoIdentifierResponse: """Enrolling sso information given an sso identifier and logon id""" # creating a SSO User Logon Object sso_obj = SsoUserLogon(self.logger, hq_credentials=self.hq_credentials) duplicate_error = "" sso_result_error = "" try: # If a sso user logon already exists, raise an error duplicate = await sso_obj.get(sso_identifier) if duplicate: self.logger.error("Duplicate SSO Identifier found") duplicate_error = "SSOIdentifier provided already assigned to a user" else: # Otherwise, create an sso user logon sso_result = await sso_obj.create( user_logon_id=user_logon_id, sso_identifier=sso_identifier ) self.logger.debug("ssoResult: %s", sso_result) except MissingStoredProcError: self.logger.error("ssoResult ERROR") sso_result_error = "SSOIdentifier passed in, but current platform version doesn't support it" return SsoIdentifierResponse(duplicate_error, sso_result_error)
[docs] async def check_disclaimers( self, disclaimers_info: list[str], user_id: int ) -> bool: """Marking disclaimer as accepted. Prevents the user from having to accept them on first login""" # creating a disclaimer object disclaimer_obj = Disclaimer( self.logger, hq_credentials=self.hq_credentials, ret_table_obj=True ) try: # attempt to mark provided disclaimer information as accepted, raise an error in unsuccessful await disclaimer_obj.accept_for_user_id(user_id, disclaimers_info) except DatabaseDataError: self.logger.error("***** Error accepting disclaimers *****") return False return True
[docs] async def check_db_for_existing(self, params: CheckForExistingParams): """ If Customer, User, or UserLogon already exists, raise an exception. Otherwise, just return None """ # building a parameter object to prepare for API call self.logger.info("hq_credentials: %s", self.hq_credentials) sql_params = self._get_check_for_existing_params(params) params_obj = ExecuteStoredProcedure.ParamsObj( self.logger, "sdk_API_check_duplicates", sql_params, hq_credentials=self.hq_credentials, ) # executing the call and storing information in result result = await ExecuteStoredProcedure.execute(params_obj) self.logger.info("result: %s", result) debug_data = result.result_node.Data.findtext(".//debug_data") # verifying if user, customer, or user logon already exists and raising # an error if True if not params.skip_customer_cif_check and result.result_node.Data.find( ".//customer_exists_cif" ): self.logger.warning("Customer cif already exists") return CustomerExistsError("Customer already exists.", 1, debug_data) if not params.skip_customer_tax_check and result.result_node.Data.find( ".//customer_exists_tax" ): self.logger.warning("Customer tax id already exists") return CustomerExistsError("Customer already exists.", 1, debug_data) if not params.skip_user_check and result.result_node.Data.find( ".//user_exists" ): self.logger.warning("User already exists") return UserExistsError("User already exists.", 2, debug_data) if result.result_node.Data.find(".//login_name_exists"): self.logger.warning("Login Name already exists") return LoginNameExistsError("Logon name already exists.", 3, debug_data) return
@staticmethod def _get_check_for_existing_params(params: CheckForExistingParams): parameters = [] sql_params = [ Param(params.login_name, D_TYPES.Text, "login_name"), Param(params.primary_cif, D_TYPES.Text, "primary_cif"), Param(params.tax_id, D_TYPES.Text, "tax_id"), Param(params.ssn, D_TYPES.Text, "ssn"), ] if params.group_id: sql_params.append(Param(params.group_id, D_TYPES.Int, "group_id")) for item in sql_params: item.add_to_param_list(parameters) return ExecuteStoredProcedure.SqlParameters(parameters)
[docs] def build_user_demo_object( self, user_values: UserDemoObjectValues ) -> DemographicInfo: """Creating Demographic Info object for User. Needed to created user""" # creating a dictionary containing additional information about the user optional_info = { "DOB": user_values.dob, "MothersMaiden": None, "MiddleName": "", "Title": None, "DriversLicense": None, "UserInfo": None, "UserRoleID": None, } # Setting optional_info dictionary keys to provided values for key in optional_info: if key in user_values.additional_user_info: optional_info[key] = user_values.additional_user_info[key] emails = [] if user_values.emails: emails = [x.strip() for x in user_values.emails] # creating and populating demographic info object based on retrieved information demographic_info = DemographicInfo( optional_info["DOB"], emails, user_values.phone_numbers, user_values.addresses, user_values.additional_user_info["FirstName"], user_values.additional_user_info["LastName"], user_values.ssn, optional_info["MothersMaiden"], optional_info["MiddleName"], optional_info["Title"], optional_info["DriversLicense"], optional_info["UserInfo"], user_values.primary_cif, user_role_id=optional_info["UserRoleID"], email_is_sac_target=user_values.email_is_sac_target, ) demographic_info.email_objects = user_values.email_objects return demographic_info
[docs] async def grab_customer_data( self, params: GrabCustomerDataParams, cache=None ) -> tuple[list, DemographicInfo]: """Creating Demographic Info object for Customer. Required to created Customer Note: invalid phone information also gets returned""" # retrieving customer information from dictionary home_type = AddressTypes.HOME try: customer_name = params.customer_dict["Name"] tax_id = params.customer_dict.get("TaxID", "") address_1 = params.customer_dict.get("Address1") address_2 = params.customer_dict.get("Address2") address_type = params.customer_dict.get("AddressType", home_type) # setting address type default to "Home" if not provided try: await AddressType( self.logger, hq_credentials=self.hq_credentials ).get_by_name(address_type) except DatabaseDataError: self.logger.warning( "Address type %s not found. Defaulting", address_type ) address_type = home_type # getting the remaining customer information city = params.customer_dict.get("City") state = params.customer_dict.get("State") zip_code = params.customer_dict.get("Zip") country = params.customer_dict.get("CountryCode") province = params.customer_dict.get("Province") primary_cif = params.customer_dict.get("PrimaryCIF") if not primary_cif: primary_cif = params.user_dict.get("PrimaryCIF") phone_values = GetPhoneParams( params.customer_dict, params.user_dict, params.user_demographic_obj ) invalid_phones, customer_phones = await self.get_customer_phones( phone_values, cache ) if address_1: address_object = [ Address( address_1, address_2, city, state, zip_code, address_type=address_type, province=province, country=country, ) ] else: address_object = [] # creating and returning customer demographic object with retrieved information customer_demographic_info = DemographicInfo( "", [], customer_phones, address_object, customer_name, "", tax_id, primary_cif=primary_cif, middle_name="", ) except KeyError: customer_demographic_info = None invalid_phones = None self.logger.info( "Problem getting customer data. Defaulting to user data for customer" ) return invalid_phones, customer_demographic_info
[docs] async def get_customer_phones( self, params: GetPhoneParams, cache=None ) -> tuple[list[str], list[Phone]]: """Retrieving phone information for the customer object. Note: This function first searches for customer phone information. If not found, any user phone information provided will be used to populate the customer object""" customer_phones = PhoneResults() if "Phones" in params.customer_dict: phone_list = self.to_phone_type(params.customer_dict["Phones"]) customer_phones = await params.user_demographic_obj.validate_phones( phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache ) elif "Phones" in params.user_dict: phone_list = self.to_phone_type(params.user_dict["Phones"]) customer_phones = await params.user_demographic_obj.validate_phones( phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache ) return customer_phones.invalid_phones, customer_phones.valid_phones
[docs] async def get_user_phones( self, params: GetPhoneParams, cache=None ) -> tuple[list[str], list[Phone]]: """Retrieving phone information for the user object. Note: This function first searches for user phone information. If not found, any customer phone information provided will be used to populate the user object""" user_phones = PhoneResults() if "Phones" in params.user_dict: phone_list = self.to_phone_type(params.user_dict["Phones"]) user_phones = await params.user_demographic_obj.validate_phones( phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache ) elif "Phones" in params.customer_dict: phone_list = self.to_phone_type(params.customer_dict["Phones"]) user_phones = await params.user_demographic_obj.validate_phones( phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache ) return user_phones.invalid_phones, user_phones.valid_phones
[docs] async def get_user_email(self, email_id: str) -> str: email = "" try: user_email = await Email(self.logger, self.hq_credentials).get_by_id( email_id ) self.logger.debug(f"user_email: {user_email}") email = user_email[0].EmailAddress.text except Exception as error: self.logger.debug(f"Could not find primary email address: {error}") return email
async def _link_accounts_via_cif(self, params: LinkAccountsViaCifParams) -> bool: linked_account_obj = LinkedAccount( self.logger, hq_credentials=self.hq_credentials ) linking_accounts = await linked_account_obj.add_by_primary_cif( customer_id=params.customer_id, user_id=params.user_id, account_access=7, primary_cif=params.primary_cif, primary_only=params.primary_only, ) if linking_accounts and "ERROR" in linking_accounts: return False return True
[docs] def to_phone_type(self, phones: list[dict]) -> list[Phone]: """Turning given list of phone information into a list of Phone objects""" phone_list = [] for phone_dict in phones: phone_list.append( Phone( area_code="", phone_number=phone_dict.get("Phone"), phone_type=phone_dict.get("PhoneType"), full_phone_number=phone_dict.get("Phone"), is_sac_sms_target=phone_dict.get("isSMSTarget", True), is_sac_target=phone_dict.get("isSACTarget", True), country=phone_dict.get("CountryCode"), ) ) return phone_list
[docs] def to_address_type(self, addresses: list[dict]) -> list[Address]: """Turning given list of address information into a list of Address objects""" address_list = [] for address_dict in addresses: address_list.append( Address( address_1=address_dict["Address1"], address_2=address_dict.get("Address2", ""), city=address_dict["City"], state=address_dict["State"], zipcode=address_dict["Zip"], address_type=address_dict["AddressType"], country=address_dict.get("CountryCode"), province=address_dict.get("Province"), ) ) return address_list