from argparse import _SubParsersAction
from dataclasses import dataclass
from datetime import date, datetime
from functools import partial
from typing import List, Union
from dateutil import parser as datetime_parser
from lxml.etree import tostring
from lxml.objectify import BoolElement, E, IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.db.sso_user_logon import SsoUserLogon
from q2_sdk.hq.db.user_logon import UserLogon
from q2_sdk.hq.hq_api.q2_api import DeleteUser
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.models.demographic import Phone
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
# Short alias for the stored-procedure data type namespace; it supplies the
# type argument of every Param built in this module.
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
@dataclass
class UpdateUserValues:
    """Values handed to ``User._build_update_parameters`` for one user.

    Only ``user_id`` is required; every other field is optional.
    """

    # Q2_User.UserID of the row being updated.
    user_id: int
    # New SSN value, or None when not supplied.
    ssn: Union[str, None] = None
    # New primary CIF value, or None when not supplied.
    primary_cif: Union[str, None] = None
    # When truthy, an "unset_primary_cif" parameter is sent as well.
    unset_primary_cif: bool = False
[docs]
@dataclass
class DeleteUserParams:
    """Result object returned by ``User.delete`` when not serializing for the CLI.

    All three fields are copied from (or are) the HQ response produced by
    ``DeleteUser.execute``.
    """

    # Mirrors ``response.success``.
    success: bool
    # Mirrors ``response.error_message``. The original annotation was the
    # literal ``None``, which described no usable type.
    error_message: Union[str, None]
    # The full HQ response, kept for callers that need more detail.
    response: HqResponse
[docs]
@dataclass
class GetUserValues:
    """Lookup criteria passed from ``User.get`` to ``User._build_get_parameters``.

    Every field is required positionally; callers pass ``None`` for the
    criteria they are not using.
    """

    # Sent as the "social_security" stored-procedure parameter.
    social_security: str
    # Sent as the "primary_cif" stored-procedure parameter.
    primary_cif: str
    # Sent as the "user_id" stored-procedure parameter.
    user_id: int
    # Sent as the "login_name" stored-procedure parameter.
    login_name: str
[docs]
@dataclass
class SearchUserValues:
    """Search criteria used by ``User.search``.

    ``first_name``, ``last_name``, ``email_address``, ``phone_number`` and
    ``sso_identifier`` are supplied by the caller. ``local_number`` and
    ``city_or_areacode`` are filled in later by
    ``User._hydrate_domestic_phone_search`` from ``phone_number``.
    """

    first_name: str = None
    last_name: str = None
    email_address: str = None
    phone_number: str = None
    local_number: str = None
    city_or_areacode: str = None
    sso_identifier: str = None

    def valid_inputs(self) -> bool:
        """Return True when at least one caller-facing criterion is set.

        ``User.search`` calls this before doing any work; without it every
        search failed with ``AttributeError``. The derived phone fields are
        deliberately not considered, since they are never supplied directly.
        """
        return any((
            self.first_name,
            self.last_name,
            self.email_address,
            self.phone_number,
            self.sso_identifier,
        ))

    def hydrate_stored_proc_params(self):
        """Build the stored-procedure parameter list from the populated fields.

        Only truthy fields are emitted, each as a VarChar parameter, in a fixed
        order. ``phone_number`` and ``sso_identifier`` are never sent directly.

        :return: list filled through ``Param.add_to_param_list``
        """
        # (value, stored-procedure parameter name) in emission order.
        candidates = (
            (self.first_name, "firstname"),
            (self.last_name, "lastname"),
            (self.local_number, "localnumber"),
            (self.city_or_areacode, "city_or_areacode"),
            (self.email_address, "emailaddress"),
        )
        params = []
        for value, name in candidates:
            if value:
                Param(value, D_TYPES.VarChar, name).add_to_param_list(params)
        return params
[docs]
@dataclass
class SearchUserResponse:
    """Outcome of ``User.search`` when it is not serializing for the CLI."""

    # True when the search produced rows, False otherwise.
    success: bool
    # The rows found; left as None on failure.
    result: list = None
    # Human-readable reason for a failed search; left as None on success.
    error_message: str = None
