# Source code for q2_sdk.hq.db.admin_user_logon

import re
import datetime
from argparse import _SubParsersAction
from dataclasses import dataclass
from functools import partial
from typing import List, Optional
from lxml.objectify import IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.password_status import PasswordStatus
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase

# Short alias for ExecuteStoredProcedure.DataType, used in this module when
# building the typed parameters passed to stored procedures.
D_TYPES = ExecuteStoredProcedure.DataType


@dataclass
class UpdateAdminParameters:
    """Values collected for a single admin user logon update.

    The three integer fields are required; the remaining fields are optional
    and default to ``None``.
    """

    # Required values.
    user_logon_id: int
    status_id: int
    invalid_logins: int

    # Optional values; timestamps are carried as strings.
    status_reason: Optional[str] = None
    last_login: Optional[str] = None
    last_failed: Optional[str] = None
class AdminUserLogonRow(RepresentationRowBase):
    """Row shape for admin user logon records.

    Each attribute is annotated with the lxml element type expected for the
    column and is assigned the column's name as a string.
    """

    # Identity
    UserLogonID: IntElement = "UserLogonID"
    UserID: IntElement = "UserID"
    LoginName: StringElement = "LoginName"
    UserPassword: StringElement = "UserPassword"
    UISourceID: IntElement = "UISourceID"

    # Activity and status
    LastChange: StringElement = "LastChange"
    LastLogon: StringElement = "LastLogon"
    Status: IntElement = "Status"
    StatusReason: StringElement = "StatusReason"
    HostUser: StringElement = "HostUser"
    LastFailed: StringElement = "LastFailed"
    CreateDate: StringElement = "CreateDate"
    NumInvalidAttempts: IntElement = "NumInvalidAttempts"

    # Grouping and lifecycle
    GroupID: IntElement = "GroupID"
    DeletedDate: StringElement = "DeletedDate"
    AuthenticationTypeID: IntElement = "AuthenticationTypeID"
