import re
import datetime
from argparse import _SubParsersAction
from dataclasses import dataclass
from functools import partial
from typing import List, Optional
from lxml.objectify import IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.password_status import PasswordStatus
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
# Short alias for ExecuteStoredProcedure.DataType; used below when building
# the typed parameters for the logon-update stored procedure.
D_TYPES = ExecuteStoredProcedure.DataType
[docs]
@dataclass
class UpdateAdminParameters:
    """Values used to build the parameters of an admin logon update.

    Instances are turned into stored-procedure parameters by
    ``AdminUserLogon._build_update_parameters``.
    """

    # Required values (positional order matters to existing callers).
    user_logon_id: int
    status_id: int
    invalid_logins: int

    # Optional values; each defaults to ``None`` when not supplied.
    status_reason: Optional[str] = None
    last_login: Optional[str] = None  # timestamp string
    last_failed: Optional[str] = None  # timestamp string
[docs]
class AdminUserLogonRow(RepresentationRowBase):
    """Typed view of one admin user logon row.

    Every attribute is annotated with the lxml element type it is read as and
    is assigned a string equal to its own name (presumably the matching column
    name in the database response -- confirm against RepresentationRowBase).
    """

    # Identity
    UserLogonID: IntElement = "UserLogonID"
    UserID: IntElement = "UserID"
    LoginName: StringElement = "LoginName"
    UserPassword: StringElement = "UserPassword"
    UISourceID: IntElement = "UISourceID"

    # Activity and status
    LastChange: StringElement = "LastChange"
    LastLogon: StringElement = "LastLogon"
    Status: IntElement = "Status"
    StatusReason: StringElement = "StatusReason"
    HostUser: StringElement = "HostUser"
    LastFailed: StringElement = "LastFailed"
    CreateDate: StringElement = "CreateDate"
    NumInvalidAttempts: IntElement = "NumInvalidAttempts"

    # Grouping and lifecycle
    GroupID: IntElement = "GroupID"
    DeletedDate: StringElement = "DeletedDate"
    AuthenticationTypeID: IntElement = "AuthenticationTypeID"