[docs]
class UserRow(RepresentationRowBase):
    """Column declarations for a user row.

    Each attribute is annotated with an lxml objectify element type and holds
    the column name as its value. The declaration order is kept exactly as
    before in case ``RepresentationRowBase`` depends on it.
    """

    # Identifiers
    UserID: IntElement = "UserID"
    CustomerID: IntElement = "CustomerID"

    # Name parts and SSN
    FirstName: StringElement = "FirstName"
    MiddleName: StringElement = "MiddleName"
    LastName: StringElement = "LastName"
    Salutation: StringElement = "Salutation"
    Suffix: StringElement = "Suffix"
    SSN: StringElement = "SSN"

    # Lifecycle dates
    CreateDate: StringElement = "CreateDate"
    DeletedDate: StringElement = "DeletedDate"

    # Default contact references
    DefaultEmailID: IntElement = "DefaultEmailID"
    DefaultAddressID: IntElement = "DefaultAddressID"

    # Remaining columns
    AutoGenerated: BoolElement = "AutoGenerated"
    HostUser: StringElement = "HostUser"
    HostPwd: StringElement = "HostPwd"
    UserInfo: StringElement = "UserInfo"
    ProfileUpdated: BoolElement = "ProfileUpdated"
    MobileAuthCode: StringElement = "MobileAuthCode"
    UseCustomerAccounts: BoolElement = "UseCustomerAccounts"
    PrimaryCIF: StringElement = "PrimaryCIF"
    LastUserProfileUpdate: StringElement = "LastUserProfileUpdate"
    UserRoleID: IntElement = "UserRoleID"
    PolicyID: IntElement = "PolicyID"
    UserStatusID: IntElement = "UserStatusID"
    StatusChangeDate: StringElement = "StatusChangeDate"
    DOB: StringElement = "DOB"
    IsOpenBanking: BoolElement = "IsOpenBanking"
