from argparse import _SubParsersAction
from datetime import date, datetime, timedelta
from enum import Enum
from functools import partial
from typing import List, Union
from dataclasses import dataclass
from dateutil import parser as datetime_parser
from lxml.etree import tostring
from lxml.objectify import BoolElement, E, IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq import api_helpers
from q2_sdk.hq.db.audit_record import AuditRecord
from q2_sdk.hq.hq_api.q2_api import (
ChangeEndUserLogonName,
ChangeEndUserPassword,
ChangeEndUserPasswordStatus,
ValidatePasswordComplexity,
ValidatePasswordComplexityZoned,
)
from q2_sdk.core.exceptions import BadParameterError
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from q2_sdk.hq.models.password_policy import PasswordPolicy
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
# Shorthand for the stored-procedure parameter data types (VarChar, Int, Bit,
# Text, Xml, DateTime, ...) used when building Param objects in this module.
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
@dataclass
class ZoneData:
    """Password-validation context identified by a group id or a zone id.

    Exactly one of ``group_id`` / ``zone_id`` must be truthy.

    Raises:
        BadParameterError: on construction when neither or both ids are given.
    """

    group_id: int = None
    zone_id: int = None

    def __post_init__(self):
        has_group = bool(self.group_id)
        has_zone = bool(self.zone_id)
        # Equal flags mean "neither" or "both" -- each is invalid.
        if has_group == has_zone:
            raise BadParameterError(
                "ZoneData requires one input parameter of either group id or zone id."
            )

    @property
    def context_id(self):
        """Return whichever of ``group_id`` / ``zone_id`` was supplied."""
        if self.group_id:
            return self.group_id
        return self.zone_id

    @property
    def context_type(self):
        """Return the ``ContextType`` member matching the supplied identifier."""
        context_types = ValidatePasswordComplexityZoned.ContextType
        if self.group_id:
            return context_types.GroupId
        return context_types.ZoneId
[docs]
class UserLogonRow(RepresentationRowBase):
    """Row representation used by ``UserLogon`` for end-user logon records.

    Each attribute is annotated with the lxml objectify element type expected
    for the column and is assigned the column name as a string.
    NOTE(review): the columns presumably mirror the Q2_UserLogon table --
    confirm against the database schema.
    """

    # Integer identifiers.
    UserLogonID: IntElement = "UserLogonID"
    UserID: IntElement = "UserID"
    # Credentials and UI source.
    LoginName: StringElement = "LoginName"
    UserPassword: StringElement = "UserPassword"
    UISourceID: IntElement = "UISourceID"
    # Timestamps and status are exposed as string elements.
    LastChange: StringElement = "LastChange"
    LastLogon: StringElement = "LastLogon"
    Status: StringElement = "Status"
    StatusReason: StringElement = "StatusReason"
    LastFailed: StringElement = "LastFailed"
    CreateDate: StringElement = "CreateDate"
    # Counters and password flags.
    NumInvalidAttempts: IntElement = "NumInvalidAttempts"
    AutoGenerated: BoolElement = "AutoGenerated"
    PasswordChangeRequired: BoolElement = "PasswordChangeRequired"
    PasswordHasNeverChanged: BoolElement = "PasswordHasNeverChanged"
    NumInvalidOobAttempts: IntElement = "NumInvalidOobAttempts"
[docs]
class AdminLogonRow(RepresentationRowBase):
    """Row representation used when ``UserLogon`` lookups run in admin mode.

    Each attribute is annotated with the lxml objectify element type expected
    for the column and is assigned the column name as a string.
    """

    AdminUserLogonID: IntElement = "AdminUserLogonID"
    AdminUserID: IntElement = "AdminUserID"
    LoginName: StringElement = "LoginName"
    DeletedDate: StringElement = "DeletedDate"
