from argparse import _SubParsersAction
from functools import partial
from typing import List
from lxml.objectify import IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.hq_api.q2_api import GetHqVersion as api_GetHqVersion
from q2_sdk.hq.hq_api.wedge_online_banking import GetHqVersion as ob_GetHqVersion
from q2_sdk.tools.decorators import dev_only
from q2_sdk.hq.db.user_logon import UserLogon, UserLogonRow
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
class SsoUserLogonRow(RepresentationRowBase):
    """Row representation used by :class:`SsoUserLogon`.

    Every attribute is annotated with an ``lxml.objectify`` element type and
    is assigned a string equal to its own name.
    NOTE(review): presumably these strings are the column names that
    ``RepresentationRowBase`` maps from the HQ response -- confirm there.
    """

    # Integer-typed fields.
    SSOUserLogonID: IntElement = "SSOUserLogonID"
    UserLogonID: IntElement = "UserLogonID"

    # String-typed fields.
    LoginName: StringElement = "LoginName"
    SSOIdentifier: StringElement = "SSOIdentifier"
    CreateDate: StringElement = "CreateDate"
    ModifiedDate: StringElement = "ModifiedDate"
class SsoUserLogon(DbObject):
    """CLI commands and stored-procedure wrappers for SSO user logon records.

    Each public coroutine forwards its arguments to one HQ stored procedure
    through ``self.call_hq`` and can optionally pass the response through
    ``self.serialize_for_cli`` for command-line output.
    """

    NAME = "SsoUserLogon"
    REPRESENTATION_ROW_CLASS = SsoUserLogonRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register the SSO user logon sub-commands on ``parser``.

        Every sub-command stores two defaults: ``parser`` (a short action
        name) and ``func`` (the coroutine to run, wrapped in ``partial``).

        :param parser: sub-parsers action that receives the new commands
        """
        get_cmd = parser.add_parser("get_sso_user_logon")
        get_cmd.set_defaults(
            parser="get",
            func=partial(self.get_by_login_name, serialize_for_cli=True),
        )
        get_cmd.add_argument("login_name", help="Q2_UserLogon.LoginName")

        list_cmd = parser.add_parser("get_sso_user_logon_list")
        list_cmd.set_defaults(
            parser="get_list",
            func=partial(self.get_list, serialize_for_cli=True),
        )

        add_cmd = parser.add_parser("add_sso_user_logon")
        add_cmd.set_defaults(
            parser="create",
            func=partial(self.create_by_login_name),
        )
        add_cmd.add_argument("login_name", help="Q2_UserLogon.LoginName")
        add_cmd.add_argument("sso_identifier")

        update_cmd = parser.add_parser("update_sso_user_logon")
        update_cmd.set_defaults(
            parser="update",
            func=partial(self.update, serialize_for_cli=True),
        )
        update_cmd.add_argument(
            "user_logon_id", type=int, help="Q2_UserLogon.UserLogonID"
        )
        update_cmd.add_argument("sso_identifier")

        remove_cmd = parser.add_parser("remove_sso_user_logon")
        remove_cmd.set_defaults(
            parser="remove",
            func=partial(self.delete_by_login_name),
        )
        remove_cmd.add_argument("login_name", help="Q2_UserLogon.LoginName")

    async def get(
        self, sso_identifier: str, serialize_for_cli=False
    ) -> List[SsoUserLogonRow]:
        """Call ``Q2_GetSSOIdentifierToUserLogon`` for one SSO identifier.

        :param sso_identifier: value sent as the ``SSOIdentifier`` parameter
        :param serialize_for_cli: when truthy, return the CLI serialization
            of the response instead of the raw response
        :raises AssertionError: if ``sso_identifier`` is not a ``str``
        """
        assert isinstance(sso_identifier, str), "Please supply a valid sso identifier"
        identifier_param = ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.VarChar,
            "SSOIdentifier",
            sso_identifier,
        )
        result = await self.call_hq(
            "Q2_GetSSOIdentifierToUserLogon",
            ExecuteStoredProcedure.SqlParameters([identifier_param]),
        )
        if not serialize_for_cli:
            return result
        return self._serialize_standard_columns(result)

    @dev_only
    async def get_by_login_name(
        self, login_name: str, serialize_for_cli=True
    ) -> SsoUserLogonRow:
        """Call ``sdk_GetSSOUserLogonByName`` for one login name.

        If the (possibly serialized) result is a ``list``, only its first
        element is returned.

        NOTE(review): ``serialize_for_cli`` defaults to ``True`` here, unlike
        the other methods of this class -- confirm this is intentional.

        :param login_name: value sent as the ``login_name`` parameter
        :param serialize_for_cli: when truthy, pass the response through
            ``self.serialize_for_cli`` before returning
        :raises IndexError: if the result is an empty ``list``
        """
        name_param = ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.VarChar,
            "login_name",
            login_name,
        )
        result = await self.call_hq(
            "sdk_GetSSOUserLogonByName",
            sql_parameters=ExecuteStoredProcedure.SqlParameters([name_param]),
        )
        if serialize_for_cli:
            result = self.serialize_for_cli(result)
        return result[0] if isinstance(result, list) else result

    @dev_only
    async def get_list(self, serialize_for_cli=False) -> List[SsoUserLogonRow]:
        """Call ``sdk_GetSSOUserLogons`` (no parameters) and return the result.

        :param serialize_for_cli: when truthy, return the CLI serialization
            of the response instead of the raw response
        """
        result = await self.call_hq(
            "sdk_GetSSOUserLogons",
        )
        return self.serialize_for_cli(result) if serialize_for_cli else result

    async def create_by_login_name(self, login_name: str, sso_identifier: str):
        """Resolve ``login_name`` to a user logon id, then call :meth:`create`.

        The result of :meth:`create` is awaited but not returned.

        :param login_name: login name looked up through ``UserLogon``
        :param sso_identifier: SSO identifier forwarded to :meth:`create`
        """
        logon_id = await self._lookup_user_logon_id(login_name)
        await self.create(logon_id, sso_identifier)

    async def create(
        self, user_logon_id: int, sso_identifier: str, serialize_for_cli=False
    ) -> List[SsoUserLogonRow]:
        """Call ``Q2_AddSSOIdentifierToUserLogon``.

        The ``CreateDate`` parameter is filled from
        :meth:`_get_current_date_time`.

        :param user_logon_id: value sent as the ``UserLogonID`` parameter
        :param sso_identifier: value sent as the ``SSOIdentifier`` parameter
        :param serialize_for_cli: when truthy, return the CLI serialization
            of the response instead of the raw response
        :raises AssertionError: if an argument has the wrong type
        """
        assert isinstance(user_logon_id, int), "Please supply a valid user logon id"
        assert isinstance(sso_identifier, str), "Please supply a SSOIdentifier"
        timestamp = await self._get_current_date_time()
        sql_params = ExecuteStoredProcedure.SqlParameters([
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int,
                "UserLogonID",
                user_logon_id,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar,
                "SSOIdentifier",
                sso_identifier,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.DateTime,
                "CreateDate",
                timestamp,
            ),
        ])
        result = await self.call_hq("Q2_AddSSOIdentifierToUserLogon", sql_params)
        if not serialize_for_cli:
            return result
        return self._serialize_standard_columns(result)

    async def update(
        self, user_logon_id: int, sso_identifier: str, serialize_for_cli=False
    ) -> List[SsoUserLogonRow]:
        """Call ``Q2_UpdateSSOIdentifierToUserLogon``.

        The ``ModifiedDate`` parameter is filled from
        :meth:`_get_current_date_time`.

        :param user_logon_id: value sent as the ``UserLogonID`` parameter
        :param sso_identifier: value sent as the ``SSOIdentifier`` parameter
        :param serialize_for_cli: when truthy, return the CLI serialization
            of the response instead of the raw response
        :raises AssertionError: if an argument has the wrong type
        """
        assert isinstance(user_logon_id, int), "Please supply a valid user logon id"
        assert isinstance(sso_identifier, str), "Please supply a SSOIdentifier"
        timestamp = await self._get_current_date_time()
        sql_params = ExecuteStoredProcedure.SqlParameters([
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.Int,
                "UserLogonID",
                user_logon_id,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.VarChar,
                "SSOIdentifier",
                sso_identifier,
            ),
            ExecuteStoredProcedure.SqlParam(
                ExecuteStoredProcedure.DataType.DateTime,
                "ModifiedDate",
                timestamp,
            ),
        ])
        result = await self.call_hq("Q2_UpdateSSOIdentifierToUserLogon", sql_params)
        if not serialize_for_cli:
            return result
        return self._serialize_standard_columns(result)

    async def delete_by_login_name(self, login_name: str):
        """Resolve ``login_name`` to a user logon id, then call :meth:`delete`.

        The result of :meth:`delete` is awaited but not returned.

        :param login_name: login name looked up through ``UserLogon``
        """
        logon_id = await self._lookup_user_logon_id(login_name)
        await self.delete(logon_id)

    async def delete(
        self, user_logon_id: int, serialize_for_cli=False
    ) -> List[SsoUserLogonRow]:
        """Call ``Q2_DeleteSSOIdentifierToUserLogon`` for one user logon id.

        :param user_logon_id: value sent as the ``UserLogonID`` parameter
        :param serialize_for_cli: when truthy, return the literal string
            ``"success"`` instead of the raw response
        :raises AssertionError: if ``user_logon_id`` is not an ``int``
        """
        assert isinstance(user_logon_id, int), "Please supply a valid user logon id"
        logon_param = ExecuteStoredProcedure.SqlParam(
            ExecuteStoredProcedure.DataType.Int,
            "UserLogonID",
            user_logon_id,
        )
        result = await self.call_hq(
            "Q2_DeleteSSOIdentifierToUserLogon",
            ExecuteStoredProcedure.SqlParameters([logon_param]),
        )
        return "success" if serialize_for_cli else result

    async def _get_current_date_time(self):
        """Return the HQ server's date/time as an ISO-format string.

        Uses the ``wedge_online_banking`` ``GetHqVersion`` endpoint when
        ``self.hq_credentials`` carries an ``auth_token``; otherwise the
        ``q2_api`` one.
        """
        has_auth_token = self.hq_credentials and self.hq_credentials.auth_token
        endpoint = ob_GetHqVersion if has_auth_token else api_GetHqVersion
        params_obj = endpoint.ParamsObj(
            self.logger, hq_credentials=self.hq_credentials
        )
        version_info = await endpoint.execute(params_obj)
        return version_info.server_date_time.isoformat()

    async def _lookup_user_logon_id(self, login_name: str):
        """Return ``UserLogonID.pyval`` of the first ``UserLogon`` match."""
        user_logon_db = UserLogon(self.logger, self.hq_credentials)
        matches = await user_logon_db.get_login_by_name(login_name)
        first_match: UserLogonRow = matches[0]
        return first_match.UserLogonID.pyval

    def _serialize_standard_columns(self, response):
        """Serialize ``response`` for the CLI using the fixed column set."""
        # A fresh list is built on every call, as the inline literals did.
        columns = [
            "SSOUserLogonID",
            "UserLogonID",
            "SSOIdentifier",
            "CreateDate",
            "ModifiedDate",
        ]
        return self.serialize_for_cli(response, columns)