[docs]
class User(DbObject):
"""
Queries the Q2_User table for rows that match the given arguments and gathers
additional details on specific users.

Lookups are available by user id, login name, SSN, primary CIF, customer id,
creation date, deletion date, SSO identifier, and name/email/phone search.
The class also exposes update and delete operations and registers the
matching CLI sub-commands in ``add_arguments``.
"""
NAME = "User"
REPRESENTATION_ROW_CLASS = UserRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
    """Register every user-related CLI sub-command on ``parser``.

    Each sub-command stores its own name under ``parser`` and the callable to
    dispatch to under ``func`` via ``set_defaults``.

    :param parser: sub-parsers action that the sub-commands are added to
    """

    def register(name, func):
        # Shared boilerplate: create the sub-parser and record name + target.
        subparser = parser.add_parser(name)
        subparser.set_defaults(parser=name)
        subparser.set_defaults(func=func)
        return subparser

    # The help texts promise "beginning of current day", so the defaults must
    # be midnight today rather than the moment the parser is built.
    start_of_today = datetime.today().replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    subparser = register("get_users", partial(self.get, serialize_for_cli=True))
    subparser.add_argument(
        "--extended", action="store_true", help="Extends the output of rows"
    )
    subparser.add_argument(
        "--combine", action="store_true", help="Search by both ssn and cif"
    )
    group = subparser.add_mutually_exclusive_group()
    group.add_argument("-u", "--user-id", help="Q2_User.UserID")
    group.add_argument("-l", "--login-name", help="Q2_UserLogin.LoginName")
    subparser.add_argument("-s", "--social-security", help="Q2_User.SSN")
    subparser.add_argument("-p", "--primary-cif", help="Q2_User.PrimaryCIF")

    subparser = register(
        "search_users", partial(self.search, serialize_for_cli=True)
    )
    subparser.add_argument("-f", "--first-name", help="Q2_User.FirstName")
    subparser.add_argument("-l", "--last-name", help="Q2_User.LastName")
    # The help previously read "Q2_User.SSN", copied from another option.
    subparser.add_argument(
        "-e", "--email-address", help="Email address of the user"
    )
    subparser.add_argument(
        "-p",
        "--phone-number",
        help="Q2_PhoneNumber.CityOrAreaCode and Q2_PhoneNumber.LocalNumber",
    )
    subparser.add_argument("-s", "--sso-identifier")
    subparser.add_argument(
        "--extended", action="store_true", help="Extends the output of the rows"
    )

    subparser = register(
        "get_users_under_customer",
        partial(self.get_users_under_customer, serialize_for_cli=True),
    )
    subparser.add_argument("-c", "--customer-id", help="Q2_User.CustomerID")
    subparser.add_argument(
        "--extended", action="store_true", help="Extends the output of the rows"
    )
    subparser.add_argument(
        "--include-deleted", action="store_true", help="include deleted users"
    )

    subparser = register(
        "get_account_related_users",
        partial(self.get_account_related_users, serialize_for_cli=True),
    )
    subparser.add_argument(
        "host_account_id", help="Q2_AccountAddress.HostAccountID"
    )

    subparser = register(
        "set_profile_updated_flag", partial(self.set_profile_updated_flag)
    )
    # set_profile_updated_flag asserts isinstance(user_id, int); without
    # type=int the CLI always passed a str and tripped that assertion.
    subparser.add_argument("user_id", type=int, help="Q2_User.UserID")

    subparser = register(
        "get_deleted_users", partial(self.get_deleted, serialize_for_cli=True)
    )
    subparser.description = (
        "Returns all users deleted since the beginning of the current day"
    )
    subparser.add_argument(
        "-d",
        "--deleted-after",
        type=datetime_parser.parse,
        default=start_of_today,
        help=(
            "Minimum Q2_User.DeletedDate to search in iso format "
            "([YYYY]-[MM]-[DD]T[hh]:[mm]:[ss]-[Z]). "
            "(Defaults to beginning of current day if not provided)."
        ),
    )

    subparser = register("remove_user", partial(self.delete))
    # NOTE(review): unlike the other commands, serialize_for_cli is stored as
    # a parser default here instead of being bound into the partial. Kept
    # as-is because the dispatcher is not visible from this module.
    subparser.set_defaults(serialize_for_cli=True)
    subparser.add_argument("user_id", help="Q2_User.UserID")

    subparser = register("update_user", partial(self.update))
    subparser.add_argument("user_id", type=int, help="Q2_User.UserID")
    subparser.add_argument("-s", "--ssn", help="Q2_User.SSN")
    subparser.add_argument("-p", "--primary-cif", help="Q2_User.PrimaryCif")

    subparser = register(
        "get_users_created_on",
        partial(self.search_by_create_date, serialize_for_cli=True),
    )
    subparser.description = "Returns all users created on specified date"
    # search_by_create_date matches the whole calendar day, so the help now
    # describes a day (not a minimum) and names the CreateDate column.
    subparser.add_argument(
        "-c",
        "--created-on",
        type=datetime_parser.parse,
        default=start_of_today,
        help=(
            "Day of Q2_User.CreateDate to search. "
            "(Defaults to the current day if not provided)."
        ),
    )
    subparser.add_argument(
        "--extended", action="store_true", help="Extends the output of rows"
    )

    subparser = register(
        "get_admin_users_by_customer_id",
        partial(self.get_admin_users_by_customer_id, serialize_for_cli=True),
    )
    subparser.add_argument("-c", "--customer-id", help="Q2_User.CustomerID")
[docs]
async def get_deleted(
    self, deleted_after: datetime, serialize_for_cli=False, extended=False
):
    """Call ``sdk_GetDeletedUsers`` with ``deleted_after`` as the cutoff.

    :param deleted_after: cutoff sent to the procedure in ISO format
    :param serialize_for_cli: when truthy, return the CLI rendering instead
    :param extended: forwarded to ``get_cli_response``
    """
    sql_params = []
    cutoff = Param(deleted_after.isoformat(), D_TYPES.Date, "deleted_after")
    cutoff.add_to_param_list(sql_params)
    rows = await self.call_hq(
        "sdk_GetDeletedUsers", ExecuteStoredProcedure.SqlParameters(sql_params)
    )
    if not serialize_for_cli:
        return rows
    return self.get_cli_response(extended, rows)
