import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Union
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq import api_helpers
from q2_sdk.hq.db.address_type import AddressType
from q2_sdk.hq.db.disclaimer import Disclaimer
from q2_sdk.hq.db.email import Email
from q2_sdk.hq.db.linked_account import LinkedAccount
from q2_sdk.hq.db.sso_user_logon import SsoUserLogon
from q2_sdk.hq.db.user_demographic_data import PhoneResults, UserDemographicData
from q2_sdk.hq.db.user_logon import UserLogon
from q2_sdk.hq.exceptions import MissingStoredProcError
from q2_sdk.hq.hq_api.q2_api import AddCustomer, AddUser, AddUserLogon
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.models.demographic import (
Address,
DemographicInfo,
Phone,
Email as EmailDemoObj,
)
from q2_sdk.models.demographic import AddressType as AddressTypes
from .db_object import DbObject
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
class EnrollmentException(Exception):
"""Enrollment Exception handling class"""
def __init__(self, message, number, detail):
super().__init__()
self.message = message
self.number = number
self.details = detail
[docs]
class CustomerExistsError(EnrollmentException):
"""Customer already exists in Q2's database"""
[docs]
class UserExistsError(EnrollmentException):
"""User already exists in Q2's database"""
[docs]
class LoginNameExistsError(EnrollmentException):
"""Login Name already exists in Q2's database"""
[docs]
@dataclass
class CreateCustomerParams:
"""Parameters needed to create a customer
Note: grab_customer_data can be utilized to build the cust_info parameter"""
cust_info: DemographicInfo
group_id: int
is_company: bool
host_user: str
host_pwd: str
[docs]
@dataclass
class CreateCustomerResult:
"""Expected response object for create customer function"""
success: bool = False
reason: Optional[str] = None
customer_id: Optional[str] = None
[docs]
@dataclass
class CreateCustomerDataParams:
"""Parameters needed to create customer data"""
customer_id: int
name: str
value: str
[docs]
@dataclass
class CreateUserParams:
"""Parameters needed to create a user"""
user_info: DemographicInfo
customer_id: int
host_user: str
host_pwd: str
null_primary_cif: bool = False
[docs]
@dataclass
class CreateUserResult:
"""Expected response object for create user function"""
success: bool = False
reason: Optional[str] = None
user_id: Optional[str] = None
[docs]
@dataclass
class CreateUserLogonParams:
"""Parameters needed to create a logon"""
user_id: int
logon_name: str
user_logon_obj: UserLogon
password: Optional[str] = None
require_password_reset: bool = True
group_id_for_zone_check: Optional[int] = None
[docs]
@dataclass
class CreateUserLogonResponse:
"""Expected response object for create user logon function"""
logon_error: str = ""
password: str = ""
user_logon_id: Optional[int] = None
[docs]
@dataclass
class GrabCustomerDataParams:
"""Parameters needed to create a customer demographic info object"""
customer_dict: dict
user_dict: dict
user_demographic_obj: Optional[UserDemographicData] = None
[docs]
@dataclass
class GetPhoneParams:
"""Parameters needed to retrieve and validate phone information from a customer and user dictionary"""
customer_dict: dict
user_dict: dict
user_demographic_obj: Optional[UserDemographicData] = None
[docs]
@dataclass
class CheckForExistingParams:
"""Parameters needed to check if customer, user, or logon exists"""
login_name: str
primary_cif: str
ssn: str = ""
tax_id: str = ""
skip_customer_tax_check: bool = False
skip_customer_cif_check: bool = False
skip_user_check: bool = False
skip_existing_customer_check: bool = False
group_id: Optional[int] = None
def __post_init__(self):
if self.skip_existing_customer_check:
self.skip_customer_tax_check = True
self.skip_customer_cif_check = True
[docs]
@dataclass
class UserDemoObjectValues:
"""Parameters needed to create user demographic object"""
additional_user_info: dict
emails: list
phone_numbers: list[Phone]
addresses: list[Address]
ssn: int
primary_cif: int
dob: Optional[datetime] = None
email_is_sac_target: Optional[bool] = None
email_objects: Optional[list[EmailDemoObj]] = None
[docs]
@dataclass
class LinkAccountsViaCifParams:
"""Parameters needed to link accounts via primary cif"""
primary_only: bool
primary_cif: str
customer_id: int
user_id: int
[docs]
@dataclass
class SsoIdentifierResponse:
"""Expected response object for sso enrollment function"""
duplicate_exists: bool = False
sso_result_error: bool = False
[docs]
class EnrolledEntity(DbObject):
"""Allows you to create a customer, user and logon in the Q2 Platform.
Additional methods have been provided to assist you with the enrollment process"""
NAME = "EnrolledEntity"
[docs]
async def create_customer(
self, params: CreateCustomerParams
) -> CreateCustomerResult:
"""Creates a Customer in Q2 db and returns the id"""
# attempt to create a customer. If unsuccessful, provide a reason
error_message = "Customer creation failed"
cust_attribute_error = CreateCustomerResult()
try:
create_cust = await self._create_customer(params)
if not create_cust.success:
self.logger.debug("%s: %s", error_message, str(create_cust.reason))
return create_cust
except AttributeError:
self.logger.debug(error_message)
cust_attribute_error.reason = error_message
return cust_attribute_error
# if the customer was successfully created, provide the customer ID
self.logger.debug("Customer ID: %s", str(create_cust.customer_id))
create_cust.success = True
return create_cust
async def _create_customer(
self, params: CreateCustomerParams
) -> CreateCustomerResult:
# initialize response dataclass
results = CreateCustomerResult()
# attempting to create a new customer
self.logger.info("Creating Customer")
xml_payload = api_helpers.build_add_customer_xml(
params.cust_info,
params.group_id,
params.is_company,
host_user=params.host_user,
host_pwd=params.host_pwd,
allow_other_address_types=True,
)
self.logger.info("customer_demo_info: %s", params.cust_info)
params_obj = AddCustomer.ParamsObj(
self.logger, xml_payload, hq_credentials=self.hq_credentials
)
response = await AddCustomer.execute(params_obj)
# if customer creation was unsuccessful provide a reason otherwise if successful, store the Customer ID
if response.error_message:
if "It has been marked as deleted." in response.error_message:
results.reason = "Provided Group ID has been marked as deleted"
elif "FK_Q2_Customer_Q2_Group" in response.error_message:
results.reason = "Provided Group ID may not exist"
else:
self.logger.exception(response.error_message)
else:
results.customer_id = (
response.result_node.Data.AddCustomerResults.row.CustomerID.pyval
)
results.success = True
self.logger.info("Customer Id: %s", results.customer_id)
return results
[docs]
async def create_customer_data(self, params: CreateCustomerDataParams):
"""Sets CustomerData in our DB to the key|value pairs passed in in the format\
[{"Name":"DataName", "Value": "DataValue"}]"""
self.logger.info("create_customer_data")
sql_params = []
Param(params.name, D_TYPES.VarChar, "dataElement").add_to_param_list(sql_params)
Param(str(params.customer_id), D_TYPES.Int, "customerID").add_to_param_list(
sql_params
)
Param(params.value, D_TYPES.VarChar, "data").add_to_param_list(sql_params)
params_obj = ExecuteStoredProcedure.ParamsObj(
logger=self.logger,
stored_proc_short_name="Q2_AddUpdateCustomerData",
sql_parameters=ExecuteStoredProcedure.SqlParameters(sql_params),
hq_credentials=self.hq_credentials,
)
return await ExecuteStoredProcedure.execute(params_obj)
[docs]
async def create_user(self, params: CreateUserParams) -> CreateUserResult:
"""Creates a User in Q2 db and returns the id"""
# attempt to create the user
results = CreateUserResult()
self.logger.info("Creating User")
xml_payload = api_helpers.build_add_user_xml(
params.user_info,
params.customer_id,
host_user=params.host_user,
host_pwd=params.host_pwd,
allow_other_address_types=True,
null_primary_cif=params.null_primary_cif,
)
params_obj = AddUser.ParamsObj(
self.logger, xml_payload, hq_credentials=self.hq_credentials
)
response = await AddUser.execute(params_obj)
# provide reason if user creation was unsuccessful
if response.error_message and "Q2_PhoneNumber" in response.error_message:
results.reason = "Provided phone number is invalid"
return results
if not response.success:
results.reason = response.error_message
return results
# otherwise, provide user id
user_id = response.result_node.Data.AddUserResults.row.UserID.pyval
results.user_id = user_id
self.logger.info("User Id: %s", user_id)
results.success = True
return results
[docs]
async def create_user_data(self, userdata: Union[dict, list[dict]], user_id: int):
"""Sets UserData in our DB to the key|value pairs passed in in the format {"key":"value"}"""
hq_calls = []
if isinstance(userdata, list):
temp = {}
for each in userdata:
for key, value in each.items():
temp[key] = value
userdata = temp
for key, value in userdata.items():
hq_calls.append(
self._create_user_data(user_id=user_id, name=key, value=value)
)
all_user_data = await asyncio.gather(*hq_calls)
self.logger.debug("all_user_data: %s", all_user_data)
async def _create_user_data(self, user_id: int, name: str, value: str):
self.logger.info("create_user_data")
# The first use case of this requires we insert userdata with a UserNumber inside of the value. Let's check the value
# and do a replacement if we need to.
if isinstance(value, str):
value = value.replace("{UserID}", str(user_id))
sql_params = []
Param(name, D_TYPES.VarChar, "dataElement").add_to_param_list(sql_params)
Param(str(user_id), D_TYPES.Int, "userID").add_to_param_list(sql_params)
Param(value, D_TYPES.VarChar, "data").add_to_param_list(sql_params)
params_obj = ExecuteStoredProcedure.ParamsObj(
logger=self.logger,
stored_proc_short_name="Q2_AddUpdateUserData",
sql_parameters=ExecuteStoredProcedure.SqlParameters(sql_params),
hq_credentials=self.hq_credentials,
)
await ExecuteStoredProcedure.execute(params_obj)
[docs]
async def create_user_logon(
self, params: CreateUserLogonParams
) -> CreateUserLogonResponse:
"""Creates a UserLogon in Q2 db and returns the password"""
self.logger.info("Creating UserLogon")
# if a password was not provided, generate it
if not params.password:
params.password = await params.user_logon_obj.generate_password(
group_id=params.group_id_for_zone_check
)
# creating the user logon and raising an error if unsuccessful
xml_payload = api_helpers.build_add_user_logon_xml(
params.user_id, params.logon_name, params.password
)
params_obj = AddUserLogon.ParamsObj(
self.logger, xml_payload, hq_credentials=self.hq_credentials
)
response = await AddUserLogon.execute(params_obj)
logon_error = False
user_logon_id = None
try:
user_logon_id = (
response.result_node.Data.AddUserLogonResults.row.UserLogonID.pyval
)
self.logger.info("UserLogon Id: %s", user_logon_id)
if not params.require_password_reset:
await params.user_logon_obj.set_password_status_as_accepted(
params.logon_name
)
except Exception:
self.logger.info("UserLogon error: %s", response.error_message)
logon_error = response.error_message
return CreateUserLogonResponse(logon_error, params.password, user_logon_id)
[docs]
async def enroll_sso_identifier(
self, user_logon_id: int, sso_identifier: str
) -> SsoIdentifierResponse:
"""Enrolling sso information given an sso identifier and logon id"""
# creating a SSO User Logon Object
sso_obj = SsoUserLogon(self.logger, hq_credentials=self.hq_credentials)
duplicate_error = ""
sso_result_error = ""
try:
# If a sso user logon already exists, raise an error
duplicate = await sso_obj.get(sso_identifier)
if duplicate:
self.logger.error("Duplicate SSO Identifier found")
duplicate_error = "SSOIdentifier provided already assigned to a user"
else:
# Otherwise, create an sso user logon
sso_result = await sso_obj.create(
user_logon_id=user_logon_id, sso_identifier=sso_identifier
)
self.logger.debug("ssoResult: %s", sso_result)
except MissingStoredProcError:
self.logger.error("ssoResult ERROR")
sso_result_error = "SSOIdentifier passed in, but current platform version doesn't support it"
return SsoIdentifierResponse(duplicate_error, sso_result_error)
[docs]
async def check_disclaimers(
self, disclaimers_info: list[str], user_id: int
) -> bool:
"""Marking disclaimer as accepted. Prevents the user from having to accept them on first login"""
# creating a disclaimer object
disclaimer_obj = Disclaimer(
self.logger, hq_credentials=self.hq_credentials, ret_table_obj=True
)
try:
# attempt to mark provided disclaimer information as accepted, raise an error in unsuccessful
await disclaimer_obj.accept_for_user_id(user_id, disclaimers_info)
except DatabaseDataError:
self.logger.error("***** Error accepting disclaimers *****")
return False
return True
[docs]
async def check_db_for_existing(self, params: CheckForExistingParams):
"""
If Customer, User, or UserLogon already exists, raise an exception.
Otherwise, just return None
"""
# building a parameter object to prepare for API call
self.logger.info("hq_credentials: %s", self.hq_credentials)
sql_params = self._get_check_for_existing_params(params)
params_obj = ExecuteStoredProcedure.ParamsObj(
self.logger,
"sdk_API_check_duplicates",
sql_params,
hq_credentials=self.hq_credentials,
)
# executing the call and storing information in result
result = await ExecuteStoredProcedure.execute(params_obj)
self.logger.info("result: %s", result)
debug_data = result.result_node.Data.findtext(".//debug_data")
# verifying if user, customer, or user logon already exists and raising
# an error if True
if not params.skip_customer_cif_check and result.result_node.Data.find(
".//customer_exists_cif"
):
self.logger.warning("Customer cif already exists")
return CustomerExistsError("Customer already exists.", 1, debug_data)
if not params.skip_customer_tax_check and result.result_node.Data.find(
".//customer_exists_tax"
):
self.logger.warning("Customer tax id already exists")
return CustomerExistsError("Customer already exists.", 1, debug_data)
if not params.skip_user_check and result.result_node.Data.find(
".//user_exists"
):
self.logger.warning("User already exists")
return UserExistsError("User already exists.", 2, debug_data)
if result.result_node.Data.find(".//login_name_exists"):
self.logger.warning("Login Name already exists")
return LoginNameExistsError("Logon name already exists.", 3, debug_data)
return
@staticmethod
def _get_check_for_existing_params(params: CheckForExistingParams):
parameters = []
sql_params = [
Param(params.login_name, D_TYPES.Text, "login_name"),
Param(params.primary_cif, D_TYPES.Text, "primary_cif"),
Param(params.tax_id, D_TYPES.Text, "tax_id"),
Param(params.ssn, D_TYPES.Text, "ssn"),
]
if params.group_id:
sql_params.append(Param(params.group_id, D_TYPES.Int, "group_id"))
for item in sql_params:
item.add_to_param_list(parameters)
return ExecuteStoredProcedure.SqlParameters(parameters)
[docs]
def build_user_demo_object(
self, user_values: UserDemoObjectValues
) -> DemographicInfo:
"""Creating Demographic Info object for User. Needed to created user"""
# creating a dictionary containing additional information about the user
optional_info = {
"DOB": user_values.dob,
"MothersMaiden": None,
"MiddleName": "",
"Title": None,
"DriversLicense": None,
"UserInfo": None,
"UserRoleID": None,
}
# Setting optional_info dictionary keys to provided values
for key in optional_info:
if key in user_values.additional_user_info:
optional_info[key] = user_values.additional_user_info[key]
emails = []
if user_values.emails:
emails = [x.strip() for x in user_values.emails]
# creating and populating demographic info object based on retrieved information
demographic_info = DemographicInfo(
optional_info["DOB"],
emails,
user_values.phone_numbers,
user_values.addresses,
user_values.additional_user_info["FirstName"],
user_values.additional_user_info["LastName"],
user_values.ssn,
optional_info["MothersMaiden"],
optional_info["MiddleName"],
optional_info["Title"],
optional_info["DriversLicense"],
optional_info["UserInfo"],
user_values.primary_cif,
user_role_id=optional_info["UserRoleID"],
email_is_sac_target=user_values.email_is_sac_target,
)
demographic_info.email_objects = user_values.email_objects
return demographic_info
[docs]
async def grab_customer_data(
self, params: GrabCustomerDataParams, cache=None
) -> tuple[list, DemographicInfo]:
"""Creating Demographic Info object for Customer. Required to created Customer
Note: invalid phone information also gets returned"""
# retrieving customer information from dictionary
home_type = AddressTypes.HOME
try:
customer_name = params.customer_dict["Name"]
tax_id = params.customer_dict.get("TaxID", "")
address_1 = params.customer_dict.get("Address1")
address_2 = params.customer_dict.get("Address2")
address_type = params.customer_dict.get("AddressType", home_type)
# setting address type default to "Home" if not provided
try:
await AddressType(
self.logger, hq_credentials=self.hq_credentials
).get_by_name(address_type)
except DatabaseDataError:
self.logger.warning(
"Address type %s not found. Defaulting", address_type
)
address_type = home_type
# getting the remaining customer information
city = params.customer_dict.get("City")
state = params.customer_dict.get("State")
zip_code = params.customer_dict.get("Zip")
country = params.customer_dict.get("CountryCode")
province = params.customer_dict.get("Province")
primary_cif = params.customer_dict.get("PrimaryCIF")
if not primary_cif:
primary_cif = params.user_dict.get("PrimaryCIF")
phone_values = GetPhoneParams(
params.customer_dict, params.user_dict, params.user_demographic_obj
)
invalid_phones, customer_phones = await self.get_customer_phones(
phone_values, cache
)
if address_1:
address_object = [
Address(
address_1,
address_2,
city,
state,
zip_code,
address_type=address_type,
province=province,
country=country,
)
]
else:
address_object = []
# creating and returning customer demographic object with retrieved information
customer_demographic_info = DemographicInfo(
"",
[],
customer_phones,
address_object,
customer_name,
"",
tax_id,
primary_cif=primary_cif,
middle_name="",
)
except KeyError:
customer_demographic_info = None
invalid_phones = None
self.logger.info(
"Problem getting customer data. Defaulting to user data for customer"
)
return invalid_phones, customer_demographic_info
[docs]
async def get_customer_phones(
self, params: GetPhoneParams, cache=None
) -> tuple[list[str], list[Phone]]:
"""Retrieving phone information for the customer object.
Note: This function first searches for customer phone information.
If not found, any user phone information provided will be used to populate the customer object"""
customer_phones = PhoneResults()
if "Phones" in params.customer_dict:
phone_list = self.to_phone_type(params.customer_dict["Phones"])
customer_phones = await params.user_demographic_obj.validate_phones(
phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache
)
elif "Phones" in params.user_dict:
phone_list = self.to_phone_type(params.user_dict["Phones"])
customer_phones = await params.user_demographic_obj.validate_phones(
phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache
)
return customer_phones.invalid_phones, customer_phones.valid_phones
[docs]
async def get_user_phones(
self, params: GetPhoneParams, cache=None
) -> tuple[list[str], list[Phone]]:
"""Retrieving phone information for the user object.
Note: This function first searches for user phone information.
If not found, any customer phone information provided will be used to populate the user object"""
user_phones = PhoneResults()
if "Phones" in params.user_dict:
phone_list = self.to_phone_type(params.user_dict["Phones"])
user_phones = await params.user_demographic_obj.validate_phones(
phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache
)
elif "Phones" in params.customer_dict:
phone_list = self.to_phone_type(params.customer_dict["Phones"])
user_phones = await params.user_demographic_obj.validate_phones(
phone_list, self.logger, hq_credentials=self.hq_credentials, cache=cache
)
return user_phones.invalid_phones, user_phones.valid_phones
[docs]
async def get_user_email(self, email_id: str) -> str:
email = ""
try:
user_email = await Email(self.logger, self.hq_credentials).get_by_id(
email_id
)
self.logger.debug(f"user_email: {user_email}")
email = user_email[0].EmailAddress.text
except Exception as error:
self.logger.debug(f"Could not find primary email address: {error}")
return email
async def _link_accounts_via_cif(self, params: LinkAccountsViaCifParams) -> bool:
linked_account_obj = LinkedAccount(
self.logger, hq_credentials=self.hq_credentials
)
linking_accounts = await linked_account_obj.add_by_primary_cif(
customer_id=params.customer_id,
user_id=params.user_id,
account_access=7,
primary_cif=params.primary_cif,
primary_only=params.primary_only,
)
if linking_accounts and "ERROR" in linking_accounts:
return False
return True
[docs]
async def link_accounts_via_cif(self, params: LinkAccountsViaCifParams) -> bool:
"""Linking accounts associated to the created customer/user using given primary cif value"""
params.primary_only = "0" if not params.primary_only else "1"
if params.primary_cif:
return await self._link_accounts_via_cif(params)
return False
[docs]
def to_phone_type(self, phones: list[dict]) -> list[Phone]:
"""Turning given list of phone information into a list of Phone objects"""
phone_list = []
for phone_dict in phones:
phone_list.append(
Phone(
area_code="",
phone_number=phone_dict.get("Phone"),
phone_type=phone_dict.get("PhoneType"),
full_phone_number=phone_dict.get("Phone"),
is_sac_sms_target=phone_dict.get("isSMSTarget", True),
is_sac_target=phone_dict.get("isSACTarget", True),
country=phone_dict.get("CountryCode"),
)
)
return phone_list
[docs]
def to_address_type(self, addresses: list[dict]) -> list[Address]:
"""Turning given list of address information into a list of Address objects"""
address_list = []
for address_dict in addresses:
address_list.append(
Address(
address_1=address_dict["Address1"],
address_2=address_dict.get("Address2", ""),
city=address_dict["City"],
state=address_dict["State"],
zipcode=address_dict["Zip"],
address_type=address_dict["AddressType"],
country=address_dict.get("CountryCode"),
province=address_dict.get("Province"),
)
)
return address_list