[docs]
class LogonStatus(Enum):
    """Logon status values.

    The member ``value`` is what ``get_logons_without_login_since`` sends as
    its ``status_filter`` stored-procedure parameter.
    """

    Normal = "Normal"
    New = "New"
    Locked = "Locked"
    Expired = "Expired"
    Disabled = "Disabled"
    InitialPwdExpired = "InitialPwdExpired"
    # The next two members have values that differ from their names.
    TacRequired = "Tac"
    ChallengeRequested = "Cqr"
    Token = "Token"
    NewTacNotRequired = "NewTacNotRequired"
[docs]
class UserLogon(DbObject):
    """Database object for looking up and maintaining user logons.

    The methods query and modify logons through HQ stored procedures and HQ
    API calls, and register matching CLI subcommands.
    """

    # NOTE(review): presumably the column DbObject uses for by-name lookups --
    # confirm against DbObject.
    GET_BY_NAME_KEY = "LoginName"
    NAME = "UserLogon"
    # Default row class for non-admin query results (see get_login_by_name).
    REPRESENTATION_ROW_CLASS = UserLogonRow
[docs]
def add_arguments(self, parser: _SubParsersAction):
    """Register this object's CLI subcommands on ``parser``.

    Subcommands added: ``get_user_logon``, ``change_user_logon_name``,
    ``unlock_user``, ``reset_password``, ``get_logons_without_login_since``
    and ``get_active_users_since``. Each one stores its own name under the
    ``parser`` default and the bound handler under ``func``.

    :param parser: the sub-parsers action new subcommands are added to
    """
    subparser = parser.add_parser("get_user_logon")
    subparser.set_defaults(parser="get_user_logon")
    subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
    subparser.add_argument("-u", "--user_id", help="Q2_User.UserID")
    subparser.add_argument("-l", "--logon_name", help="Q2_UserLogon.LoginName")
    subparser.add_argument(
        "--admin",
        action="store_true",
        help="Search the Q2_AdminLogons table instead",
    )
    subparser.add_argument(
        "--include-deleted", action="store_true", help="Include any deleted logins"
    )

    subparser = parser.add_parser("change_user_logon_name")
    subparser.set_defaults(parser="change_user_logon_name")
    # serialize_for_cli is bound into the partial, exactly like the other
    # subcommands, so the handler itself receives it instead of it becoming
    # an unrelated namespace default.
    subparser.set_defaults(
        func=partial(self.change_user_logonname, serialize_for_cli=True)
    )
    subparser.add_argument("old_logon_name", help="Q2_UserLogon.LoginName")
    subparser.add_argument("new_logon_name", help="New login name")

    subparser = parser.add_parser("unlock_user")
    subparser.set_defaults(parser="unlock_user")
    subparser.set_defaults(func=partial(self.unlock))
    subparser.add_argument("logon_name", help="Q2_UserLogon.LoginName")

    subparser = parser.add_parser("reset_password")
    subparser.set_defaults(parser="reset_password")
    subparser.set_defaults(func=partial(self.reset_password))
    subparser.add_argument("logon_name", help="Q2_UserLogon.LoginName")
    subparser.add_argument("-n", "--new-password", help="new password to set")

    subparser = parser.add_parser("get_logons_without_login_since")
    subparser.description = (
        "Returns all logons that have not logged in since the date specified"
    )
    subparser.set_defaults(parser="get_logons_without_login_since")
    subparser.set_defaults(
        func=partial(self.get_logons_without_login_since, serialize_for_cli=True)
    )
    subparser.add_argument(
        "-l",
        "--last_logon_date",
        type=datetime_parser.parse,
        # Evaluated once, when the parser is built.
        default=datetime.today(),
        help="Date to search last logons",
    )

    subparser = parser.add_parser("get_active_users_since")
    subparser.description = (
        "Returns all userids that are active since the given number of days"
    )
    subparser.set_defaults(parser="get_active_users_since")
    subparser.set_defaults(
        func=partial(self.get_active_users_since, serialize_for_cli=True)
    )
    subparser.add_argument(
        "active_since_in_days",
        type=int,
        help="Number of days since which the users have been active",
    )
    subparser.add_argument(
        "-pn",
        "--page-number",
        type=int,
        default=1,
        help="The starting point for pagination",
    )
    subparser.add_argument(
        "-rc",
        "--return-count",
        type=int,
        default=100,
        help="Number of UserIDs to return per page",
    )