[docs]
class AdminUserLogon(DbObject):
    """Look up admin logon records by name and enable or disable them.

    Reads go through the ``sdk_GetAdminLoginByName`` stored procedure and
    status changes through ``[admin].Q2_UserLogonUpdate``; both are invoked
    with ``self.call_hq``.
    """

    # GET_BY_NAME_KEY = "column in the db response"
    NAME = "AdminUserLogon"
    REPRESENTATION_ROW_CLASS = AdminUserLogonRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the CLI subcommands handled by this object.

        Adds ``disable_admin_user``, ``enable_admin_user`` and
        ``get_admin_user_logon``; each takes one positional
        ``admin_login_name`` argument and stores the coroutine to run under
        the ``func`` default.

        :param parser: Subparsers action that the new parsers are added to
        """
        subparser = parser.add_parser("disable_admin_user")
        subparser.set_defaults(parser="disable")
        subparser.set_defaults(func=partial(self.disable))
        subparser.add_argument("admin_login_name", help="admin.Q2_UserLogon.LoginName")

        subparser = parser.add_parser("enable_admin_user")
        subparser.set_defaults(parser="enable")
        subparser.set_defaults(func=partial(self.enable))
        subparser.add_argument("admin_login_name", help="admin.Q2_UserLogon.LoginName")

        subparser = parser.add_parser("get_admin_user_logon")
        subparser.set_defaults(parser="get_admin_user_logon")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument("admin_login_name", help="admin.Q2_UserLogon.LoginName")

    async def get(
        self, admin_login_name: str, serialize_for_cli=False
    ) -> List[AdminUserLogonRow]:
        """Fetch the admin logon record(s) matching a login name.

        :param admin_login_name: Value passed as the ``logon_name`` parameter
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` restricted to a fixed set of columns,
            so the returned value is whatever that helper produces rather
            than the raw rows
        :return: The response of ``self.call_hq`` (or its CLI serialization)
        """
        stored_proc_name = "sdk_GetAdminLoginByName"
        parameters = [
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar, "logon_name", admin_login_name
            )
        ]
        response = await self.call_hq(
            stored_proc_name, ExecuteStoredProcedure.SqlParameters(parameters)
        )
        if serialize_for_cli:
            columns = [
                "UserLogonID",
                "LoginName",
                "StatusReason",
                "LastFailed",
                "CreateDate",
                "NumInvalidAttempts",
                "GroupID",
            ]
            response = self.serialize_for_cli(response, columns)
        return response

    async def disable(
        self, admin_login_name: str, status_reason: str = "Disabled via SDK"
    ):
        """Set the named admin logon to the ``Disabled`` status.

        :param admin_login_name: Login name to look up with :meth:`get`
        :param status_reason: Text stored as the status reason
        :raises DatabaseDataError: If the lookup returns nothing
        :return: The response of the update stored procedure call
        """
        admin_logon = await self.get(admin_login_name)
        if not admin_logon:
            raise DatabaseDataError("Login not found")
        # Only the first returned row is updated.
        return await self._get_admin_data_for_update(admin_logon[0], status_reason)

    async def enable(
        self, admin_login_name: str, status_reason: str = "Enabled via SDK"
    ):
        """Set the named admin logon to the ``Normal`` status.

        :param admin_login_name: Login name to look up with :meth:`get`
        :param status_reason: Text stored as the status reason
        :raises DatabaseDataError: If the lookup returns nothing
        :return: The response of the update stored procedure call
        """
        admin_logon = await self.get(admin_login_name)
        if not admin_logon:
            raise DatabaseDataError("Login not found")
        # Only the first returned row is updated.
        return await self._get_admin_data_for_update(
            admin_logon[0], status_reason, is_disable=False
        )

    async def _get_admin_data_for_update(
        self, admin_row: AdminUserLogonRow, status_reason, is_disable=True
    ):
        """
        Build the update values for ``admin_row`` and call
        ``[admin].Q2_UserLogonUpdate``.

        Need to satisfy the SQL where clause
        (DATEDIFF(MINUTE,ul.LastLogon,@LastLogon) > 1
        OR @LastLogon IS NULL OR ul.NumInvalidAttempts > 0
        OR ul.LastLogon IS NULL )
        HQ does not send NULL fields to SQL Server

        :param admin_row: Existing logon row to update
        :param status_reason: Text stored as the status reason
        :param is_disable: ``True`` selects the ``Disabled`` password status,
            ``False`` selects ``Normal``
        :raises ValueError: If the row's ``LastLogon`` cannot be parsed
        :return: The response of the update stored procedure call
        """
        status_name = "Disabled" if is_disable else "Normal"
        status_row = await PasswordStatus(
            self.logger, hq_credentials=self.hq_credentials
        ).get_by_name(status_name)
        status_id = int(status_row.StatusID)

        invalid_logins = int(admin_row.findtext("NumInvalidAttempts", 0))
        login_id = int(admin_row.UserLogonID)

        # The current time stands in for timestamps missing from the row.
        now = datetime.datetime.now().isoformat()
        last_login = self._clean_timestamp(admin_row.findtext("LastLogon", now))
        if invalid_logins == 0:
            # With no invalid attempts, the DATEDIFF branch of the where
            # clause is the one to satisfy: move LastLogon forward by more
            # than one minute.
            shifted = self._parse_timestamp(last_login) + datetime.timedelta(
                minutes=2
            )  # Needs to be greater than 1
            last_login = shifted.isoformat()
        last_failed = self._clean_timestamp(admin_row.findtext("LastFailed", now))

        update_object = UpdateAdminParameters(
            login_id,
            status_id,
            invalid_logins,
            status_reason=status_reason,
            last_login=last_login,
            last_failed=last_failed,
        )
        stored_proc_parameters = self._build_update_parameters(update_object)
        response = await self.call_hq(
            "[admin].Q2_UserLogonUpdate",
            ExecuteStoredProcedure.SqlParameters(stored_proc_parameters),
        )
        return response

    @staticmethod
    def _parse_timestamp(timestamp: str) -> datetime.datetime:
        """Parse ``YYYY-MM-DDTHH:MM:SS`` with or without fractional seconds.

        ``datetime.isoformat()`` omits the fractional part when microseconds
        are zero, so a format that requires ``.%f`` is not sufficient.

        :raises ValueError: If ``timestamp`` matches neither form
        """
        for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
            try:
                return datetime.datetime.strptime(timestamp, fmt)
            except ValueError:
                continue
        raise ValueError(f"Unrecognized timestamp format: {timestamp!r}")

    @staticmethod
    def _clean_timestamp(timestamp: str):
        """Return ``timestamp`` with any ``+HH:MM`` / ``-HH:MM`` offset removed."""
        regex = re.compile(r"(\+|-)\d{2}:\d{2}")
        return re.sub(regex, "", timestamp)

    @staticmethod
    def _build_update_parameters(update_login: UpdateAdminParameters):
        """Convert ``update_login`` into the stored-procedure parameter list.

        Each value is wrapped in a ``Param`` and added through
        ``Param.add_to_param_list``, in the fixed order listed below.

        :return: The list populated by ``add_to_param_list``
        """
        parameters = []
        specs = (
            (update_login.user_logon_id, D_TYPES.Int, "UserLogonID"),
            (update_login.status_id, D_TYPES.Int, "Status"),
            (update_login.status_reason, D_TYPES.VarChar, "StatusReason"),
            (update_login.invalid_logins, D_TYPES.Int, "NumInvalidAttempts"),
            (update_login.last_login, D_TYPES.DateTime, "LastLogon"),
            (update_login.last_failed, D_TYPES.DateTime, "LastFailed"),
        )
        for value, data_type, name in specs:
            Param(value, data_type, name).add_to_param_list(parameters)
        return parameters