from argparse import _SubParsersAction
from functools import partial
from typing import Optional
from q2_sdk.hq.hq_api.q2_api import (
GetEndUserAccessCodeTargets,
AddEndUserAccessCodeTarget,
DeleteEndUserAccessCodeTarget,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.user_logon import UserLogon
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from .db_object import DbObject
from .audit_record import AuditRecord
[docs]
class AccessCode(DbObject):
"""DbObject for interacting with User Access Codes (used for Multi Factor Authentication)"""
VALID_TARGET_TYPES = ["Voice", "Sms", "Email"]
[docs]
def add_arguments(self, parser: _SubParsersAction):
subparser = parser.add_parser("get_access_code_targets")
subparser.set_defaults(parser="get_access_code_targets")
subparser.set_defaults(func=partial(self.get_targets, serialize_for_cli=True))
subparser.add_argument("logon_name", help="Q2_UserLogon.LoginName")
subparser.add_argument("--exclude-email", action="store_true", default=False)
subparser.add_argument("--exclude-phone", action="store_true", default=False)
subparser.add_argument("--exclude-sms", action="store_true", default=False)
subparser = parser.add_parser("add_access_code_targets")
subparser.set_defaults(parser="add_access_code_targets")
subparser.set_defaults(
func=partial(self.create_targets, serialize_for_cli=True)
)
subparser.add_argument("logon_name", type=str, help="Q2_UserLogon.LoginName")
subparser.add_argument("target_address", help="The target value")
subparser.add_argument(
"target_type", type=str, help="Email, Sms, Voice", default="Email"
)
subparser.add_argument("-t", "--target-country-iso-code-a3", default="USA")
[docs]
async def get_targets(
self,
logon_name: str,
exclude_email=False,
exclude_phone=False,
exclude_sms=False,
serialize_for_cli=False,
) -> GetEndUserAccessCodeTargets.HqResponse:
"""Convenience function for working with Q2Api.GetEndUserAccessCodeTargets HQ Endpoint"""
params_obj = GetEndUserAccessCodeTargets.ParamsObj(
self.logger,
logon_name,
get_email_targets=not exclude_email,
get_phone_targets=not exclude_phone,
get_sms_targets=not exclude_sms,
hq_credentials=self.hq_credentials,
)
response = await GetEndUserAccessCodeTargets.execute(params_obj)
if serialize_for_cli:
columns = ["TargetID", "UserID", "ShortName", "TargetAddress"]
response = [
x
for x in response.result_node.Data.DalAccessCodeTargetView.Q2_AccessCodeTargetView
]
response = self.serialize_for_cli(response, columns)
return response
[docs]
async def create_targets(
self,
logon_name: str,
target_address: str,
target_type: str,
target_country_iso_code_a3: str = "USA",
serialize_for_cli: bool = False,
) -> GetEndUserAccessCodeTargets.HqResponse:
"""
:param: logon_name: The end users login name
:param: target_address: the value to be used as the target
:param: target_type: what type of target is this. Valid types: Voice, Sms, Email
:param: target_country_iso_code_a3: ISO A3 country code if the Sms/Voice number is international
:param: serialize_for_cli: boolean to control whether the output is displayed to the CLI
"""
ui_source = "OnlineBanking"
if target_type.lower() not in [x.lower() for x in self.VALID_TARGET_TYPES]:
raise DatabaseDataError(
f"Invalid target type. Valid target types: {self.VALID_TARGET_TYPES}"
)
params_obj = AddEndUserAccessCodeTarget.ParamsObj(
self.logger,
logon_name,
ui_source,
target_address,
target_type,
target_country_iso_code_a3=target_country_iso_code_a3,
hq_credentials=self.hq_credentials,
)
response = await AddEndUserAccessCodeTarget.execute(params_obj)
if serialize_for_cli:
columns = [
"AccessCodeTargetTypeID",
"DisplayName",
"NotificationTypeID",
"ShortName",
"TargetAddress",
"TargetID",
"TargetTypeDisplayName",
"TargetTypeShortName",
"UnmaskedTargetAddress",
"UserID",
]
parsed_response = response.parse_sproc_return(
specific_table="Q2_AccessCodeTargetView"
)
response = self.serialize_for_cli(parsed_response, columns)
return response
[docs]
async def delete_targets(
self,
user_obj: OnlineUser,
session_obj: OnlineSession,
target_id: Optional[int] = None,
target_address: Optional[str] = None,
target_type: Optional[str] = None,
):
assert any([
target_id,
target_address,
]), "Must provide either target_id or target_address"
if target_address:
assert all([
target_address,
target_type,
]), "Must provide both target_address and target_type"
ui_source = "OnlineBanking"
if not user_obj.login_name:
userinfo_po = UserLogon(self.logger)
logon_list = await userinfo_po.get_login_by_id(user_obj.user_id)
logon_name = logon_list[0]["LoginName"]
else:
logon_name = user_obj.login_name
if not target_type or not target_address:
result = await self.get_targets(logon_name)
found = False
for each in (
result.result_node.Data.DalAccessCodeTargetView.Q2_AccessCodeTargetView
):
if each["TargetID"].pyval == target_id:
target_address = each["UnmaskedTargetAddress"].text
target_type = each["ShortName"].text
found = True
break
if not found:
return "Could not find AccessCodeTarget that matched request"
response = await self.delete_accesscode_target(
logon_name, ui_source, target_address, target_type, self.hq_credentials
)
if response.success:
audit = AuditRecord(self.logger, self.hq_credentials)
await audit.create(
f"AccessCodeTarget deleted by SDk, target id: {target_id}, target address: {target_address}, target type: {target_type}",
session_id=session_obj.session_id,
workstation_id=session_obj.workstation,
customer_id=user_obj.customer_id,
user_id=user_obj.user_id,
user_logon_id=user_obj.user_logon_id,
)
return "deleted AccessCodeTarget successfully"
else:
return "Could not find AccessCodeTarget that matched request"
[docs]
async def delete_accesscode_target(
self, logon_name, ui_source, target_address, target_type, hq_credentials=None
):
params_obj = DeleteEndUserAccessCodeTarget.ParamsObj(
self.logger,
logon_name,
ui_source,
target_address,
target_type,
hq_credentials=hq_credentials,
)
return await DeleteEndUserAccessCodeTarget.execute(params_obj)