class AdminUserLogon(DbObject):
    """Look up, enable and disable admin user logons through HQ stored procedures."""

    # GET_BY_NAME_KEY = "column in the db response"
    NAME = "AdminUserLogon"
    REPRESENTATION_ROW_CLASS = AdminUserLogonRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the CLI subcommands handled by this object.

        Adds ``disable_admin_user``, ``enable_admin_user`` and
        ``get_admin_user_logon``; each takes one positional
        ``admin_login_name`` argument.

        :param parser: Sub-parser collection the subcommands are added to
        """
        subparser = parser.add_parser("disable_admin_user")
        subparser.set_defaults(parser="disable")
        subparser.set_defaults(func=partial(self.disable))
        subparser.add_argument("admin_login_name", help="admin.Q2_UserLogon.LoginName")

        subparser = parser.add_parser("enable_admin_user")
        subparser.set_defaults(parser="enable")
        subparser.set_defaults(func=partial(self.enable))
        subparser.add_argument("admin_login_name", help="admin.Q2_UserLogon.LoginName")

        subparser = parser.add_parser("get_admin_user_logon")
        subparser.set_defaults(parser="get_admin_user_logon")
        subparser.set_defaults(func=partial(self.get, serialize_for_cli=True))
        subparser.add_argument("admin_login_name", help="admin.Q2_UserLogon.LoginName")

    async def get(
        self, admin_login_name: str, serialize_for_cli=False
    ) -> List[AdminUserLogonRow]:
        """Fetch admin logon rows by login name via ``sdk_GetAdminLoginByName``.

        :param admin_login_name: Login name passed as the ``logon_name`` parameter
        :param serialize_for_cli: When true, the response is passed through
            ``self.serialize_for_cli`` with a fixed subset of columns and that
            result is returned instead of the raw response
        """
        stored_proc_name = "sdk_GetAdminLoginByName"
        parameters = [
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar, "logon_name", admin_login_name
            )
        ]
        response = await self.call_hq(
            stored_proc_name, ExecuteStoredProcedure.SqlParameters(parameters)
        )

        if serialize_for_cli:
            columns = [
                "UserLogonID",
                "LoginName",
                "StatusReason",
                "LastFailed",
                "CreateDate",
                "NumInvalidAttempts",
                "GroupID",
            ]
            response = self.serialize_for_cli(response, columns)
        return response

    async def disable(
        self, admin_login_name: str, status_reason: str = "Disabled via SDK"
    ):
        """Set the admin logon's status to the ``Disabled`` password status.

        :param admin_login_name: Login name of the admin user to disable
        :param status_reason: Text stored as the status reason
        :raises DatabaseDataError: If no logon is found for ``admin_login_name``
        """
        admin_logon = await self.get(admin_login_name)
        if not admin_logon:
            raise DatabaseDataError("Login not found")
        return await self._get_admin_data_for_update(admin_logon[0], status_reason)

    async def enable(
        self, admin_login_name: str, status_reason: str = "Enabled via SDK"
    ):
        """Set the admin logon's status to the ``Normal`` password status.

        :param admin_login_name: Login name of the admin user to enable
        :param status_reason: Text stored as the status reason
        :raises DatabaseDataError: If no logon is found for ``admin_login_name``
        """
        admin_logon = await self.get(admin_login_name)
        if not admin_logon:
            raise DatabaseDataError("Login not found")
        return await self._get_admin_data_for_update(
            admin_logon[0], status_reason, is_disable=False
        )

    async def _get_admin_data_for_update(
        self, admin_row: AdminUserLogonRow, status_reason, is_disable=True
    ):
        """
        Build the update values from an existing row and call
        ``[admin].Q2_UserLogonUpdate``.

        Need to satisfy the SQL where clause
        (DATEDIFF(MINUTE,ul.LastLogon,@LastLogon) > 1
        OR @LastLogon IS NULL
        OR ul.NumInvalidAttempts > 0
        OR ul.LastLogon IS NULL
        )

        HQ does not send NULL fields to SQL Server

        :param admin_row: Existing logon row whose values seed the update
        :param status_reason: Text stored as the status reason
        :param is_disable: Selects the ``Disabled`` status when true,
            ``Normal`` otherwise
        :raises ValueError: If ``LastLogon`` has to be shifted but is not in a
            recognized timestamp format
        """
        status_name = "Disabled" if is_disable else "Normal"
        status_row = await PasswordStatus(
            self.logger, hq_credentials=self.hq_credentials
        ).get_by_name(status_name)
        status_id = int(status_row.StatusID)

        invalid_logins = int(admin_row.findtext("NumInvalidAttempts", 0))
        login_id = int(admin_row.UserLogonID)

        # Missing timestamps fall back to the current local time.
        now = datetime.datetime.now().isoformat()
        last_login = self._clean_timestamp(admin_row.findtext("LastLogon", now))
        if invalid_logins == 0:
            # With no invalid attempts on the row, shift LastLogon forward so
            # the DATEDIFF condition in the where clause is the one that holds.
            shifted_login = self._parse_timestamp(last_login) + datetime.timedelta(
                minutes=2
            )  # Needs to be greater than 1
            last_login = shifted_login.isoformat()
        last_failed = self._clean_timestamp(admin_row.findtext("LastFailed", now))

        update_object = UpdateAdminParameters(
            login_id,
            status_id,
            invalid_logins,
            status_reason=status_reason,
            last_login=last_login,
            last_failed=last_failed,
        )
        stored_proc_parameters = self._build_update_parameters(update_object)
        response = await self.call_hq(
            "[admin].Q2_UserLogonUpdate",
            ExecuteStoredProcedure.SqlParameters(stored_proc_parameters),
        )
        return response

    @staticmethod
    def _clean_timestamp(timestamp: str):
        """Return ``timestamp`` with any ``+HH:MM`` / ``-HH:MM`` offset removed."""
        return re.sub(r"(\+|-)\d{2}:\d{2}", "", timestamp)

    @staticmethod
    def _parse_timestamp(timestamp: str) -> datetime.datetime:
        """Parse an offset-free ISO-style timestamp, with or without fractional seconds.

        The fractional part is optional because ``datetime.isoformat()`` omits
        it when the microsecond component is zero, and incoming timestamps may
        be second-resolution as well.

        :raises ValueError: If ``timestamp`` matches neither format
        """
        for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
            try:
                return datetime.datetime.strptime(timestamp, fmt)
            except ValueError:
                continue
        raise ValueError(f"Unrecognized timestamp format: {timestamp!r}")

    @staticmethod
    def _build_update_parameters(update_login: UpdateAdminParameters):
        """Translate ``update_login`` into the stored-procedure parameter list.

        Parameters are added in a fixed order: UserLogonID, Status,
        StatusReason, NumInvalidAttempts, LastLogon, LastFailed.
        """
        fields = (
            (update_login.user_logon_id, D_TYPES.Int, "UserLogonID"),
            (update_login.status_id, D_TYPES.Int, "Status"),
            (update_login.status_reason, D_TYPES.VarChar, "StatusReason"),
            (update_login.invalid_logins, D_TYPES.Int, "NumInvalidAttempts"),
            (update_login.last_login, D_TYPES.DateTime, "LastLogon"),
            (update_login.last_failed, D_TYPES.DateTime, "LastFailed"),
        )
        parameters = []
        for value, data_type, name in fields:
            Param(value, data_type, name).add_to_param_list(parameters)
        return parameters