Source code for q2_sdk.hq.db.access_code

from argparse import _SubParsersAction
from functools import partial
from typing import Optional

from q2_sdk.hq.hq_api.q2_api import (
    GetEndUserAccessCodeTargets,
    AddEndUserAccessCodeTarget,
    DeleteEndUserAccessCodeTarget,
)
from q2_sdk.core.exceptions import DatabaseDataError
from q2_sdk.hq.db.user_logon import UserLogon
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from .db_object import DbObject
from .audit_record import AuditRecord


class AccessCode(DbObject):
    """DbObject for interacting with User Access Codes (used for Multi Factor Authentication)"""

    # Target types accepted by create_targets; the check there is case-insensitive.
    VALID_TARGET_TYPES = ["Voice", "Sms", "Email"]

    def add_arguments(self, parser: _SubParsersAction):
        """Register the access-code CLI subcommands on ``parser``.

        Adds ``get_access_code_targets`` (dispatching to :meth:`get_targets`)
        and ``add_access_code_targets`` (dispatching to :meth:`create_targets`),
        both with ``serialize_for_cli=True`` pre-bound.

        :param parser: the subparsers action the subcommands are added to
        """
        subparser = parser.add_parser("get_access_code_targets")
        subparser.set_defaults(parser="get_access_code_targets")
        subparser.set_defaults(func=partial(self.get_targets, serialize_for_cli=True))
        subparser.add_argument("logon_name", help="Q2_UserLogon.LoginName")
        subparser.add_argument("--exclude-email", action="store_true", default=False)
        subparser.add_argument("--exclude-phone", action="store_true", default=False)
        subparser.add_argument("--exclude-sms", action="store_true", default=False)

        subparser = parser.add_parser("add_access_code_targets")
        subparser.set_defaults(parser="add_access_code_targets")
        subparser.set_defaults(
            func=partial(self.create_targets, serialize_for_cli=True)
        )
        subparser.add_argument("logon_name", type=str, help="Q2_UserLogon.LoginName")
        subparser.add_argument("target_address", help="The target value")
        # argparse only applies ``default`` to a positional when it is optional
        # (nargs="?"); without it the "Email" default could never take effect.
        subparser.add_argument(
            "target_type",
            type=str,
            nargs="?",
            help="Email, Sms, Voice",
            default="Email",
        )
        subparser.add_argument("-t", "--target-country-iso-code-a3", default="USA")

    async def get_targets(
        self,
        logon_name: str,
        exclude_email=False,
        exclude_phone=False,
        exclude_sms=False,
        serialize_for_cli=False,
    ) -> GetEndUserAccessCodeTargets.HqResponse:
        """Convenience function for working with Q2Api.GetEndUserAccessCodeTargets HQ Endpoint

        :param logon_name: the end user's login name
        :param exclude_email: when True, email targets are not requested
        :param exclude_phone: when True, phone targets are not requested
        :param exclude_sms: when True, sms targets are not requested
        :param serialize_for_cli: when True, return the result of
            ``self.serialize_for_cli`` over the target rows instead of the
            raw HQ response
        :return: the HQ response, or the CLI serialization of its target rows
        """
        params_obj = GetEndUserAccessCodeTargets.ParamsObj(
            self.logger,
            logon_name,
            get_email_targets=not exclude_email,
            get_phone_targets=not exclude_phone,
            get_sms_targets=not exclude_sms,
            hq_credentials=self.hq_credentials,
        )
        response = await GetEndUserAccessCodeTargets.execute(params_obj)
        if not serialize_for_cli:
            return response

        columns = ["TargetID", "UserID", "ShortName", "TargetAddress"]
        rows = list(
            response.result_node.Data.DalAccessCodeTargetView.Q2_AccessCodeTargetView
        )
        return self.serialize_for_cli(rows, columns)

    async def create_targets(
        self,
        logon_name: str,
        target_address: str,
        target_type: str,
        target_country_iso_code_a3: str = "USA",
        serialize_for_cli: bool = False,
    ) -> GetEndUserAccessCodeTargets.HqResponse:
        """Add an access code target through the AddEndUserAccessCodeTarget HQ Endpoint

        :param logon_name: The end users login name
        :param target_address: the value to be used as the target
        :param target_type: what type of target is this. Valid types: Voice, Sms, Email
            (validated case-insensitively; passed to HQ exactly as given)
        :param target_country_iso_code_a3: ISO A3 country code if the Sms/Voice number is international
        :param serialize_for_cli: boolean to control whether the output is displayed to the CLI
        :return: the HQ response, or the CLI serialization of the
            ``Q2_AccessCodeTargetView`` table when ``serialize_for_cli`` is True
        :raises DatabaseDataError: if ``target_type`` is not one of VALID_TARGET_TYPES
        """
        # NOTE(review): the return annotation names GetEndUserAccessCodeTargets
        # although the call below is AddEndUserAccessCodeTarget -- confirm and align.
        ui_source = "OnlineBanking"
        allowed_types = [valid.lower() for valid in self.VALID_TARGET_TYPES]
        if target_type.lower() not in allowed_types:
            raise DatabaseDataError(
                f"Invalid target type. Valid target types: {self.VALID_TARGET_TYPES}"
            )

        params_obj = AddEndUserAccessCodeTarget.ParamsObj(
            self.logger,
            logon_name,
            ui_source,
            target_address,
            target_type,
            target_country_iso_code_a3=target_country_iso_code_a3,
            hq_credentials=self.hq_credentials,
        )
        response = await AddEndUserAccessCodeTarget.execute(params_obj)
        if not serialize_for_cli:
            return response

        columns = [
            "AccessCodeTargetTypeID",
            "DisplayName",
            "NotificationTypeID",
            "ShortName",
            "TargetAddress",
            "TargetID",
            "TargetTypeDisplayName",
            "TargetTypeShortName",
            "UnmaskedTargetAddress",
            "UserID",
        ]
        parsed_response = response.parse_sproc_return(
            specific_table="Q2_AccessCodeTargetView"
        )
        return self.serialize_for_cli(parsed_response, columns)

    async def delete_targets(
        self,
        user_obj: OnlineUser,
        session_obj: OnlineSession,
        target_id: Optional[int] = None,
        target_address: Optional[str] = None,
        target_type: Optional[str] = None,
    ) -> str:
        """Delete one of a user's access code targets and audit the deletion.

        The target is identified either by ``target_id`` (its address and type
        are then looked up through :meth:`get_targets`) or directly by
        ``target_address`` together with ``target_type``.

        :param user_obj: user owning the target; ``login_name`` is used when
            set, otherwise the login is fetched by ``user_id``
        :param session_obj: supplies ``session_id`` and ``workstation`` for the audit record
        :param target_id: TargetID of the target to delete
        :param target_address: address of the target to delete
        :param target_type: type of the target; required with ``target_address``
        :return: a human-readable status message
        :raises AssertionError: if neither ``target_id`` nor ``target_address``
            is given, or ``target_address`` is given without ``target_type``
        """
        # Raised explicitly (rather than via ``assert``) so the validation is
        # not stripped when Python runs with -O.
        if not (target_id or target_address):
            raise AssertionError("Must provide either target_id or target_address")
        if target_address and not target_type:
            raise AssertionError("Must provide both target_address and target_type")

        ui_source = "OnlineBanking"
        if user_obj.login_name:
            logon_name = user_obj.login_name
        else:
            userinfo_po = UserLogon(self.logger)
            logon_list = await userinfo_po.get_login_by_id(user_obj.user_id)
            # NOTE(review): assumes at least one logon row is returned -- confirm.
            logon_name = logon_list[0]["LoginName"]

        if not target_type or not target_address:
            # Only an id was supplied: resolve its address and type from HQ.
            result = await self.get_targets(logon_name)
            for each in (
                result.result_node.Data.DalAccessCodeTargetView.Q2_AccessCodeTargetView
            ):
                if each["TargetID"].pyval == target_id:
                    target_address = each["UnmaskedTargetAddress"].text
                    target_type = each["ShortName"].text
                    break
            else:
                return "Could not find AccessCodeTarget that matched request"

        response = await self.delete_accesscode_target(
            logon_name, ui_source, target_address, target_type, self.hq_credentials
        )
        if not response.success:
            return "Could not find AccessCodeTarget that matched request"

        audit = AuditRecord(self.logger, self.hq_credentials)
        await audit.create(
            f"AccessCodeTarget deleted by SDk, target id: {target_id}, target address: {target_address}, target type: {target_type}",
            session_id=session_obj.session_id,
            workstation_id=session_obj.workstation,
            customer_id=user_obj.customer_id,
            user_id=user_obj.user_id,
            user_logon_id=user_obj.user_logon_id,
        )
        return "deleted AccessCodeTarget successfully"

    async def delete_accesscode_target(
        self, logon_name, ui_source, target_address, target_type, hq_credentials=None
    ):
        """Call the DeleteEndUserAccessCodeTarget HQ Endpoint and return its response.

        :param logon_name: the end user's login name
        :param ui_source: UI source value passed through to HQ
        :param target_address: address of the target to delete
        :param target_type: type of the target to delete
        :param hq_credentials: credentials forwarded to the HQ call
        :return: the result of ``DeleteEndUserAccessCodeTarget.execute``
        """
        params_obj = DeleteEndUserAccessCodeTarget.ParamsObj(
            self.logger,
            logon_name,
            ui_source,
            target_address,
            target_type,
            hq_credentials=hq_credentials,
        )
        return await DeleteEndUserAccessCodeTarget.execute(params_obj)