Source code for q2_sdk.hq.db.user_logon

from argparse import _SubParsersAction
from datetime import date, datetime, timedelta
from enum import Enum
from functools import partial
from typing import List, Union
from dataclasses import dataclass

from dateutil import parser as datetime_parser
from lxml.etree import tostring
from lxml.objectify import BoolElement, E, IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq import api_helpers
from q2_sdk.hq.db.audit_record import AuditRecord
from q2_sdk.hq.hq_api.q2_api import (
    ChangeEndUserLogonName,
    ChangeEndUserPassword,
    ChangeEndUserPasswordStatus,
    ValidatePasswordComplexity,
    ValidatePasswordComplexityZoned,
)
from q2_sdk.core.exceptions import BadParameterError
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from q2_sdk.hq.models.password_policy import PasswordPolicy

from .db_object import DbObject
from .representation_row_base import RepresentationRowBase

D_TYPES = ExecuteStoredProcedure.DataType


[docs] @dataclass class ZoneData: group_id: int = None zone_id: int = None def __post_init__(self): valid = True if not any([self.group_id, self.zone_id]): valid = False elif all([self.group_id, self.zone_id]): valid = False if not valid: raise BadParameterError( "ZoneData requires one input parameter of either group id or zone id." ) @property def context_id(self): return self.group_id if self.group_id else self.zone_id @property def context_type(self): return ( ValidatePasswordComplexityZoned.ContextType.GroupId if self.group_id else ValidatePasswordComplexityZoned.ContextType.ZoneId )
[docs] class UserLogonRow(RepresentationRowBase): UserLogonID: IntElement = "UserLogonID" UserID: IntElement = "UserID" LoginName: StringElement = "LoginName" UserPassword: StringElement = "UserPassword" UISourceID: IntElement = "UISourceID" LastChange: StringElement = "LastChange" LastLogon: StringElement = "LastLogon" Status: StringElement = "Status" StatusReason: StringElement = "StatusReason" LastFailed: StringElement = "LastFailed" CreateDate: StringElement = "CreateDate" NumInvalidAttempts: IntElement = "NumInvalidAttempts" AutoGenerated: BoolElement = "AutoGenerated" PasswordChangeRequired: BoolElement = "PasswordChangeRequired" PasswordHasNeverChanged: BoolElement = "PasswordHasNeverChanged" NumInvalidOobAttempts: IntElement = "NumInvalidOobAttempts"
[docs] class AdminLogonRow(RepresentationRowBase): AdminUserLogonID: IntElement = "AdminUserLogonID" AdminUserID: IntElement = "AdminUserID" LoginName: StringElement = "LoginName" DeletedDate: StringElement = "DeletedDate"
[docs] class LogonStatus(Enum): Normal = "Normal" New = "New" Locked = "Locked" Expired = "Expired" Disabled = "Disabled" InitialPwdExpired = "InitialPwdExpired" TacRequired = "Tac" ChallengeRequested = "Cqr" Token = "Token" NewTacNotRequired = "NewTacNotRequired"
[docs] class UserLogon(DbObject): GET_BY_NAME_KEY = "LoginName" NAME = "UserLogon" REPRESENTATION_ROW_CLASS = UserLogonRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_user_logon") subparser.set_defaults(parser="get_user_logon") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser.add_argument("-u", "--user_id", help="Q2_User.UserID") subparser.add_argument("-l", "--logon_name", help="Q2_UserLogon.LoginName") subparser.add_argument( "--admin", action="store_true", help="Search the Q2_AdminLogons table instead", ) subparser.add_argument( "--include-deleted", action="store_true", help="Include any deleted logins" ) subparser = parser.add_parser("change_user_logon_name") subparser.set_defaults(parser="change_user_logon_name") subparser.set_defaults( func=partial(self.change_user_logonname), serialize_for_cli=True ) subparser.add_argument("old_logon_name", help="Q2_UserLogon.LoginName") subparser.add_argument("new_logon_name", help="New login name") subparser = parser.add_parser("unlock_user") subparser.set_defaults(parser="unlock_user") subparser.set_defaults(func=partial(self.unlock)) subparser.add_argument("logon_name", help="Q2_UserLogon.LoginName") subparser = parser.add_parser("reset_password") subparser.set_defaults(parser="reset_password") subparser.set_defaults(func=partial(self.reset_password)) subparser.add_argument("logon_name", help="Q2_UserLogon.LoginName") subparser.add_argument("-n", "--new-password", help="new password to set") subparser = parser.add_parser("get_logons_without_login_since") subparser.description = ( "Returns all logons that have not logged in since the date specified" ) subparser.set_defaults(parser="get_logons_without_login_since") subparser.set_defaults( func=partial(self.get_logons_without_login_since, serialize_for_cli=True) ) subparser.add_argument( "-l", "--last_logon_date", type=datetime_parser.parse, default=datetime.today(), help="Date to search last logons", ) subparser = parser.add_parser("get_active_users_since") subparser.description = ( "Returns all userids that are active since the given number " ) subparser.set_defaults(parser="get_active_users_since") subparser.set_defaults( func=partial(self.get_active_users_since, serialize_for_cli=True) ) subparser.add_argument( "active_since_in_days", type=int, help="Number of days since which the users have been active", ) subparser.add_argument( "-pn", "--page-number", type=int, default=1, help="The starting point for pagination", ) subparser.add_argument( "-rc", "--return-count", type=int, default=100, help="Number of UserIDs to return per page", )
[docs] async def get_login_by_name( self, logon_name: str, admin=False, include_deleted=False ) -> List[Union[UserLogonRow, AdminLogonRow]]: params = [] Param(logon_name, D_TYPES.VarChar, "user_logon").add_to_param_list(params) Param(include_deleted, D_TYPES.Bit, "include_deleted").add_to_param_list(params) if admin: Param(True, D_TYPES.Bit, "search_admin").add_to_param_list(params) return await self.call_hq( "sdk_GetLoginByName", ExecuteStoredProcedure.SqlParameters(params), representation_class_override=AdminLogonRow if admin else self.REPRESENTATION_ROW_CLASS, )
[docs] async def get_login_by_id( self, user_id: int, admin=False, include_deleted=False ) -> List[Union[UserLogonRow, AdminLogonRow]]: params = [] Param(user_id, D_TYPES.Int, "user_id").add_to_param_list(params) stored_proc_name = "sdk_GetLoginByUserID" if admin: stored_proc_name = "sdk_GetAdminLoginByUserID" if include_deleted: Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(params) return await self.call_hq( stored_proc_name, sql_parameters=ExecuteStoredProcedure.SqlParameters(params), representation_class_override=AdminLogonRow if admin else self.REPRESENTATION_ROW_CLASS, )
[docs] async def get_list_of_associated_logons(self, user_id: str) -> list: logons = [] found_logins = await UserLogon(self.logger, self.hq_credentials).get(user_id) for login_name in found_logins: if not hasattr(login_name, "CsrAssistAdminUserID"): logons.append(login_name["LoginName"].text) return logons
[docs] async def get( self, user_id: int = None, logon_name: str = None, serialize_for_cli=False, admin=False, include_deleted=False, ) -> List[Union[UserLogonRow, AdminLogonRow]]: assert any([user_id, logon_name]), "Must provide either user_id or logon_name" if user_id: response = await self.get_login_by_id(user_id, admin, include_deleted) elif logon_name: response = await self.get_login_by_name(logon_name, admin, include_deleted) if serialize_for_cli: if not admin: columns = [ "UserLogonID", "LoginName", "StatusReason", "LastFailed", "CreateDate", "NumInvalidAttempts", "PasswordChangeRequired", ] else: columns = [ "AdminUserLogonID", "AdminUserID", "LoginName", "DeletedDate", ] response = self.serialize_for_cli(response, columns) return response
[docs] async def unlock(self, logon_name: str): response = await ChangeEndUserPasswordStatus.execute( ChangeEndUserPasswordStatus.ParamsObj( self.logger, logon_name, ui_source="1", password_action="Unlock", hq_credentials=self.hq_credentials, ) ) msg = "Success" if not response.success: msg = "Failure" return msg
[docs] async def reset_password( self, logon_name: str, new_password: str = None, force_change=True ): if not new_password: new_password = await self.generate_password() else: result = await self.validate_password(new_password) if not result.success: return result.error_message hq_response = await self.change_endusers_password(logon_name, new_password) if not hq_response.success: return hq_response.error_message if force_change: await self.set_password_status_as_accepted(logon_name) return f"Login {logon_name} password was successfully reset. New Password: {new_password}"
[docs] async def generate_password(self, group_id: int = None): password_policy = PasswordPolicy() await password_policy.populate_from_hq( self.logger, ui_source_id="OnlineBanking", hq_credentials=self.hq_credentials, group_id=group_id, ) return api_helpers.generate_user_logon_password(password_policy)
[docs] async def validate_password(self, new_password, zone_data: None | ZoneData = None): if zone_data: parameters = ValidatePasswordComplexityZoned.ParamsObj( self.logger, "OnlineBanking", new_password, zone_data.context_id, zone_data.context_type, hq_credentials=self.hq_credentials, ) response = await ValidatePasswordComplexityZoned.execute(parameters) else: parameters = ValidatePasswordComplexity.ParamsObj( self.logger, "OnlineBanking", new_password, hq_credentials=self.hq_credentials, ) response = await ValidatePasswordComplexity.execute(parameters) return response
[docs] async def change_endusers_password(self, logon_name, new_password): password_change_params = ChangeEndUserPassword.ParamsObj( self.logger, logon_name, ui_source="OnlineBanking", new_end_user_password=new_password, hq_credentials=self.hq_credentials, ) return await ChangeEndUserPassword.execute(password_change_params)
[docs] async def delete( self, user_obj: OnlineUser, session_obj: OnlineSession, login_name: str = None, user_logonid: int = None, ): assert any([ login_name, user_logonid, ]), "Must provide either login_name or user_logonid" # We are first going to look up as much as we can find about this login if user_logonid: response = await self.get_login_by_logon_id(user_logonid, False) login_name = response[0].LoginName.text # Ok, no matter if we received a login_name, or user_logonid, we should have a login_name now. Let's go look for # the list of logins including deleted ones that we might have in the DB. response = await self._get_all_logins_by_logonname( login_name, include_deleted=True ) # ok, we might have a list now. Let's loop through it to find the oldest one we have. login_id, new_loginname = await self._generate_new_loginname( login_name, response ) # OK, we should now have a login_id, our loginname and a new_loginname. Let's call the stored proc to rename the login to the # new_loginname and mark deleted. params = [] Param(login_id, D_TYPES.Int, "logon_id").add_to_param_list(params) Param(new_loginname, D_TYPES.VarChar, "new_login_name").add_to_param_list( params ) response = await self.call_hq( "sdk_DeleteUserLogon", ExecuteStoredProcedure.SqlParameters(params) ) if user_logonid: audit_desc = ( f"SDK request to delete UserLogon with UserLogonID: {user_logonid}" ) else: audit_desc = f"SDK request to delete UserLogon with LoginName: {login_name}" audit = AuditRecord(self.logger, self.hq_credentials) await audit.create( audit_desc, session_id=session_obj.session_id, workstation_id=session_obj.workstation, customer_id=user_obj.customer_id, user_id=user_obj.user_id, user_logon_id=user_obj.user_logon_id, ) return response
[docs] async def get_login_by_logon_id( self, user_logonid: int, admin=False ) -> List[Union[UserLogonRow, AdminLogonRow]]: params = [] Param(user_logonid, D_TYPES.Int, "user_logonid").add_to_param_list(params) stored_proc_name = "sdk_GetLoginByLogonID" if admin: stored_proc_name = "sdk_GetAdminLoginByLogonID" return await self.call_hq( stored_proc_name, ExecuteStoredProcedure.SqlParameters(params) )
async def _get_all_logins_by_logonname( self, login_name: str, admin=False, include_deleted=False ): params = [] Param(login_name, D_TYPES.VarChar, "user_logon").add_to_param_list(params) if include_deleted: Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(params) stored_proc_name = "sdk_GetLoginByName" if admin: stored_proc_name = "sdk_GetAdminLoginByName" return await self.call_hq( stored_proc_name, ExecuteStoredProcedure.SqlParameters(params) ) async def _generate_new_loginname(self, login_name, response): high_count = 0 login_id = -1 for each in response: if each.LoginName.text == login_name: login_id = each.UserLogonID.pyval login_parts = each.LoginName.text.split("/") if len(login_parts) > 1: if login_parts[0] != login_name: self.logger.error( "Not sure what happened. If you see this, report error 15141541" ) if int(login_parts[-1]) > high_count: high_count = int(login_parts[-1]) return login_id, f"{login_name}/{high_count + 1}"
[docs] async def get_logons_from_user_id_list(self, list_of_users, included_deleted=False): set_of_users = set(list_of_users) xml_list = [] for user_id in set_of_users: xml_list.append(E.user(id=str(user_id))) xml_string = tostring(E.userList(*xml_list), encoding="unicode") params = [] Param(xml_string, D_TYPES.Xml, "UserList").add_to_param_list(params) if included_deleted: Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(params) results = await self.call_hq( "sdk_GetLogonsFromListOfUserIds", ExecuteStoredProcedure.SqlParameters(params), ) return results
[docs] async def set_password_status_as_accepted(self, login_name: str): """ Just making a change to the DB so the user can log in without changing their password. """ sql_params = [] Param("skip_password", D_TYPES.Text, "action").add_to_param_list(sql_params) Param(login_name, D_TYPES.Text, "login_name").add_to_param_list(sql_params) Param("", D_TYPES.Text, "primary_cif").add_to_param_list(sql_params) Param("", D_TYPES.Text, "ssn").add_to_param_list(sql_params) params = ExecuteStoredProcedure.SqlParameters(sql_params) params_obj = ExecuteStoredProcedure.ParamsObj( self.logger, "sdk_API_Enrollment", sql_parameters=params, hq_credentials=self.hq_credentials, ) return await ExecuteStoredProcedure.execute(params_obj)
[docs] async def get_logons_without_login_since( self, last_logon_date: date | datetime, serialize_for_cli=False, status_filter: LogonStatus = None, ): last_logon_since = datetime.combine(last_logon_date, datetime.min.time()) sql_params = [] Param( last_logon_since.isoformat(), D_TYPES.DateTime, "last_logon_since" ).add_to_param_list(sql_params) if status_filter: Param( status_filter.value, D_TYPES.VarChar, "status_filter" ).add_to_param_list(sql_params) response = await self.call_hq( "sdk_GetLogonWithoutLoginSince", ExecuteStoredProcedure.SqlParameters(sql_params), ) if serialize_for_cli: response = self.serialize_for_cli( response, [ "UserLogonID", "UserID", "LoginName", "LastLogon", "CreateDate", "Status", ], ) return response
[docs] async def change_user_logonname( self, old_logon_name, new_logon_name, serialize_for_cli=False ): params_obj = ChangeEndUserLogonName.ParamsObj( logger=self.logger, end_user_logon_name=old_logon_name, ui_source="OnlineBanking", new_logon_name=new_logon_name, hq_credentials=self.hq_credentials, ) response = await ChangeEndUserLogonName.execute(params_obj) if serialize_for_cli: return "Failure" if not response.success else "Success" else: return response
[docs] async def get_active_users_since( self, active_since_in_days: int, return_count: int = 100, page_number: int = 1, serialize_for_cli=False, ): offset = (page_number - 1) * return_count assert offset >= 0, "page_number must be 1 or greater" sql_params = [] current_date = datetime.now() time_delta = timedelta(active_since_in_days) active_since_date = current_date - time_delta Param( active_since_date.isoformat(), D_TYPES.DateTime, "active_since_date" ).add_to_param_list(sql_params) Param(offset, D_TYPES.Int, "offset").add_to_param_list(sql_params) Param(return_count, D_TYPES.Int, "return_count").add_to_param_list(sql_params) response = await self.call_hq( "sdk_GetActiveUsersSince", ExecuteStoredProcedure.SqlParameters(sql_params) ) if serialize_for_cli: response = self.serialize_for_cli( response, ["UserLogonID", "UserID", "LoginName", "LastLogon", "CreateDate"], ) return response