[docs]
async def get_login_by_name(
    self, logon_name: str, admin=False, include_deleted=False
) -> List[Union[UserLogonRow, AdminLogonRow]]:
    """Fetch logon rows by login name via ``sdk_GetLoginByName``.

    :param logon_name: login name sent as the ``user_logon`` parameter
    :param admin: when true, adds ``search_admin`` and returns ``AdminLogonRow`` rows
    :param include_deleted: always sent as the ``include_deleted`` bit
    :return: whatever ``call_hq`` returns for the stored procedure
    """
    row_class = AdminLogonRow if admin else self.REPRESENTATION_ROW_CLASS

    specs = [
        (logon_name, D_TYPES.VarChar, "user_logon"),
        (include_deleted, D_TYPES.Bit, "include_deleted"),
    ]
    if admin:
        specs.append((True, D_TYPES.Bit, "search_admin"))

    proc_args = []
    for value, data_type, name in specs:
        Param(value, data_type, name).add_to_param_list(proc_args)

    sql_parameters = ExecuteStoredProcedure.SqlParameters(proc_args)
    return await self.call_hq(
        "sdk_GetLoginByName",
        sql_parameters,
        representation_class_override=row_class,
    )
[docs]
async def get_login_by_id(
    self, user_id: int, admin=False, include_deleted=False
) -> List[Union[UserLogonRow, AdminLogonRow]]:
    """Fetch logon rows for a user id.

    Calls ``sdk_GetAdminLoginByUserID`` in admin mode and
    ``sdk_GetLoginByUserID`` otherwise.

    :param user_id: value sent as the ``user_id`` parameter
    :param admin: selects the admin stored procedure and ``AdminLogonRow``
    :param include_deleted: when true, adds the ``include_deleted`` bit
    :return: whatever ``call_hq`` returns for the stored procedure
    """
    if admin:
        proc_name = "sdk_GetAdminLoginByUserID"
        row_class = AdminLogonRow
    else:
        proc_name = "sdk_GetLoginByUserID"
        row_class = self.REPRESENTATION_ROW_CLASS

    proc_args = []
    Param(user_id, D_TYPES.Int, "user_id").add_to_param_list(proc_args)
    if include_deleted:
        Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(proc_args)

    return await self.call_hq(
        proc_name,
        sql_parameters=ExecuteStoredProcedure.SqlParameters(proc_args),
        representation_class_override=row_class,
    )
[docs]
async def get_list_of_associated_logons(self, user_id: str) -> list:
    """Return the login names found for ``user_id``.

    Rows that carry a ``CsrAssistAdminUserID`` attribute are left out.
    """
    lookup = UserLogon(self.logger, self.hq_credentials)
    rows = await lookup.get(user_id)
    return [
        row["LoginName"].text
        for row in rows
        if not hasattr(row, "CsrAssistAdminUserID")
    ]
[docs]
async def get(
    self,
    user_id: int = None,
    logon_name: str = None,
    serialize_for_cli=False,
    admin=False,
    include_deleted=False,
) -> List[Union[UserLogonRow, AdminLogonRow]]:
    """Look up logons by user id or, when no user id is given, by logon name.

    :param user_id: takes precedence over ``logon_name`` when both are given
    :param logon_name: login name used when ``user_id`` is falsy
    :param serialize_for_cli: when true, pass the rows through
        ``self.serialize_for_cli`` with a fixed column list
    :param admin: forwarded to the lookup; also selects the admin column list
    :param include_deleted: forwarded to the lookup
    :return: the lookup result, or its CLI serialization
    :raises AssertionError: when neither ``user_id`` nor ``logon_name`` is given
    """
    if not (user_id or logon_name):
        # Raised explicitly instead of via ``assert`` so the check still runs
        # under ``python -O``; the exception type is kept for compatibility.
        raise AssertionError("Must provide either user_id or logon_name")

    if user_id:
        response = await self.get_login_by_id(user_id, admin, include_deleted)
    else:
        response = await self.get_login_by_name(logon_name, admin, include_deleted)

    if not serialize_for_cli:
        return response

    if admin:
        columns = [
            "AdminUserLogonID",
            "AdminUserID",
            "LoginName",
            "DeletedDate",
        ]
    else:
        columns = [
            "UserLogonID",
            "LoginName",
            "StatusReason",
            "LastFailed",
            "CreateDate",
            "NumInvalidAttempts",
            "PasswordChangeRequired",
        ]
    return self.serialize_for_cli(response, columns)
