Source code for q2_sdk.hq.db.user

from argparse import _SubParsersAction
from dataclasses import dataclass
from datetime import date, datetime
from functools import partial
from typing import List, Union

from dateutil import parser as datetime_parser
from lxml.etree import tostring
from lxml.objectify import BoolElement, E, IntElement, StringElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.db.sso_user_logon import SsoUserLogon
from q2_sdk.hq.db.user_logon import UserLogon
from q2_sdk.hq.hq_api.q2_api import DeleteUser
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.models.demographic import Phone
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase

D_TYPES = ExecuteStoredProcedure.DataType


[docs] @dataclass class UpdateUserValues: user_id: int ssn: str = None primary_cif: str = None unset_primary_cif: bool = False
[docs] @dataclass class DeleteUserParams: success: bool error_message: None response: HqResponse
[docs] @dataclass class GetUserValues: social_security: str primary_cif: str user_id: int login_name: str
[docs] @dataclass class SearchUserValues: first_name: str = None last_name: str = None email_address: str = None phone_number: str = None local_number: str = None city_or_areacode: str = None sso_identifier: str = None
[docs] def valid_inputs(self): required_params = [ self.first_name, self.last_name, self.email_address, self.phone_number, self.sso_identifier, ] if any(required_params): return True return False
[docs] def hydrate_stored_proc_params(self): params = [] if self.first_name: Param(self.first_name, D_TYPES.VarChar, "firstname").add_to_param_list( params ) if self.last_name: Param(self.last_name, D_TYPES.VarChar, "lastname").add_to_param_list(params) if self.local_number: Param(self.local_number, D_TYPES.VarChar, "localnumber").add_to_param_list( params ) if self.city_or_areacode: Param( self.city_or_areacode, D_TYPES.VarChar, "city_or_areacode" ).add_to_param_list(params) if self.email_address: Param( self.email_address, D_TYPES.VarChar, "emailaddress" ).add_to_param_list(params) return params
[docs] @dataclass class SearchUserResponse: success: bool result: list = None error_message: str = None
[docs] class UserRow(RepresentationRowBase): UserID: IntElement = "UserID" CustomerID: IntElement = "CustomerID" FirstName: StringElement = "FirstName" MiddleName: StringElement = "MiddleName" LastName: StringElement = "LastName" Salutation: StringElement = "Salutation" Suffix: StringElement = "Suffix" SSN: StringElement = "SSN" CreateDate: StringElement = "CreateDate" DeletedDate: StringElement = "DeletedDate" DefaultEmailID: IntElement = "DefaultEmailID" DefaultAddressID: IntElement = "DefaultAddressID" AutoGenerated: BoolElement = "AutoGenerated" HostUser: StringElement = "HostUser" HostPwd: StringElement = "HostPwd" UserInfo: StringElement = "UserInfo" ProfileUpdated: BoolElement = "ProfileUpdated" MobileAuthCode: StringElement = "MobileAuthCode" UseCustomerAccounts: BoolElement = "UseCustomerAccounts" PrimaryCIF: StringElement = "PrimaryCIF" LastUserProfileUpdate: StringElement = "LastUserProfileUpdate" UserRoleID: IntElement = "UserRoleID" PolicyID: IntElement = "PolicyID" UserStatusID: IntElement = "UserStatusID" StatusChangeDate: StringElement = "StatusChangeDate" DOB: StringElement = "DOB" IsOpenBanking: BoolElement = "IsOpenBanking"
[docs] class User(DbObject): """ Queries the Q2_User table for rows that match the given arguments. Gathers additional details on specific users """ NAME = "User" REPRESENTATION_ROW_CLASS = UserRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_users") subparser.set_defaults(parser="get_users") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser.add_argument( "--extended", action="store_true", help="Extends the output of rows" ) subparser.add_argument( "--combine", action="store_true", help="Search by both ssn and cif" ) group = subparser.add_mutually_exclusive_group() group.add_argument("-u", "--user-id", help="Q2_User.UserID") group.add_argument("-l", "--login-name", help="Q2_UserLogin.LoginName") subparser.add_argument("-s", "--social-security", help="Q2_User.SSN") subparser.add_argument("-p", "--primary-cif", help="Q2_User.PrimaryCIF") subparser = parser.add_parser("search_users") subparser.set_defaults(parser="search_users") subparser.set_defaults(func=partial(self.search, serialize_for_cli=True)) subparser.add_argument("-f", "--first-name", help="Q2_User.FirstName") subparser.add_argument("-l", "--last-name", help="Q2_User.LastName") subparser.add_argument("-e", "--email-address", help="Q2_User.SSN") subparser.add_argument( "-p", "--phone-number", help="Q2_PhoneNumber.CityOrAreaCode and Q2_PhoneNumber.LocalNumber", ) subparser.add_argument("-s", "--sso-identifier") subparser.add_argument( "--extended", action="store_true", help="Extends the output of the rows" ) subparser = parser.add_parser("get_users_under_customer") subparser.set_defaults(parser="get_users_under_customer") subparser.set_defaults( func=partial(self.get_users_under_customer, serialize_for_cli=True) ) subparser.add_argument("-c", "--customer-id", help="Q2_User.CustomerID") subparser.add_argument( "--extended", action="store_true", help="Extends the output of the rows" ) subparser.add_argument( "--include-deleted", action="store_true", help="include deleted users" ) subparser = parser.add_parser("get_account_related_users") subparser.set_defaults(parser="get_account_related_users") subparser.set_defaults( func=partial(self.get_account_related_users, serialize_for_cli=True) ) subparser.add_argument( "host_account_id", help="Q2_AccountAddress.HostAccountID" ) subparser = parser.add_parser("set_profile_updated_flag") subparser.set_defaults(parser="set_profile_updated_flag") subparser.set_defaults(func=partial(self.set_profile_updated_flag)) subparser.add_argument("user_id", help="Q2_User.UserID") subparser = parser.add_parser("get_deleted_users") subparser.description = ( "Returns all users deleted since the beginning of the current day" ) subparser.set_defaults(parser="get_deleted_users") subparser.set_defaults(func=partial(self.get_deleted, serialize_for_cli=True)) subparser.add_argument( "-d", "--deleted-after", type=datetime_parser.parse, default=datetime.today(), help=""" minimum Q2_User.DeletedDate to search in iso format ([YYYY]-[MM]-[DD]T[hh]:[mm]:[ss]-[Z]). (Defaults to beginning of current day if not provided). """, ) subparser = parser.add_parser("remove_user") subparser.set_defaults(parser="remove_user") subparser.set_defaults(func=partial(self.delete), serialize_for_cli=True) subparser.add_argument("user_id", help="Q2_User.UserID") subparser = parser.add_parser("update_user") subparser.set_defaults(parser="update_user") subparser.set_defaults(func=partial(self.update)) subparser.add_argument("user_id", type=int, help="Q2_User.UserID") subparser.add_argument("-s", "--ssn", help="Q2_User.SSN") subparser.add_argument("-p", "--primary-cif", help="Q2_User.PrimaryCif") subparser = parser.add_parser("get_users_created_on") subparser.description = "Returns all users created on specified date" subparser.set_defaults(parser="get_users_created_on") subparser.set_defaults( func=partial(self.search_by_create_date, serialize_for_cli=True) ) subparser.add_argument( "-c", "--created-on", type=datetime_parser.parse, default=datetime.today(), help=""" Minimum Q2_User.CreatedBy to search. (Defaults to beginning of current day if not provided). """, ) subparser.add_argument( "--extended", action="store_true", help="Extends the output of rows" ) subparser = parser.add_parser("get_admin_users_by_customer_id") subparser.set_defaults(parser="get_admin_users_by_customer_id") subparser.set_defaults( func=partial(self.get_admin_users_by_customer_id, serialize_for_cli=True) ) subparser.add_argument("-c", "--customer-id", help="Q2_User.CustomerID")
[docs] async def get_deleted( self, deleted_after: datetime, serialize_for_cli=False, extended=False ): params = [] Param( deleted_after.isoformat(), D_TYPES.Date, "deleted_after" ).add_to_param_list(params) response = await self.call_hq( "sdk_GetDeletedUsers", ExecuteStoredProcedure.SqlParameters(params) ) if serialize_for_cli: response = self.get_cli_response(extended, response) return response
[docs] async def get( self, user_id: int = None, login_name: str = None, social_security: str = None, primary_cif: str = None, extended=False, combine=False, serialize_for_cli=False, ) -> List[UserRow]: if combine: assert sum([True for x in (social_security, primary_cif) if x]) == 2, ( "If combine then both social security and primary cif must be provided" ) else: assert ( sum([ True for x in (user_id, login_name, social_security, primary_cif) if x ]) == 1 ), ( "Either user_id, login_name, social_security, or primary_cif must be defined" ) if social_security: social_security = "".join(x for x in social_security if x.isdigit()) sql_params = self._build_get_parameters( GetUserValues( social_security=social_security, primary_cif=primary_cif, user_id=user_id, login_name=login_name, ), combine=combine, ) response = await self.call_hq( "sdk_GetUsers", ExecuteStoredProcedure.SqlParameters(sql_params) ) if serialize_for_cli: response = self.get_cli_response(extended, response) return response
[docs] async def get_many(self, user_ids: list[int]) -> list[UserRow]: """ Retrieves multiple users matching the given user IDs :param: user_ids: list of Q2_User.UserIDs """ # <request><get u="1" /><get u="2" /></request> user_nodes = [E.get(u=str(x)) for x in user_ids] request_node = E.request(*user_nodes) request_string = tostring(request_node, encoding="utf-8") params = [] Param(request_string.decode(), D_TYPES.Xml, "request").add_to_param_list(params) return await self.call_hq( "sdk_GetMultipleUsers", ExecuteStoredProcedure.SqlParameters(params) )
def _build_get_parameters(self, get_info: GetUserValues, combine): parameters = [] possible_params = [] if combine: possible_params.extend([ Param(get_info.social_security, D_TYPES.VarChar, "social_security"), Param(get_info.primary_cif, D_TYPES.VarChar, "primary_cif"), ]) elif get_info.user_id: possible_params.append(Param(get_info.user_id, D_TYPES.Int, "user_id")) elif get_info.login_name: possible_params.append( Param(get_info.login_name, D_TYPES.VarChar, "login_name") ) elif get_info.social_security: possible_params.append( Param(get_info.social_security, D_TYPES.VarChar, "social_security") ) else: possible_params.append( Param(get_info.primary_cif, D_TYPES.VarChar, "primary_cif") ) for item in possible_params: item.add_to_param_list(parameters) return parameters
[docs] async def get_users_under_customer( self, customer_id, extended=False, serialize_for_cli=False, include_deleted=False, ): assert int(customer_id), "Please supply a valid customer id" parameters = [] Param(customer_id, D_TYPES.Int, "customer_id").add_to_param_list(parameters) if include_deleted: Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(parameters) response = await self.call_hq( "sdk_GetUsersUnderCustomer", ExecuteStoredProcedure.SqlParameters(parameters), ) if serialize_for_cli: response = self.get_cli_response(extended, response) return response
[docs] async def set_profile_updated_flag(self, user_id: int): assert isinstance(user_id, int), "Please supply a valid user id" params = [] Param(user_id, D_TYPES.Int, "user_id").add_to_param_list(params) return await self.call_hq( "sdk_SetProfileUpdatedUser", ExecuteStoredProcedure.SqlParameters(params) )
[docs] def get_cli_response(self, extended: bool, response): columns = ["UserID", "CustomerID", "FirstName", "LastName", "SSN", "PrimaryCIF"] if extended: columns.extend([ "Salutation", "Suffix", "SSN", "CreateDate", "DeletedDate", "DefaultEmailID", "DefaultAddressID", "AutoGenerated", "HostUser", "HostPwd", "UserInfo", "ProfileUpdated", "UserRoleID", ]) return self.serialize_for_cli(response, columns)
[docs] async def update_dob(self, user_id: int, dob: datetime): assert isinstance(user_id, int), "Please supply a valid user id" assert isinstance(dob, datetime), "Please supply a DOB as a datetime object" dob_as_string = dob.strftime("%Y-%m-%d") params = [] Param(user_id, D_TYPES.Int, "user_id").add_to_param_list(params) Param(dob_as_string, D_TYPES.Date, "dob").add_to_param_list(params) return await self.call_hq( "sdk_UpdateUserDOB", ExecuteStoredProcedure.SqlParameters(params) )
[docs] async def update( self, user_id: int, ssn=None, primary_cif=None, unset_primary_cif=False ): assert isinstance(user_id, int), "Please provide a valid user id" parameters = [ssn, primary_cif, unset_primary_cif] assert any(parameters), "No values to update" parameters.insert(0, user_id) update_values = UpdateUserValues(*parameters) sql_params = self._build_update_parameters(update_values) response = await self.call_hq( "sdk_UpdateUser", ExecuteStoredProcedure.SqlParameters(sql_params) ) return response
@staticmethod def _build_update_parameters(update_info: UpdateUserValues): parameters = [] possible_params = [ Param(update_info.user_id, D_TYPES.Int, "user_id"), Param(update_info.ssn, D_TYPES.VarChar, "ssn", True), Param(update_info.primary_cif, D_TYPES.VarChar, "primary_cif", True), ] if update_info.unset_primary_cif: possible_params.append( Param(update_info.unset_primary_cif, D_TYPES.Bit, "unset_primary_cif") ) for item in possible_params: item.add_to_param_list(parameters) return parameters
[docs] async def delete(self, user_id: int, serialize_for_cli=None): if isinstance(user_id, str): user_id = int(user_id) assert isinstance(user_id, int) params_obj = DeleteUser.ParamsObj( self.logger, user_id, hq_credentials=self.hq_credentials ) response = await DeleteUser.execute(params_obj) if not serialize_for_cli: return DeleteUserParams( success=response.success, error_message=response.error_message, response=response, )
[docs] async def search_by_create_date( self, created_on: Union[date, datetime], serialize_for_cli=False, extended=False ) -> List[UserRow]: created_on_start = datetime.combine(created_on, datetime.min.time()) created_on_stop = datetime.combine(created_on, datetime.max.time()) sql_params = [] Param( created_on_start.isoformat(), D_TYPES.DateTime, "created_on_start" ).add_to_param_list(sql_params) Param( created_on_stop.isoformat(), D_TYPES.DateTime, "created_on_stop" ).add_to_param_list(sql_params) response = await self.call_hq( "sdk_GetUsersByCreateDate", ExecuteStoredProcedure.SqlParameters(sql_params) ) if serialize_for_cli: response = self.get_cli_response(extended, response) return response
[docs] async def search( self, first_name: str = "", last_name: str = "", email_address: str = "", phone_number: str = "", sso_identifier: str = "", serialize_for_cli=False, extended=False, ) -> SearchUserResponse: search_params = SearchUserValues( first_name=first_name, last_name=last_name, email_address=email_address, phone_number=phone_number, sso_identifier=sso_identifier, ) if not search_params.valid_inputs(): msg = "Invalid parameters. Expected any combination of first_name, last_name, email_address, phone_number, or sso_identifier]" return ( msg if serialize_for_cli else SearchUserResponse(success=False, error_message=msg) ) elif search_params.sso_identifier: results = await self.search_by_sso_identifier(search_params.sso_identifier) if not results: msg = "No users found with sso_identifier" return ( msg if serialize_for_cli else SearchUserResponse(success=False, error_message=msg) ) else: stored_proc = self._find_search_stored_proc(search_params) sql_params = search_params.hydrate_stored_proc_params() results = await self.call_hq( stored_proc, ExecuteStoredProcedure.SqlParameters(sql_params) ) if not results: msg = "No users found" return ( msg if serialize_for_cli else SearchUserResponse(success=False, error_message=msg) ) if serialize_for_cli: return self.get_cli_response(extended, results) return SearchUserResponse(success=True, result=results)
def _find_search_stored_proc(self, search_params: SearchUserValues): proc_name = "" if search_params.email_address: if search_params.phone_number: self._hydrate_domestic_phone_search(search_params) proc_name = "CaliperApi_SearchUsers" else: proc_name = "CaliperApi_SearchUsersByEmail" else: if search_params.phone_number: self._hydrate_domestic_phone_search(search_params) proc_name = "CaliperApi_SearchUsersByPhone" else: proc_name = "CaliperApi_SearchUsersByName" self.logger.debug(f"Calling stored procedure {proc_name}") return proc_name
[docs] async def search_by_sso_identifier(self, sso_identifier: str): sso_user_obj = SsoUserLogon(self.logger, hq_credentials=self.hq_credentials) response = await sso_user_obj.get(sso_identifier) if not response: return user_logon_id = response[0].UserLogonID.text user_obj = User(self.logger, hq_credentials=self.hq_credentials) login = await UserLogon( self.logger, hq_credentials=self.hq_credentials ).get_login_by_logon_id(user_logon_id) user_id = login[0].UserID.text return await user_obj.get(user_id=user_id)
def _hydrate_domestic_phone_search(self, search_params: SearchUserValues): built_phone = Phone.build_from_str(search_params.phone_number, None) if built_phone and len(built_phone.phone_number) == 7: search_params.city_or_areacode = built_phone.area_code search_params.local_number = built_phone.phone_number self.logger.debug( f"parsed phone number: {search_params.city_or_areacode}-{search_params.local_number}" )
[docs] async def get_admin_users_by_customer_id( self, customer_id: int, serialize_for_cli=False ): params = [] Param(customer_id, D_TYPES.Int, "CustomerID").add_to_param_list(params) response = await self.call_hq( "CaliperApi_GetAdminUserByCustomerID", ExecuteStoredProcedure.SqlParameters(params), ) if serialize_for_cli: columns = [ "Weight", "UserID", "CustomerID", "FirstName", "LastName", ] response = self.serialize_for_cli(response, columns) return response