import json
from socket import gaierror
from lxml import objectify
from pymemcache.exceptions import MemcacheError
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq import api_helpers
from q2_sdk.hq.db.country import Country
from q2_sdk.hq.db.address_type import AddressType
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.models.demographic import Address, DemographicInfo, Phone
from .db_object import DbObject
# Shorthand for the stored-procedure parameter data types used in this module
# (D_TYPES.Xml, D_TYPES.VarChar).
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
class PhoneResults:
    """
    Simple container for the outcome of a phone validation pass.

    ``valid_phones`` holds the entries that passed validation and
    ``invalid_phones`` holds the ones that were rejected. Both start empty.
    """

    def __init__(self):
        # Two distinct list objects: the buckets must never share storage.
        self.valid_phones, self.invalid_phones = [], []
[docs]
class UserDemographicData(DbObject):
"""
Reads and updates user demographic data in the database using instances of
q2_sdk.models.demographic.DemographicInfo.
When called from an extension accessed through Q2 Online, the current user's DemographicInfo
instance is available via self.online_user.as_demographic_info()
"""
[docs]
async def update(
    self,
    login_name: str,
    demographic_info: DemographicInfo,
    validate_phone_numbers: bool = False,
    validate_addresses: bool = False,
) -> HqResponse:
    """
    Push ``demographic_info`` to the database for ``login_name`` by calling the
    ``apispUpdateDemographicsByLogonName_RT`` stored procedure.

    This stored procedure will only update demographic information once
    every 5 seconds

    :param login_name: login name of the user whose demographics are updated
    :param demographic_info: data to send; when a validation flag is set, its
        ``phones`` / ``addresses`` attribute is replaced with the validated list
    :param validate_phone_numbers: boolean that validates a list of phone objects
    :param validate_addresses: boolean that validates a list of address objects
    :return: the HqResponse produced by the stored procedure call
    """
    if validate_phone_numbers:
        # validate_phones reads ``full_phone_number``, so attach it to every
        # phone first (number plus an optional " x<extension>" suffix).
        for phone in demographic_info.phones:
            suffix = f" x{phone.extension}" if phone.extension else ""
            phone.full_phone_number = phone.phone_number + suffix

        phone_results = await self.validate_phones(
            demographic_info.phones, self.logger, self.hq_credentials
        )

        # Fold the area code back into the number on each surviving phone.
        for valid_phone in phone_results.valid_phones:
            attrs = vars(valid_phone)
            attrs["phone_number"] = attrs["area_code"] + attrs["phone_number"]

        # Only the phones that passed validation are sent on.
        demographic_info.phones = phone_results.valid_phones

    if validate_addresses:
        demographic_info.addresses = await self.validate_addresses(
            demographic_info.addresses, self.logger, self.hq_credentials
        )

    xml_payload = api_helpers.build_update_demographics_by_logon_name_xml(
        login_name, demographic_info
    )
    sql_params = []
    Param(xml_payload, D_TYPES.Xml, "demographicData").add_to_param_list(
        sql_params
    )
    hq_response = await self._call_execute_stored_procedure(
        "apispUpdateDemographicsByLogonName_RT",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )

    # The inner result is read from Table/Column1 when that lookup is truthy,
    # otherwise from Table/ResultInfo.
    result_node = hq_response.result_node
    column1 = result_node.find(".//Table/Column1")
    inner_table_name = "Column1" if column1 else "ResultInfo"
    inner_xml = result_node.findtext(".//Table/{}".format(inner_table_name))
    inner_result = objectify.fromstring(inner_xml)

    error = hq_response.get_hq_return_error(inner_result)
    if error:
        self.logger.error(error)

    # A <msg note="..."> is optional; when no <msg> node is found the lookup
    # raises AttributeError and there is nothing to report.
    try:
        note = inner_result.find(".//msg").get("note")
        self.logger.warning(note)
    except AttributeError:
        pass

    return hq_response