[docs]
async def unlock(self, logon_name: str):
    """Send an "Unlock" password action for ``logon_name``.

    :return: ``"Success"`` when the HQ response reports success, else ``"Failure"``
    """
    request = ChangeEndUserPasswordStatus.ParamsObj(
        self.logger,
        logon_name,
        ui_source="1",
        password_action="Unlock",
        hq_credentials=self.hq_credentials,
    )
    response = await ChangeEndUserPasswordStatus.execute(request)
    return "Success" if response.success else "Failure"
[docs]
async def reset_password(
    self, logon_name: str, new_password: str = None, force_change=True
):
    """Set a new password for ``logon_name`` and report the outcome as text.

    A supplied password is validated first; when none is supplied one is
    generated. On a validation or HQ failure the corresponding
    ``error_message`` is returned instead of the success text.

    :param logon_name: login whose password is reset
    :param new_password: password to set; generated when falsy
    :param force_change: when true, ``set_password_status_as_accepted`` is
        called after the change
    :return: a success message containing the new password, or an error message
    """
    if new_password:
        validation = await self.validate_password(new_password)
        if not validation.success:
            return validation.error_message
    else:
        new_password = await self.generate_password()

    change_result = await self.change_endusers_password(logon_name, new_password)
    if not change_result.success:
        return change_result.error_message

    # NOTE(review): the helper's own description says it lets the user log in
    # WITHOUT changing their password, which reads opposite to "force_change";
    # confirm the intended polarity of this flag.
    if force_change:
        await self.set_password_status_as_accepted(logon_name)

    return f"Login {logon_name} password was successfully reset. New Password: {new_password}"
[docs]
async def generate_password(self, group_id: int = None):
    """Generate a logon password from the password policy loaded from HQ.

    :param group_id: forwarded to ``PasswordPolicy.populate_from_hq``
    :return: the result of ``api_helpers.generate_user_logon_password``
    """
    policy = PasswordPolicy()
    policy_options = {
        "ui_source_id": "OnlineBanking",
        "hq_credentials": self.hq_credentials,
        "group_id": group_id,
    }
    await policy.populate_from_hq(self.logger, **policy_options)
    return api_helpers.generate_user_logon_password(policy)
[docs]
async def validate_password(self, new_password, zone_data: None | ZoneData = None):
    """Run the HQ password-complexity check on ``new_password``.

    :param new_password: candidate password
    :param zone_data: when given, the zoned check is used with its
        ``context_id`` and ``context_type``
    :return: the HQ response from the selected complexity call
    """
    if not zone_data:
        plain_request = ValidatePasswordComplexity.ParamsObj(
            self.logger,
            "OnlineBanking",
            new_password,
            hq_credentials=self.hq_credentials,
        )
        return await ValidatePasswordComplexity.execute(plain_request)

    zoned_request = ValidatePasswordComplexityZoned.ParamsObj(
        self.logger,
        "OnlineBanking",
        new_password,
        zone_data.context_id,
        zone_data.context_type,
        hq_credentials=self.hq_credentials,
    )
    return await ValidatePasswordComplexityZoned.execute(zoned_request)