[docs]
async def get(
    self,
    user_id: int = None,
    login_name: str = None,
    social_security: str = None,
    primary_cif: str = None,
    extended=False,
    combine=False,
    serialize_for_cli=False,
) -> List[UserRow]:
    """Look up users through ``sdk_GetUsers``.

    Without ``combine`` exactly one of ``user_id``, ``login_name``,
    ``social_security`` or ``primary_cif`` must be truthy. With ``combine``
    both ``social_security`` and ``primary_cif`` must be truthy.

    :param extended: forwarded to ``get_cli_response``
    :param serialize_for_cli: when truthy, return the CLI rendering instead
    :raises AssertionError: when the criteria rule above is violated
    """
    if combine:
        both_given = len([v for v in (social_security, primary_cif) if v]) == 2
        assert both_given, (
            "If combine then both social security and primary cif must be provided"
        )
    else:
        criteria = (user_id, login_name, social_security, primary_cif)
        given_count = len([v for v in criteria if v])
        assert given_count == 1, (
            "Either user_id, login_name, social_security, or primary_cif must be defined"
        )

    if social_security:
        # Keep digits only, so formatted SSNs such as 123-45-6789 still match.
        social_security = "".join(filter(str.isdigit, social_security))

    lookup = GetUserValues(
        social_security=social_security,
        primary_cif=primary_cif,
        user_id=user_id,
        login_name=login_name,
    )
    sql_params = self._build_get_parameters(lookup, combine=combine)
    rows = await self.call_hq(
        "sdk_GetUsers", ExecuteStoredProcedure.SqlParameters(sql_params)
    )
    if not serialize_for_cli:
        return rows
    return self.get_cli_response(extended, rows)
[docs]
async def get_many(self, user_ids: list[int]) -> list[UserRow]:
    """
    Retrieves multiple users matching the given user IDs

    :param: user_ids: list of Q2_User.UserIDs
    """
    # Request document shape: <request><get u="1" /><get u="2" /></request>
    request_node = E.request(*[E.get(u=str(uid)) for uid in user_ids])
    request_xml = tostring(request_node, encoding="utf-8").decode()

    sql_params = []
    Param(request_xml, D_TYPES.Xml, "request").add_to_param_list(sql_params)
    return await self.call_hq(
        "sdk_GetMultipleUsers", ExecuteStoredProcedure.SqlParameters(sql_params)
    )
def _build_get_parameters(self, get_info: GetUserValues, combine):
    """Translate lookup criteria into the ``sdk_GetUsers`` parameter list.

    With ``combine`` both SSN and primary CIF are sent. Otherwise only the
    first truthy criterion is sent, checked in the order user id, login name,
    SSN, with primary CIF as the fallback.
    """

    def varchar(value, name):
        return Param(value, D_TYPES.VarChar, name)

    if combine:
        chosen = [
            varchar(get_info.social_security, "social_security"),
            varchar(get_info.primary_cif, "primary_cif"),
        ]
    elif get_info.user_id:
        chosen = [Param(get_info.user_id, D_TYPES.Int, "user_id")]
    elif get_info.login_name:
        chosen = [varchar(get_info.login_name, "login_name")]
    elif get_info.social_security:
        chosen = [varchar(get_info.social_security, "social_security")]
    else:
        chosen = [varchar(get_info.primary_cif, "primary_cif")]

    sql_params = []
    for param in chosen:
        param.add_to_param_list(sql_params)
    return sql_params
