Source code for q2_sdk.hq.db.user_demographic_data

import json
from socket import gaierror
from lxml import objectify
from pymemcache.exceptions import MemcacheError
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq import api_helpers
from q2_sdk.hq.db.country import Country
from q2_sdk.hq.db.address_type import AddressType
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.models.demographic import Address, DemographicInfo, Phone

from .db_object import DbObject

D_TYPES = ExecuteStoredProcedure.DataType


class PhoneResults:
    """
    Container for the outcome of phone validation.

    ``UserDemographicData.validate_phones`` appends accepted phone objects to
    ``valid_phones`` and the ``full_phone_number`` of rejected entries to
    ``invalid_phones``.
    """

    def __init__(self):
        # Entries that passed validation.
        self.valid_phones: list = []
        # Entries that failed validation.
        self.invalid_phones: list = []
class UserDemographicData(DbObject):
    """
    Communicates with the database using an instance of q2_sdk.models.demographic.DemographicInfo.
    If calling from an extension accessed from Q2 Online,
    current DemographicInfo instance exists in self.online_user.as_demographic_info()
    """

    async def update(
        self,
        login_name: str,
        demographic_info: DemographicInfo,
        validate_phone_numbers: bool = False,
        validate_addresses: bool = False,
    ) -> HqResponse:
        """
        This stored procedure will only update demographic information once every 5 seconds

        When validation is requested, ``demographic_info.phones`` and/or
        ``demographic_info.addresses`` are replaced in place with the validated
        lists before the update XML is built.

        :param login_name: login name handed to the update XML builder
        :param demographic_info: demographic data to persist
        :param validate_phone_numbers: boolean that validates a list of phone objects
        :param validate_addresses: boolean that validates a list of address objects
        :return: the HqResponse of the ``apispUpdateDemographicsByLogonName_RT`` call
        """
        if validate_phone_numbers:
            # validate_phones works from full_phone_number, so derive it first.
            for phone in demographic_info.phones:
                extension_str = f" x{phone.extension}" if phone.extension else ""
                phone.full_phone_number = phone.phone_number + extension_str

            validated_phones = await self.validate_phones(
                demographic_info.phones, self.logger, self.hq_credentials
            )
            for valid_phone in validated_phones.valid_phones:
                # Prefix the area code onto phone_number; writing through
                # vars() mutates the phone object itself.
                values_dict = vars(valid_phone)
                values_dict["phone_number"] = (
                    values_dict["area_code"] + values_dict["phone_number"]
                )
            demographic_info.phones = validated_phones.valid_phones

        if validate_addresses:
            demographic_info.addresses = await self.validate_addresses(
                demographic_info.addresses, self.logger, self.hq_credentials
            )

        demographic_update_xml = (
            api_helpers.build_update_demographics_by_logon_name_xml(
                login_name, demographic_info
            )
        )
        params = []
        Param(demographic_update_xml, D_TYPES.Xml, "demographicData").add_to_param_list(
            params
        )
        hq_response = await self._call_execute_stored_procedure(
            "apispUpdateDemographicsByLogonName_RT",
            ExecuteStoredProcedure.SqlParameters(params),
        )

        # The inner XML payload is read from Column1 when present, otherwise
        # from ResultInfo.
        # NOTE(review): this relies on the truthiness of the found node rather
        # than an ``is None`` check - confirm that is intended for the element
        # type result_node returns.
        inner_table_name = "Column1"
        if not hq_response.result_node.find(".//Table/Column1"):
            inner_table_name = "ResultInfo"
        inner_result = objectify.fromstring(
            hq_response.result_node.findtext(f".//Table/{inner_table_name}")
        )

        error = hq_response.get_hq_return_error(inner_result)
        if error:
            self.logger.error(error)

        # A <msg note="..."> node is optional; surface the note as a warning
        # only when one is actually present.
        try:
            note = inner_result.find(".//msg").get("note")
        except AttributeError:
            note = None
        if note is not None:
            self.logger.warning(note)

        return hq_response

    async def validate_phones(
        self, phone_list: list[Phone], logger, hq_credentials, cache=None
    ):
        """
        Split ``phone_list`` into valid and invalid entries.

        Each entry must expose ``full_phone_number``, ``country``, ``type``,
        ``is_sac_target`` and ``is_sac_sms_target``.

        :param phone_list: phones to validate
        :param logger: logger forwarded to the country lookup
        :param hq_credentials: credentials forwarded to the country lookup
        :param cache: optional cache forwarded to the country lookup
        :return: PhoneResults whose ``valid_phones`` holds the rebuilt Phone
            objects and whose ``invalid_phones`` holds the rejected
            ``full_phone_number`` values
        """
        found_phones = PhoneResults()
        for params in phone_list:
            if self._is_international(params.country):
                phone, is_valid = await self._validate_international_phones(
                    params, logger, hq_credentials, cache
                )
            else:
                phone, is_valid = self._validate_local_phone(params)

            if not is_valid:
                found_phones.invalid_phones.append(params.full_phone_number)
                continue

            # Explicitly clear the SAC flags on the rebuilt phone when the
            # input did not have them set.
            if not params.is_sac_target:
                phone.is_sac_target = False
            if not params.is_sac_sms_target:
                phone.is_sac_sms_target = False
            found_phones.valid_phones.append(phone)

        return found_phones

    def _is_international(self, country_code):
        """Return True for a 3-character country code other than "USA" (case-insensitive)."""
        if not country_code or len(country_code) != 3:
            return False
        return country_code.upper() != "USA"

    async def _get_country_list(self, logger, hq_credentials, cache=None):
        """
        Return a JSON string mapping upper-cased ``IsoCodeA3`` to ``CityAreaCodeEditMask``.

        The value is read from ``cache`` under the key ``country_list`` when
        available; otherwise it is built from ``Country.get()`` and stored in
        the cache for 24 hours. Cache failures are logged at debug level and
        otherwise ignored.
        """
        country_list = None
        if cache:
            try:
                country_list = await cache.get_async("country_list")
            except (MemcacheError, gaierror):
                logger.debug("Could not retrieve country_list from cache")
                country_list = None

        if not country_list:
            country_obj = Country(logger, hq_credentials=hq_credentials)
            countries = await country_obj.get()
            country_list_dict = {
                x.IsoCodeA3.text.upper(): x.CityAreaCodeEditMask.text
                for x in countries
                if hasattr(x, "CityAreaCodeEditMask")
            }
            country_list = json.dumps(country_list_dict)
            if cache:
                try:
                    await cache.set_async("country_list", country_list, 60 * 60 * 24)
                except (MemcacheError, gaierror):
                    logger.debug("Problem setting cache. Skipping")

        return country_list

    def _validate_local_phone(self, params):
        """
        Rebuild a Phone from ``params.full_phone_number`` and check its local part.

        :return: ``(phone, is_valid)`` where ``is_valid`` is True only when the
            parsed ``phone_number`` is exactly 7 characters long
        """
        phone = Phone.build_from_str(params.full_phone_number, params.type)
        is_valid = len(phone.phone_number) == 7
        return phone, is_valid

    async def _validate_international_phones(
        self, params, logger, hq_credentials, cache=None
    ):
        """
        Split an international number into area code and local part using the
        country's area-code edit mask.

        The number is valid when the country has a non-empty mask and the
        number is at least as long as the mask's alphanumeric character count.

        :return: ``(phone, is_valid)``; the Phone is built even when invalid
        """
        country_list = await self._get_country_list(logger, hq_credentials, cache)
        country_dict = json.loads(country_list)
        phone_number = params.full_phone_number

        # Keys are upper-cased by _get_country_list, so normalise the lookup
        # key the same way; otherwise a lower-case code is never matched.
        country_key = (params.country or "").upper()
        mask = country_dict.get(country_key, "")

        area_code_length = 0
        is_valid = False
        if mask:
            area_code_length = sum(1 for char in mask if char.isalnum())
            is_valid = len(phone_number) >= area_code_length

        area_code = phone_number[:area_code_length]
        local = phone_number[area_code_length:]
        phone = Phone(
            area_code,
            local,
            phone_type=params.type,
            country=params.country,
            is_sac_sms_target=params.is_sac_sms_target,
            is_sac_target=params.is_sac_target,
        )
        return phone, is_valid

    async def validate_addresses(self, address_list: list[Address], logger, hq_creds):
        """
        Return rebuilt Address objects for every entry that has ``address_1``.

        An ``address_type`` that is not among the names returned by
        ``AddressType.get()`` is logged and reset to ``"Home"`` on the input
        address before it is copied.

        :param address_list: addresses to validate
        :param logger: logger used for warnings and passed to AddressType
        :param hq_creds: credentials passed to AddressType
        :return: list of newly constructed Address objects
        """
        found_addresses = []
        address_types = await AddressType(logger, hq_credentials=hq_creds).get()
        address_type_options = [x.AddressTypeName for x in address_types]

        for address in address_list:
            if not address.address_1:
                continue
            if address.address_type not in address_type_options:
                logger.warning(
                    f"Address type {address.address_type} not found in DB. Defaulting to 'Home' type"
                )
                address.address_type = "Home"
            # NOTE(review): only these eight fields are copied; any other
            # attribute on the input (e.g. an address id) is not carried over
            # - confirm this is intended.
            this_address = Address(
                address.address_1,
                address.address_2,
                address.city,
                address.state,
                address.zipcode,
                address.address_type,
                address.province,
                address.country,
            )
            found_addresses.append(this_address)

        return found_addresses

    async def get(self, login_name: str) -> DemographicInfo:
        """
        Given an online user login name, this will return a DemographicInfo object

        DOB and the name fields are read from the first returned row; emails,
        phones and addresses are collected across all rows (phones only where
        ``LocalNumber`` is set, addresses only where ``StreetAddress1`` is set).

        NOTE(review): the first row is accessed unconditionally, so this
        assumes the stored procedure returns at least one row - confirm.
        """
        params = []
        Param(login_name, D_TYPES.VarChar, "login_name").add_to_param_list(params)
        response = await self.call_hq(
            "sdk_GetUserDemographicData", ExecuteStoredProcedure.SqlParameters(params)
        )
        first_row = response[0]

        emails = [x.findtext("EmailAddress", "") for x in response]
        phones = [
            Phone(
                x.findtext("CityOrAreaCode", ""),
                x.findtext("LocalNumber", ""),
                x.findtext("PhoneType", ""),
                country=x.findtext("PhoneCountry", ""),
                extension=x.findtext("Extension", ""),
            )
            for x in response
            if x.findtext("LocalNumber")
        ]
        addresses = [
            Address(
                x.findtext("StreetAddress1", ""),
                x.findtext("StreetAddress2", ""),
                x.findtext("City", ""),
                # Prefer Province when the row carries one, else fall back to State.
                x.findtext("Province", "")
                if hasattr(x, "Province")
                else x.findtext("State", ""),
                x.findtext("PostalCode", ""),
                x.findtext("AddressType", ""),
                country=x.findtext("AddressCountry", ""),
                address_id=x.findtext("AddressID"),
            )
            for x in response
            if x.findtext("StreetAddress1")
        ]

        demo_info_response = DemographicInfo(
            first_row.findtext("DOB", ""),
            emails,
            phones,
            addresses,
            first_row.findtext("FirstName", ""),
            first_row.findtext("LastName", ""),
            "",
            middle_name=first_row.findtext("MiddleName", ""),
        )
        return demo_info_response