[docs]
async def change_endusers_password(self, logon_name, new_password):
    """Change the password of ``logon_name`` through ``ChangeEndUserPassword``.

    :return: the HQ response from ``ChangeEndUserPassword.execute``
    """
    return await ChangeEndUserPassword.execute(
        ChangeEndUserPassword.ParamsObj(
            self.logger,
            logon_name,
            ui_source="OnlineBanking",
            new_end_user_password=new_password,
            hq_credentials=self.hq_credentials,
        )
    )
[docs]
async def delete(
    self,
    user_obj: OnlineUser,
    session_obj: OnlineSession,
    login_name: str = None,
    user_logonid: int = None,
):
    """Rename a logon to ``<login_name>/<n>`` and mark it deleted, then audit it.

    :param user_obj: supplies customer/user/logon ids for the audit record
    :param session_obj: supplies session id and workstation for the audit record
    :param login_name: login to delete; ignored when ``user_logonid`` is given
    :param user_logonid: logon id to delete; its login name is looked up first
    :return: the ``call_hq`` response of ``sdk_DeleteUserLogon``
    :raises AssertionError: when neither ``login_name`` nor ``user_logonid`` is given
    """
    if not (login_name or user_logonid):
        # Raised explicitly instead of via ``assert`` so the check still runs
        # under ``python -O``; the exception type is kept for compatibility.
        raise AssertionError("Must provide either login_name or user_logonid")

    # Resolve the login name from the id when an id was supplied; the id wins
    # over any login_name passed alongside it.
    if user_logonid:
        response = await self.get_login_by_logon_id(user_logonid, False)
        login_name = response[0].LoginName.text

    # Whichever input we received, we now have a login name. Fetch every
    # login for it, deleted ones included.
    response = await self._get_all_logins_by_logonname(
        login_name, include_deleted=True
    )

    # Find the id of the live login and the next free "<login_name>/<n>" name.
    login_id, new_loginname = await self._generate_new_loginname(
        login_name, response
    )

    # Rename the login to the new name and mark it deleted.
    params = []
    Param(login_id, D_TYPES.Int, "logon_id").add_to_param_list(params)
    Param(new_loginname, D_TYPES.VarChar, "new_login_name").add_to_param_list(
        params
    )
    response = await self.call_hq(
        "sdk_DeleteUserLogon", ExecuteStoredProcedure.SqlParameters(params)
    )

    if user_logonid:
        audit_desc = (
            f"SDK request to delete UserLogon with UserLogonID: {user_logonid}"
        )
    else:
        audit_desc = f"SDK request to delete UserLogon with LoginName: {login_name}"

    audit = AuditRecord(self.logger, self.hq_credentials)
    await audit.create(
        audit_desc,
        session_id=session_obj.session_id,
        workstation_id=session_obj.workstation,
        customer_id=user_obj.customer_id,
        user_id=user_obj.user_id,
        user_logon_id=user_obj.user_logon_id,
    )
    return response
[docs]
async def get_login_by_logon_id(
    self, user_logonid: int, admin=False
) -> List[Union[UserLogonRow, AdminLogonRow]]:
    """Fetch logon rows by logon id.

    Calls ``sdk_GetAdminLoginByLogonID`` in admin mode and
    ``sdk_GetLoginByLogonID`` otherwise.

    :param user_logonid: value sent as the ``user_logonid`` parameter
    :param admin: selects the admin stored procedure
    :return: whatever ``call_hq`` returns for the stored procedure
    """
    # NOTE(review): unlike get_login_by_id, no representation_class_override
    # is passed for admin lookups -- confirm whether that is intended.
    proc_name = "sdk_GetAdminLoginByLogonID" if admin else "sdk_GetLoginByLogonID"
    proc_args = []
    Param(user_logonid, D_TYPES.Int, "user_logonid").add_to_param_list(proc_args)
    sql_parameters = ExecuteStoredProcedure.SqlParameters(proc_args)
    return await self.call_hq(proc_name, sql_parameters)