[docs]
async def get_users_under_customer(
    self,
    customer_id,
    extended=False,
    serialize_for_cli=False,
    include_deleted=False,
):
    """Call ``sdk_GetUsersUnderCustomer`` for ``customer_id``.

    :param customer_id: value convertible by ``int()`` to a non-zero number
    :param extended: forwarded to ``get_cli_response``
    :param serialize_for_cli: when truthy, return the CLI rendering instead
    :param include_deleted: when truthy, also send ``include_deleted``
    :raises AssertionError: when ``int(customer_id)`` is zero
    """
    assert int(customer_id), "Please supply a valid customer id"

    sql_params = []
    Param(customer_id, D_TYPES.Int, "customer_id").add_to_param_list(sql_params)
    if include_deleted:
        # The flag is sent only when requested.
        Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(sql_params)

    rows = await self.call_hq(
        "sdk_GetUsersUnderCustomer",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
    return self.get_cli_response(extended, rows) if serialize_for_cli else rows
[docs]
async def set_profile_updated_flag(self, user_id: int):
    """Call ``sdk_SetProfileUpdatedUser`` for ``user_id``.

    :raises AssertionError: when ``user_id`` is not an ``int``
    """
    assert isinstance(user_id, int), "Please supply a valid user id"
    sql_params = []
    user_param = Param(user_id, D_TYPES.Int, "user_id")
    user_param.add_to_param_list(sql_params)
    wrapped = ExecuteStoredProcedure.SqlParameters(sql_params)
    return await self.call_hq("sdk_SetProfileUpdatedUser", wrapped)
[docs]
def get_cli_response(self, extended: bool, response):
    """Render ``response`` for the CLI with a fixed set of columns.

    :param extended: when truthy, append the additional detail columns
    :param response: rows handed unchanged to ``self.serialize_for_cli``
    :return: whatever ``self.serialize_for_cli`` returns
    """
    columns = ["UserID", "CustomerID", "FirstName", "LastName", "SSN", "PrimaryCIF"]
    if extended:
        # "SSN" is already a base column. The extended list used to repeat
        # it, which produced a duplicate column in the extended output.
        columns.extend([
            "Salutation",
            "Suffix",
            "CreateDate",
            "DeletedDate",
            "DefaultEmailID",
            "DefaultAddressID",
            "AutoGenerated",
            "HostUser",
            "HostPwd",
            "UserInfo",
            "ProfileUpdated",
            "UserRoleID",
        ])
    return self.serialize_for_cli(response, columns)
[docs]
async def update_dob(self, user_id: int, dob: datetime):
    """Call ``sdk_UpdateUserDOB`` with the date formatted as ``YYYY-MM-DD``.

    :raises AssertionError: when ``user_id`` is not an ``int`` or ``dob`` is
        not a ``datetime``
    """
    assert isinstance(user_id, int), "Please supply a valid user id"
    assert isinstance(dob, datetime), "Please supply a DOB as a datetime object"

    sql_params = []
    # Only the date part is sent; the time of day is dropped by the format.
    specs = (
        (user_id, D_TYPES.Int, "user_id"),
        (f"{dob:%Y-%m-%d}", D_TYPES.Date, "dob"),
    )
    for value, data_type, name in specs:
        Param(value, data_type, name).add_to_param_list(sql_params)
    return await self.call_hq(
        "sdk_UpdateUserDOB", ExecuteStoredProcedure.SqlParameters(sql_params)
    )
[docs]
async def update(
    self, user_id: int, ssn=None, primary_cif=None, unset_primary_cif=False
):
    """Call ``sdk_UpdateUser`` for ``user_id``.

    At least one of ``ssn``, ``primary_cif`` or ``unset_primary_cif`` must be
    truthy.

    :raises AssertionError: when ``user_id`` is not an ``int`` or there is
        nothing to update
    """
    assert isinstance(user_id, int), "Please provide a valid user id"
    assert ssn or primary_cif or unset_primary_cif, "No values to update"

    update_values = UpdateUserValues(
        user_id=user_id,
        ssn=ssn,
        primary_cif=primary_cif,
        unset_primary_cif=unset_primary_cif,
    )
    wrapped = ExecuteStoredProcedure.SqlParameters(
        self._build_update_parameters(update_values)
    )
    return await self.call_hq("sdk_UpdateUser", wrapped)
@staticmethod
def _build_update_parameters(update_info: UpdateUserValues):
    """Translate ``update_info`` into the ``sdk_UpdateUser`` parameter list.

    ``user_id``, ``ssn`` and ``primary_cif`` are always offered to the list;
    ``unset_primary_cif`` is offered only when truthy.
    """
    candidates = [
        Param(update_info.user_id, D_TYPES.Int, "user_id"),
        Param(update_info.ssn, D_TYPES.VarChar, "ssn", True),
        Param(update_info.primary_cif, D_TYPES.VarChar, "primary_cif", True),
    ]
    if update_info.unset_primary_cif:
        candidates += [
            Param(update_info.unset_primary_cif, D_TYPES.Bit, "unset_primary_cif")
        ]

    sql_params = []
    for candidate in candidates:
        candidate.add_to_param_list(sql_params)
    return sql_params
[docs]
async def delete(self, user_id: int, serialize_for_cli=None):
    """Delete a user through the ``DeleteUser`` HQ call.

    :param user_id: user id; a ``str`` is converted with ``int()`` first
    :param serialize_for_cli: when truthy nothing is returned
    :return: ``DeleteUserParams`` built from the HQ response, or ``None``
        when ``serialize_for_cli`` is truthy
    :raises AssertionError: when ``user_id`` is neither ``int`` nor ``str``
    """
    if isinstance(user_id, str):
        user_id = int(user_id)
    assert isinstance(user_id, int)

    hq_response = await DeleteUser.execute(
        DeleteUser.ParamsObj(
            self.logger, user_id, hq_credentials=self.hq_credentials
        )
    )
    if serialize_for_cli:
        return None
    return DeleteUserParams(
        success=hq_response.success,
        error_message=hq_response.error_message,
        response=hq_response,
    )
[docs]
async def search_by_create_date(
    self, created_on: Union[date, datetime], serialize_for_cli=False, extended=False
) -> List[UserRow]:
    """Call ``sdk_GetUsersByCreateDate`` for the calendar day of ``created_on``.

    The search window runs from the earliest to the latest representable time
    on that day; any time component of ``created_on`` is ignored.

    :param serialize_for_cli: when truthy, return the CLI rendering instead
    :param extended: forwarded to ``get_cli_response``
    """
    window = (
        ("created_on_start", datetime.min.time()),
        ("created_on_stop", datetime.max.time()),
    )
    sql_params = []
    for name, clock in window:
        stamp = datetime.combine(created_on, clock).isoformat()
        Param(stamp, D_TYPES.DateTime, name).add_to_param_list(sql_params)

    rows = await self.call_hq(
        "sdk_GetUsersByCreateDate", ExecuteStoredProcedure.SqlParameters(sql_params)
    )
    if not serialize_for_cli:
        return rows
    return self.get_cli_response(extended, rows)
[docs]
async def search(
    self,
    first_name: str = "",
    last_name: str = "",
    email_address: str = "",
    phone_number: str = "",
    sso_identifier: str = "",
    serialize_for_cli=False,
    extended=False,
) -> SearchUserResponse:
    """Search for users by SSO identifier or by name/email/phone.

    A truthy ``sso_identifier`` takes precedence over the other criteria.
    Otherwise the stored procedure is chosen by ``_find_search_stored_proc``.

    :param serialize_for_cli: when truthy, return a CLI string (the rendered
        rows, or the bare error message) instead of a ``SearchUserResponse``
    :param extended: forwarded to ``get_cli_response``
    :return: ``SearchUserResponse`` with ``success=False`` and an
        ``error_message`` when the inputs are invalid or nothing is found,
        otherwise ``success=True`` with the rows in ``result``
    """

    def failure(message):
        # One failure shape for all three error exits below.
        if serialize_for_cli:
            return message
        return SearchUserResponse(success=False, error_message=message)

    search_params = SearchUserValues(
        first_name=first_name,
        last_name=last_name,
        email_address=email_address,
        phone_number=phone_number,
        sso_identifier=sso_identifier,
    )
    if not search_params.valid_inputs():
        # The message used to end with a stray "]".
        return failure(
            "Invalid parameters. Expected any combination of first_name, "
            "last_name, email_address, phone_number, or sso_identifier"
        )

    if search_params.sso_identifier:
        results = await self.search_by_sso_identifier(search_params.sso_identifier)
        if not results:
            return failure("No users found with sso_identifier")
    else:
        stored_proc = self._find_search_stored_proc(search_params)
        sql_params = search_params.hydrate_stored_proc_params()
        results = await self.call_hq(
            stored_proc, ExecuteStoredProcedure.SqlParameters(sql_params)
        )
        if not results:
            return failure("No users found")

    if serialize_for_cli:
        return self.get_cli_response(extended, results)
    return SearchUserResponse(success=True, result=results)
def _find_search_stored_proc(self, search_params: SearchUserValues):
    """Pick the search stored procedure from the supplied criteria.

    The choice depends only on whether ``email_address`` and ``phone_number``
    are truthy. When a phone number is present, the phone fields of
    ``search_params`` are hydrated as a side effect.
    """
    has_email = bool(search_params.email_address)
    has_phone = bool(search_params.phone_number)
    if has_phone:
        self._hydrate_domestic_phone_search(search_params)

    # Keyed by (has_email, has_phone).
    proc_by_criteria = {
        (True, True): "CaliperApi_SearchUsers",
        (True, False): "CaliperApi_SearchUsersByEmail",
        (False, True): "CaliperApi_SearchUsersByPhone",
        (False, False): "CaliperApi_SearchUsersByName",
    }
    proc_name = proc_by_criteria[(has_email, has_phone)]
    self.logger.debug(f"Calling stored procedure {proc_name}")
    return proc_name
[docs]
async def search_by_sso_identifier(self, sso_identifier: str):
    """Resolve an SSO identifier to user rows.

    The lookup chain is SSO logon row, then login row by its ``UserLogonID``,
    then ``User.get`` by that login's ``UserID``. Only the first row of each
    intermediate result is used.

    :return: the result of ``User.get``, or ``None`` when no SSO logon row is
        found
    """
    sso_lookup = SsoUserLogon(self.logger, hq_credentials=self.hq_credentials)
    sso_rows = await sso_lookup.get(sso_identifier)
    if not sso_rows:
        return None

    logon_id = sso_rows[0].UserLogonID.text
    user_lookup = User(self.logger, hq_credentials=self.hq_credentials)
    logon_lookup = UserLogon(self.logger, hq_credentials=self.hq_credentials)
    logins = await logon_lookup.get_login_by_logon_id(logon_id)
    return await user_lookup.get(user_id=logins[0].UserID.text)
def _hydrate_domestic_phone_search(self, search_params: SearchUserValues):
"""
Fill the phone search fields of ``search_params`` from its ``phone_number``.

When ``Phone.build_from_str`` returns a phone whose ``phone_number`` has
exactly 7 characters, ``city_or_areacode`` and ``local_number`` are set on
``search_params`` in place. Nothing is returned.
"""
# NOTE(review): the second argument is passed as None; its meaning is
# defined by Phone.build_from_str, which is not visible here.
built_phone = Phone.build_from_str(search_params.phone_number, None)
# Numbers that did not parse, or whose local part is not 7 long, are skipped.
if built_phone and len(built_phone.phone_number) == 7:
search_params.city_or_areacode = built_phone.area_code
search_params.local_number = built_phone.phone_number
# NOTE(review): indentation was lost in this copy of the file, so it is
# unclear whether this log call sits inside the `if` above — confirm.
self.logger.debug(
f"parsed phone number: {search_params.city_or_areacode}-{search_params.local_number}"
)
[docs]
async def get_admin_users_by_customer_id(
    self, customer_id: int, serialize_for_cli=False
):
    """Call ``CaliperApi_GetAdminUserByCustomerID`` for ``customer_id``.

    :param serialize_for_cli: when truthy, return the CLI rendering limited
        to the Weight, UserID, CustomerID, FirstName and LastName columns
    """
    sql_params = []
    Param(customer_id, D_TYPES.Int, "CustomerID").add_to_param_list(sql_params)
    rows = await self.call_hq(
        "CaliperApi_GetAdminUserByCustomerID",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
    if not serialize_for_cli:
        return rows
    cli_columns = ["Weight", "UserID", "CustomerID", "FirstName", "LastName"]
    return self.serialize_for_cli(rows, cli_columns)