Source code for q2_sdk.hq.db.sso_user_logon

from argparse import _SubParsersAction
from functools import partial
from typing import List

from lxml.objectify import IntElement, StringElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.hq_api.q2_api import GetHqVersion as api_GetHqVersion
from q2_sdk.hq.hq_api.wedge_online_banking import GetHqVersion as ob_GetHqVersion
from q2_sdk.tools.decorators import dev_only
from q2_sdk.hq.db.user_logon import UserLogon, UserLogonRow

from .db_object import DbObject
from .representation_row_base import RepresentationRowBase


[docs] class SsoUserLogonRow(RepresentationRowBase): SSOUserLogonID: IntElement = "SSOUserLogonID" UserLogonID: IntElement = "UserLogonID" LoginName: StringElement = "LoginName" SSOIdentifier: StringElement = "SSOIdentifier" CreateDate: StringElement = "CreateDate" ModifiedDate: StringElement = "ModifiedDate"
[docs] class SsoUserLogon(DbObject): NAME = "SsoUserLogon" REPRESENTATION_ROW_CLASS = SsoUserLogonRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_sso_user_logon") subparser.set_defaults(parser="get") subparser.set_defaults( func=partial(self.get_by_login_name, serialize_for_cli=True) ) subparser.add_argument("login_name", help="Q2_UserLogon.LoginName") subparser = parser.add_parser("get_sso_user_logon_list") subparser.set_defaults(parser="get_list") subparser.set_defaults(func=partial(self.get_list, serialize_for_cli=True)) subparser = parser.add_parser("add_sso_user_logon") subparser.set_defaults(parser="create") subparser.set_defaults(func=partial(self.create_by_login_name)) subparser.add_argument("login_name", help="Q2_UserLogon.LoginName") subparser.add_argument("sso_identifier") subparser = parser.add_parser("update_sso_user_logon") subparser.set_defaults(parser="update") subparser.add_argument( "user_logon_id", type=int, help="Q2_UserLogon.UserLogonID" ) subparser.add_argument("sso_identifier") subparser.set_defaults(func=partial(self.update, serialize_for_cli=True)) subparser = parser.add_parser("remove_sso_user_logon") subparser.set_defaults(parser="remove") subparser.add_argument("login_name", help="Q2_UserLogon.LoginName") subparser.set_defaults(func=partial(self.delete_by_login_name))
[docs] async def get( self, sso_identifier: str, serialize_for_cli=False ) -> List[SsoUserLogonRow]: assert isinstance(sso_identifier, str), "Please supply a valid sso identifier" response = await self.call_hq( "Q2_GetSSOIdentifierToUserLogon", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "SSOIdentifier", sso_identifier, ) ]), ) if serialize_for_cli: columns = [ "SSOUserLogonID", "UserLogonID", "SSOIdentifier", "CreateDate", "ModifiedDate", ] response = self.serialize_for_cli(response, columns) return response
[docs] @dev_only async def get_by_login_name( self, login_name: str, serialize_for_cli=True ) -> SsoUserLogonRow: response = await self.call_hq( "sdk_GetSSOUserLogonByName", sql_parameters=ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "login_name", login_name, ) ]), ) if serialize_for_cli: response = self.serialize_for_cli(response) if isinstance(response, list): response = response[0] return response
[docs] @dev_only async def get_list(self, serialize_for_cli=False) -> List[SsoUserLogonRow]: response = await self.call_hq( "sdk_GetSSOUserLogons", ) if serialize_for_cli: response = self.serialize_for_cli(response) return response
[docs] async def create_by_login_name(self, login_name: str, sso_identifier: str): ul_obj = UserLogon(self.logger, self.hq_credentials) user_logon: UserLogonRow = (await ul_obj.get_login_by_name(login_name))[0] await self.create(user_logon.UserLogonID.pyval, sso_identifier)
[docs] async def create( self, user_logon_id: int, sso_identifier: str, serialize_for_cli=False ) -> List[SsoUserLogonRow]: assert isinstance(user_logon_id, int), "Please supply a valid user logon id" assert isinstance(sso_identifier, str), "Please supply a SSOIdentifier" current_date_time = await self._get_current_date_time() response = await self.call_hq( "Q2_AddSSOIdentifierToUserLogon", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "UserLogonID", user_logon_id, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "SSOIdentifier", sso_identifier, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.DateTime, "CreateDate", current_date_time, ), ]), ) if serialize_for_cli: columns = [ "SSOUserLogonID", "UserLogonID", "SSOIdentifier", "CreateDate", "ModifiedDate", ] response = self.serialize_for_cli(response, columns) return response
[docs] async def update( self, user_logon_id: int, sso_identifier: str, serialize_for_cli=False ) -> List[SsoUserLogonRow]: assert isinstance(user_logon_id, int), "Please supply a valid user logon id" assert isinstance(sso_identifier, str), "Please supply a SSOIdentifier" current_date_time = await self._get_current_date_time() response = await self.call_hq( "Q2_UpdateSSOIdentifierToUserLogon", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "UserLogonID", user_logon_id, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "SSOIdentifier", sso_identifier, ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.DateTime, "ModifiedDate", current_date_time, ), ]), ) if serialize_for_cli: columns = [ "SSOUserLogonID", "UserLogonID", "SSOIdentifier", "CreateDate", "ModifiedDate", ] response = self.serialize_for_cli(response, columns) return response
[docs] async def delete_by_login_name(self, login_name: str): ul_obj = UserLogon(self.logger, self.hq_credentials) user_logon: UserLogonRow = (await ul_obj.get_login_by_name(login_name))[0] await self.delete(user_logon.UserLogonID.pyval)
[docs] async def delete( self, user_logon_id: int, serialize_for_cli=False ) -> List[SsoUserLogonRow]: assert isinstance(user_logon_id, int), "Please supply a valid user logon id" response = await self.call_hq( "Q2_DeleteSSOIdentifierToUserLogon", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "UserLogonID", user_logon_id, ) ]), ) if serialize_for_cli: response = "success" return response
async def _get_current_date_time(self): if self.hq_credentials and self.hq_credentials.auth_token: get_hq_version = ob_GetHqVersion else: get_hq_version = api_GetHqVersion get_version_obj = get_hq_version.ParamsObj( self.logger, hq_credentials=self.hq_credentials ) get_version = await get_hq_version.execute(get_version_obj) current_date_time = get_version.server_date_time.isoformat() return current_date_time