async def _get_all_logins_by_logonname(
    self, login_name: str, admin=False, include_deleted=False
):
    """Fetch logins by name, optionally including deleted ones.

    Calls ``sdk_GetAdminLoginByName`` in admin mode and
    ``sdk_GetLoginByName`` otherwise.
    """
    proc_name = "sdk_GetAdminLoginByName" if admin else "sdk_GetLoginByName"
    proc_args = []
    Param(login_name, D_TYPES.VarChar, "user_logon").add_to_param_list(proc_args)
    if include_deleted:
        Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(proc_args)
    sql_parameters = ExecuteStoredProcedure.SqlParameters(proc_args)
    return await self.call_hq(proc_name, sql_parameters)
async def _generate_new_loginname(self, login_name, response):
    """Find the live login's id and the next free ``<login_name>/<n>`` name.

    Rows whose ``LoginName`` contains "/" are treated as earlier renames; the
    highest numeric suffix among them determines the next counter value.

    :param login_name: the login being renamed
    :param response: iterable of rows exposing ``LoginName.text`` and
        ``UserLogonID.pyval``
    :return: ``(login_id, new_login_name)``; ``login_id`` is ``-1`` when no
        row's name equals ``login_name``
    """
    highest_suffix = 0
    login_id = -1
    for row in response:
        current_name = row.LoginName.text
        if current_name == login_name:
            login_id = row.UserLogonID.pyval

        name_parts = current_name.split("/")
        if len(name_parts) < 2:
            continue
        if name_parts[0] != login_name:
            self.logger.error(
                "Not sure what happened. If you see this, report error 15141541"
            )
        try:
            suffix = int(name_parts[-1])
        except ValueError:
            # A non-numeric suffix is not a rename counter; skip it instead of
            # aborting the whole delete with a ValueError.
            self.logger.warning(
                "Ignoring non-numeric login name suffix in %s", current_name
            )
            continue
        highest_suffix = max(highest_suffix, suffix)
    return login_id, f"{login_name}/{highest_suffix + 1}"
[docs]
async def get_logons_from_user_id_list(self, list_of_users, included_deleted=False):
    """Fetch logons for several user ids in one stored-procedure call.

    The ids are de-duplicated and sent as a ``<userList>`` XML document of
    ``<user id="...">`` elements in the ``UserList`` parameter.

    :param list_of_users: iterable of user ids
    :param included_deleted: when true, adds the ``include_deleted`` bit
    :return: the ``call_hq`` result of ``sdk_GetLogonsFromListOfUserIds``
    """
    user_nodes = [E.user(id=str(user_id)) for user_id in set(list_of_users)]
    user_list_xml = tostring(E.userList(*user_nodes), encoding="unicode")

    proc_args = []
    Param(user_list_xml, D_TYPES.Xml, "UserList").add_to_param_list(proc_args)
    if included_deleted:
        Param(True, D_TYPES.Bit, "include_deleted").add_to_param_list(proc_args)

    return await self.call_hq(
        "sdk_GetLogonsFromListOfUserIds",
        ExecuteStoredProcedure.SqlParameters(proc_args),
    )
[docs]
async def set_password_status_as_accepted(self, login_name: str):
    """
    Makes a change to the DB so the user can log in without changing their password.

    Runs ``sdk_API_Enrollment`` with the ``skip_password`` action for
    ``login_name``; ``primary_cif`` and ``ssn`` are sent as empty strings.
    """
    text_arguments = (
        ("skip_password", "action"),
        (login_name, "login_name"),
        ("", "primary_cif"),
        ("", "ssn"),
    )
    sql_params = []
    for value, name in text_arguments:
        Param(value, D_TYPES.Text, name).add_to_param_list(sql_params)

    request = ExecuteStoredProcedure.ParamsObj(
        self.logger,
        "sdk_API_Enrollment",
        sql_parameters=ExecuteStoredProcedure.SqlParameters(sql_params),
        hq_credentials=self.hq_credentials,
    )
    return await ExecuteStoredProcedure.execute(request)