[docs]
async def validate_phones(
    self, phone_list: list[Phone], logger, hq_credentials, cache=None
):
    """
    Sort the given phones into valid and invalid buckets.

    Phones whose country is international (see ``_is_international``) are
    checked with ``_validate_international_phones``; all others are checked
    with ``_validate_local_phone``.

    :param phone_list: phones to check; each must carry ``full_phone_number``
    :param logger: logger forwarded to the international validation
    :param hq_credentials: HQ credentials forwarded to the international validation
    :param cache: optional cache forwarded to the international validation
    :return: PhoneResults whose ``valid_phones`` holds the rebuilt phone objects
        and whose ``invalid_phones`` holds the rejected ``full_phone_number`` values
    """
    results = PhoneResults()
    for candidate in phone_list:
        if self._is_international(candidate.country):
            phone, is_valid = await self._validate_international_phones(
                candidate, logger, hq_credentials, cache
            )
        else:
            phone, is_valid = self._validate_local_phone(candidate)

        if is_valid:
            # Targeting flags are only ever switched off on the rebuilt phone,
            # never switched on.
            if not candidate.is_sac_target:
                phone.is_sac_target = False
            if not candidate.is_sac_sms_target:
                phone.is_sac_sms_target = False
            results.valid_phones.append(phone)
        else:
            results.invalid_phones.append(candidate.full_phone_number)
    return results
def _is_international(self, country_code):
    """
    Tell whether ``country_code`` denotes a country other than the USA.

    Only three-character codes are considered; anything else (including an
    empty or missing code) is treated as not international.

    :param country_code: country code string, compared case-insensitively
    :return: True for a three-character code other than "USA", else False
    """
    if not country_code or len(country_code) != 3:
        return False
    return country_code.upper() != "USA"
async def _get_country_list(self, logger, hq_credentials, cache=None):
    """
    Return the country-to-area-code-mask mapping as a JSON string.

    A truthy value found in ``cache`` under "country_list" is returned as is.
    Otherwise the countries are loaded through ``Country``, serialised as
    ``{upper-cased IsoCodeA3: CityAreaCodeEditMask}`` and, when a cache was
    given, stored there for 24 hours. Cache failures are logged at debug
    level and otherwise ignored.

    :param logger: logger used for debug messages and passed to ``Country``
    :param hq_credentials: HQ credentials passed to ``Country``
    :param cache: optional cache exposing ``get_async`` / ``set_async``
    :return: JSON string of the mapping
    """
    cache_key = "country_list"

    cached = None
    if cache:
        try:
            cached = await cache.get_async(cache_key)
        except (MemcacheError, gaierror):
            logger.debug("Could not retrieve country_list from cache")
    if cached:
        return cached

    countries = await Country(logger, hq_credentials=hq_credentials).get()

    # Countries without a CityAreaCodeEditMask attribute are left out.
    masks_by_code = {}
    for country in countries:
        if hasattr(country, "CityAreaCodeEditMask"):
            iso_code = country.IsoCodeA3.text.upper()
            masks_by_code[iso_code] = country.CityAreaCodeEditMask.text
    serialized = json.dumps(masks_by_code)

    if cache:
        try:
            await cache.set_async(cache_key, serialized, 60 * 60 * 24)
        except (MemcacheError, gaierror):
            logger.debug("Problem setting cache. Skipping")
    return serialized
def _validate_local_phone(self, params):
    """
    Rebuild a non-international phone from its full number and check it.

    :param params: phone-like object providing ``full_phone_number`` and ``type``
    :return: ``(phone, is_valid)``, where ``phone`` comes from
        ``Phone.build_from_str`` and ``is_valid`` is True only when its
        ``phone_number`` is exactly 7 characters long
    """
    local_phone = Phone.build_from_str(params.full_phone_number, params.type)
    has_seven_char_number = len(local_phone.phone_number) == 7
    return local_phone, has_seven_char_number
async def _validate_international_phones(
    self, params, logger, hq_credentials, cache=None
):
    """
    Split an international number into area code and local part using the
    country's area-code edit mask.

    The area-code length is the number of alphanumeric characters in the mask
    found for ``params.country``. When no mask is found the length is 0, so the
    area code is empty and the whole number becomes the local part. The number
    is valid when it is at least as long as the area code.

    :param params: phone-like object providing ``full_phone_number``,
        ``country``, ``type``, ``is_sac_target`` and ``is_sac_sms_target``
    :param logger: logger forwarded to ``_get_country_list``
    :param hq_credentials: HQ credentials forwarded to ``_get_country_list``
    :param cache: optional cache forwarded to ``_get_country_list``
    :return: ``(phone, is_valid)``; the Phone is built even when invalid
    """
    country_list = await self._get_country_list(logger, hq_credentials, cache)
    country_dict = json.loads(country_list)
    phone_number = params.full_phone_number

    # _get_country_list stores the ISO codes upper-cased and
    # _is_international compares case-insensitively, so the lookup key must be
    # normalised too; otherwise "can" is treated as international but never
    # finds its mask.
    country_key = (params.country or "").upper()
    mask = country_dict.get(country_key, "")

    # A mask can be missing ("") or null in the JSON (None); both mean there
    # is no area code to split off.
    area_code_length = sum(1 for char in mask if char.isalnum()) if mask else 0
    is_valid = len(phone_number) >= area_code_length

    area_code = phone_number[:area_code_length]
    local = phone_number[area_code_length:]
    phone = Phone(
        area_code,
        local,
        phone_type=params.type,
        country=params.country,
        is_sac_sms_target=params.is_sac_sms_target,
        is_sac_target=params.is_sac_target,
    )
    return phone, is_valid
