from __future__ import annotations
import base64
from dataclasses import dataclass
from lxml import objectify # type: ignore
from q2_sdk.tools.utils import to_bool
from enum import Enum
[docs]
class EdvResultType(Enum):
"""Enum of valid ResponseTypes expected by HQ"""
Allow = 0
Disallow = 1
RequireStepUp = 2
TerminateSession = 3
[docs]
@dataclass
class EdvResponse:
edv_result_type: EdvResultType
edv_payload: dict
[docs]
class SupportedAuditActions(Enum):
UpdateUserProfile = "UpdateUserProfile"
UpdateAlerts = "UpdateAlerts"
AddChangeOfAddress = "AddChangeOfAddress"
CreateUserOnline = "CreateUserOnline"
ChangeMlaChallengeCode = "ChangeMlaChallengeCode"
EditSecureAccessCodeTarget = "EditSecureAccessCodeTarget"
AddSecureAccessCodeTarget = "AddSecureAccessCodeTarget"
DeleteSecureAccessCodeTarget = "DeleteSecureAccessCodeTarget"
AddTACTarget = "AddTACTarget"
DeleteTACTarget = "DeleteTACTarget"
UpdateAlertHadeCurrency = "UpdateAlertHadeCurrency"
UpdateAlertPostedTransaction = "UpdateAlertPostedTransaction"
UpdateAlertOnlineActivity = "UpdateAlertOnlineActivity"
SetSecurityAlertUserPreferences = "SetSecurityAlertUserPreferences"
MakeLoginName = "MakeLoginName"
ChangeMlaChallengeCodeEncryptionLevel = "ChangeMlaChallengeCodeEncryptionLevel"
ChangeUserLoginName = "ChangeUserLoginName"
ChangePassword = "ChangePassword"
[docs]
@dataclass
class EdvRequest:
raw: objectify.Element
user_id: int
workstation: str
login_id: int
customer_id: int
group_id: int
login_name: str
audit_action: str
audit_details: str
login_completed_mfa: bool
login_is_suspect: bool
session_id: str
ip_address: str
sso_identifier: str | None
is_prelogon_session: bool
is_csr_assist_session: bool
audit_id: int
[docs]
@staticmethod
def from_hq_request(inp: dict) -> EdvRequest:
"""
.. code-block:: json
{
"RequestorData": [
{
"TransactionId": "d26a3afc-b6d3-46c8-9fb0-2a582c1ecedc",
"UiSource": 1,
"IpAddress": "4.35.205.117",
"UserData": "retail0",
"HydraName": "ip-10-30-0-90.ec2.internal:ip-10-30-0-90.ec2.internal",
"XmlPayload": "PEhRIHJlcXVlc3Q9IkV2ZW50RHJpdmVuVmFsaWRhdGlvbiI+PFdvcmtzdGF0aW9uPmQyNmEzYWZjLWI2ZDMtNDZjOC05ZmIwLTJhNTgyYzFlY2VkYzwvV29ya3N0YXRpb24+PExvZ2luSWQ+MjwvTG9naW5JZD48VXNlcklkPjI8L1VzZXJJZD48Q3VzdG9tZXJJZD4yPC9DdXN0b21lcklkPjxHcm91cElkPjE8L0dyb3VwSWQ+PExvZ2luTmFtZT5yZXRhaWwwPC9Mb2dpbk5hbWU+PEF1ZGl0QWN0aW9uPkNoYW5nZVBhc3N3b3JkPC9BdWRpdEFjdGlvbj48QXVkaXREZXRhaWxzPiZsdDtsb2dpbk5hbWUmZ3Q7cmV0YWlsMCZsdDsvbG9naW5OYW1lJmd0OyZsdDtMb2dpbklkJmd0OzImbHQ7L0xvZ2luSWQmZ3Q7PC9BdWRpdERldGFpbHM+PElzTXVsdGlDaGFubmVsQXV0aGVudGljYXRlZD5GYWxzZTwvSXNNdWx0aUNoYW5uZWxBdXRoZW50aWNhdGVkPjxMb2dpbklzU3VzcGVjdD5GYWxzZTwvTG9naW5Jc1N1c3BlY3Q+PElzUHJlbG9nb25TZXNzaW9uPkZhbHNlPC9Jc1ByZWxvZ29uU2Vzc2lvbj48U2Vzc2lvbklkPmwzbmVhenlhNWl1cmVodHJjejRnYXQyMDwvU2Vzc2lvbklkPjxTc29JZGVudGlmaWVyPmFkODdmMGRiLWYzNDEtNDQ4ZC1hMDI0LWI3ZmNjYTg4MGE5MjwvU3NvSWRlbnRpZmllcj48L0hRPg==",
"UserID": 2,
"CustomerID": 2,
"RenderingType": 0,
"IsContainer": false
}
],
}
"""
xml_payload = base64.b64decode(inp["XmlPayload"])
xml = objectify.fromstring(xml_payload)
user_id = int(xml.UserId.text)
login_id = int(xml.LoginId.text)
customer_id = int(xml.CustomerId.text)
group_id = int(xml.GroupId.text)
login_name = xml.LoginName.text
audit_action = xml.AuditAction.text
audit_details = xml.AuditDetails.text
login_completed_mfa = to_bool(xml.IsMultiChannelAuthenticated.text)
login_is_suspect = to_bool(xml.LoginIsSuspect.text)
session_id = xml.SessionId.text
ip_address = inp["IpAddress"]
sso_identifier = xml.findtext("SsoIdentifier")
workstation = xml.Workstation.text
is_prelogon_session = xml.findtext("IsPrelogonSession")
is_csr_assist_session = xml.findtext("IsCsrAssistSession")
audit_id = xml.findtext("AuditID")
return EdvRequest(
xml,
user_id,
workstation,
login_id,
customer_id,
group_id,
login_name,
audit_action,
audit_details,
login_completed_mfa,
login_is_suspect,
session_id,
ip_address,
sso_identifier,
is_prelogon_session,
is_csr_assist_session,
audit_id,
)