[docs]
async def get_logons_without_login_since(
    self,
    last_logon_date: date | datetime,
    serialize_for_cli=False,
    status_filter: LogonStatus = None,
):
    """Query ``sdk_GetLogonWithoutLoginSince`` for a cut-off date.

    :param last_logon_date: cut-off; combined with midnight before being sent
        in ISO format as ``last_logon_since``
    :param serialize_for_cli: when true, pass the rows through
        ``self.serialize_for_cli`` with a fixed column list
    :param status_filter: when given, its ``value`` is sent as ``status_filter``
    :return: the stored-procedure result, or its CLI serialization
    """
    midnight = datetime.min.time()
    last_logon_since = datetime.combine(last_logon_date, midnight)

    sql_params = []
    cutoff_param = Param(
        last_logon_since.isoformat(), D_TYPES.DateTime, "last_logon_since"
    )
    cutoff_param.add_to_param_list(sql_params)
    if status_filter:
        filter_param = Param(status_filter.value, D_TYPES.VarChar, "status_filter")
        filter_param.add_to_param_list(sql_params)

    response = await self.call_hq(
        "sdk_GetLogonWithoutLoginSince",
        ExecuteStoredProcedure.SqlParameters(sql_params),
    )
    if not serialize_for_cli:
        return response

    cli_columns = [
        "UserLogonID",
        "UserID",
        "LoginName",
        "LastLogon",
        "CreateDate",
        "Status",
    ]
    return self.serialize_for_cli(response, cli_columns)
[docs]
async def change_user_logonname(
    self, old_logon_name, new_logon_name, serialize_for_cli=False
):
    """Rename a logon through ``ChangeEndUserLogonName``.

    :param old_logon_name: current login name
    :param new_logon_name: login name to change it to
    :param serialize_for_cli: when true, return ``"Success"`` / ``"Failure"``
        instead of the HQ response
    :return: the HQ response, or a status string for the CLI
    """
    request = ChangeEndUserLogonName.ParamsObj(
        logger=self.logger,
        end_user_logon_name=old_logon_name,
        ui_source="OnlineBanking",
        new_logon_name=new_logon_name,
        hq_credentials=self.hq_credentials,
    )
    response = await ChangeEndUserLogonName.execute(request)
    if not serialize_for_cli:
        return response
    return "Success" if response.success else "Failure"
[docs]
async def get_active_users_since(
    self,
    active_since_in_days: int,
    return_count: int = 100,
    page_number: int = 1,
    serialize_for_cli=False,
):
    """Query ``sdk_GetActiveUsersSince`` for one page of results.

    :param active_since_in_days: number of days subtracted from the current
        time to form the ``active_since_date`` parameter
    :param return_count: page size, sent as ``return_count``
    :param page_number: 1-based page; the offset sent is
        ``(page_number - 1) * return_count``
    :param serialize_for_cli: when true, pass the rows through
        ``self.serialize_for_cli`` with a fixed column list
    :return: the stored-procedure result, or its CLI serialization
    :raises AssertionError: when the computed offset is negative
    """
    offset = (page_number - 1) * return_count
    if offset < 0:
        # Raised explicitly instead of via ``assert`` so the check still runs
        # under ``python -O``; the exception type is kept for compatibility.
        raise AssertionError("page_number must be 1 or greater")

    active_since_date = datetime.now() - timedelta(days=active_since_in_days)

    sql_params = []
    Param(
        active_since_date.isoformat(), D_TYPES.DateTime, "active_since_date"
    ).add_to_param_list(sql_params)
    Param(offset, D_TYPES.Int, "offset").add_to_param_list(sql_params)
    Param(return_count, D_TYPES.Int, "return_count").add_to_param_list(sql_params)

    response = await self.call_hq(
        "sdk_GetActiveUsersSince", ExecuteStoredProcedure.SqlParameters(sql_params)
    )
    if serialize_for_cli:
        response = self.serialize_for_cli(
            response,
            ["UserLogonID", "UserID", "LoginName", "LastLogon", "CreateDate"],
        )
    return response