[docs]
async def validate_addresses(self, address_list: list[Address], logger, hq_creds):
    """
    Build a cleaned copy of ``address_list``.

    Addresses without ``address_1`` are dropped. An address whose
    ``address_type`` is not among the type names returned by ``AddressType``
    is logged and switched to "Home"; the change is made on the input object
    as well as on the copy.

    :param address_list: addresses to check
    :param logger: logger used for warnings and passed to ``AddressType``
    :param hq_creds: HQ credentials passed to ``AddressType``
    :return: list of newly constructed Address objects
    """
    type_rows = await AddressType(logger, hq_credentials=hq_creds).get()
    known_types = [row.AddressTypeName for row in type_rows]

    validated = []
    for address in address_list:
        if address.address_1:
            if address.address_type not in known_types:
                logger.warning(
                    f"Address type {address.address_type} not found in DB. Defaulting to 'Home' type"
                )
                address.address_type = "Home"
            validated.append(
                Address(
                    address.address_1,
                    address.address_2,
                    address.city,
                    address.state,
                    address.zipcode,
                    address.address_type,
                    address.province,
                    address.country,
                )
            )
    return validated
[docs]
async def get(self, login_name: str) -> DemographicInfo:
    """
    Load the demographic data stored for an online user.

    Calls ``sdk_GetUserDemographicData`` and folds the returned rows into one
    DemographicInfo. DOB and the name fields come from the first row. Every
    row contributes an email, rows with a ``LocalNumber`` contribute a phone,
    and rows with a ``StreetAddress1`` contribute an address.

    :param login_name: login name of the online user
    :return: DemographicInfo built from the stored procedure result
    """
    sql_params = []
    Param(login_name, D_TYPES.VarChar, "login_name").add_to_param_list(sql_params)
    rows = await self.call_hq(
        "sdk_GetUserDemographicData", ExecuteStoredProcedure.SqlParameters(sql_params)
    )

    first_row = rows[0]
    date_of_birth = first_row.findtext("DOB", "")
    emails = [row.findtext("EmailAddress", "") for row in rows]

    phones = []
    for row in rows:
        if not row.findtext("LocalNumber"):
            continue
        phones.append(
            Phone(
                row.findtext("CityOrAreaCode", ""),
                row.findtext("LocalNumber", ""),
                row.findtext("PhoneType", ""),
                country=row.findtext("PhoneCountry", ""),
                extension=row.findtext("Extension", ""),
            )
        )

    addresses = []
    for row in rows:
        if not row.findtext("StreetAddress1"):
            continue
        # Province wins over State whenever the row has a Province attribute.
        if hasattr(row, "Province"):
            region = row.findtext("Province", "")
        else:
            region = row.findtext("State", "")
        addresses.append(
            Address(
                row.findtext("StreetAddress1", ""),
                row.findtext("StreetAddress2", ""),
                row.findtext("City", ""),
                region,
                row.findtext("PostalCode", ""),
                row.findtext("AddressType", ""),
                country=row.findtext("AddressCountry", ""),
                address_id=row.findtext("AddressID"),
            )
        )

    # NOTE(review): the seventh positional argument is always an empty string;
    # its meaning is defined by DemographicInfo, which is not visible here.
    return DemographicInfo(
        date_of_birth,
        emails,
        phones,
        addresses,
        first_row.findtext("FirstName", ""),
        first_row.findtext("LastName", ""),
        "",
        middle_name=first_row.findtext("MiddleName", ""),
    )