import json
from abc import abstractmethod
from typing import Optional, Dict, Any
from dataclasses import dataclass
from q2_sdk.models.tip.schemas.base import (
TIPRequest,
TIPResponse,
get_execute_request_as_xml_response_from_tip_response,
)
from q2_sdk.models.tip.schemas.tip_request_factory import obj_elem_to_tip_request
from q2_sdk.models.tip.exceptions import (
TIPRequestHandlerException,
TIPAuditEventException,
TIPBaseException,
TIPResponseValidationException,
async_catch_uncaught_exceptions,
catch_uncaught_exceptions,
)
from q2_sdk.core.http_handlers.hq_handler import Q2HqRequestHandler
from q2_sdk.models.tip.variables import TIPAuditEvent, TIPResponseVersion
from q2_sdk.core.opentelemetry.span import Q2Span, SpanKind
from opentelemetry.util.types import AttributeValue
@dataclass
class TrackingParams:
tracking_id: Optional[str] = None
transaction_id: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
return {
"TransactionId": self.transaction_id,
"TrackingId": self.tracking_id,
}
def audit_details(
self,
error_msg: Optional[str] = None,
extra_audit_details: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
ad_dict = self.to_dict()
if error_msg:
ad_dict.update({"Error": error_msg})
if extra_audit_details:
ad_dict.update(extra_audit_details)
return ad_dict
@dataclass
class AuditParams:
session_id: str
workstation_id: Optional[str] = None
customer_id: Optional[int] = -1
user_id: Optional[int] = -1
user_logon_id: Optional[int] = -1
host_account_id: Optional[int] = -1
transaction_id: Optional[int] = -1
exception_message: Optional[str] = (
"Audit Parameters have not been set in TIP Request"
)
ui_source: Optional[str] = "OnlineBanking"
def update_from_tip_request(self, tip_request: TIPRequest) -> None:
self.user_id = tip_request.event_user_id
self.customer_id = tip_request.customer_id
self.transaction_id = tip_request.transaction_id
self.session_id = tip_request.session_id
self.ui_source = tip_request.ui_source
self.exception_message = ""
[docs]
class TIPRequestHandler(Q2HqRequestHandler):
"""
TIP (Transaction Interdiction Protocol)
This class is a request handler that can respond to Authorize Transaction
requests from HQ with a decision for how to handle the transaction.
Responses are defined in q2_sdk.models.tip.schemas.base.TIPResponse
"""
_EXTENSION_TYPE = "tip"
_EVENT_NAME_TIP_STATUS = "tip_status"
_EVENT_NAME_TIP_ERROR = "tip_error"
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
# check to make sure that sub-classes have applied the correct
# decorators to abstract methods
self._validate_required_decorators()
self._tip_request: Optional[TIPRequest] = None
self._audit_params: Optional[AuditParams] = None
self._tracking_params: Optional[TrackingParams] = None
# we are returning xml to HQ and not html
self.return_as_html = False
def _validate_required_decorators(self):
"""Ensure subclass methods have required decorators applied"""
# Check generate_tracking_id has catch_uncaught_exceptions
if not getattr(
self.generate_tracking_id, "_has_catch_uncaught_decorator", False
):
raise RuntimeError(
f"{self.__class__.__name__}.generate_tracking_id must be decorated "
f"with @catch_uncaught_exceptions"
)
# Check decision_transaction has async_catch_uncaught_exceptions
if not getattr(
self.decision_transaction, "_has_async_catch_uncaught_decorator", False
):
raise RuntimeError(
f"{self.__class__.__name__}.decision_transaction must be decorated "
f"with @async_catch_uncaught_exceptions"
)
def _get_otel_default_attributes(self) -> Dict[str, AttributeValue]:
"""Returns default attributes for otel instrumentation"""
return {
"extension_type": self._EXTENSION_TYPE,
"extension_name": self.extension_name,
"customer_key": self.hq_credentials.customer_key or "unknown",
}
def _add_tip_status_event(self, tip_response: TIPResponse):
Q2Span.add_event(
self._EVENT_NAME_TIP_STATUS, {"value": tip_response.status.value}
)
def _add_tip_error_event(self, error_type: str):
Q2Span.add_event(self._EVENT_NAME_TIP_ERROR, {"value": error_type})
@property
def tip_request(self) -> TIPRequest:
if self._tip_request is None:
raise TIPRequestHandlerException("self._tip_request has not been set")
return self._tip_request
@tip_request.setter
def tip_request(self, value) -> None:
self._tip_request = value
@property
def audit_params(self) -> AuditParams:
if self._audit_params is None:
raise TIPRequestHandlerException("self._audit_params has not been set")
return self._audit_params
@audit_params.setter
def audit_params(self, value) -> None:
self._audit_params = value
@property
def tracking_params(self) -> TrackingParams:
if self._tracking_params is None:
raise TIPRequestHandlerException("self._tracking_params has not been set")
return self._tracking_params
@tracking_params.setter
def tracking_params(self, value) -> None:
self._tracking_params = value
[docs]
def set_audit_params(self) -> None:
"""
Sets the audit_params attribute to reasonable and best-guess default values.
This is necessary because if the request is invalid, we may not be
able to determine all of the audit parameters.
"""
# set some initial audit parameters so that we can have some values
# to send for audit events no matter where a failure occurs
session_id = ""
workstation_id = None
if self.online_session:
if self.online_session.session_id:
session_id = self.online_session.session_id
if self.online_session.workstation:
workstation_id = self.online_session.workstation
self.audit_params = AuditParams(
session_id=session_id,
workstation_id=workstation_id,
)
[docs]
def set_tracking_params(self) -> None:
"""
Sets the tracking_params attribute to reasonable default values.
This is necessary because if the request is invalid we may not be
able to determine all of the tracking parameters.
"""
# set some initial parameters so that we can have some values
# to send in response to hq where a failure occurs
self.tracking_params = TrackingParams()
@async_catch_uncaught_exceptions
async def _hq_request_to_tip_request(self) -> TIPRequest:
"""
The parent class parses the XML Request from HQ and converts it to a python
object. This method converts that object to a TIPRequest class.
This provides a standard data class that users of this extension will
receive in the decision_transaction method.
"""
self.logger.info("Converting hq request to tip request")
if self.request_name_elem is None:
raise TIPRequestHandlerException(
"self.request_name_elem must be set in order to call _hq_request_to_tip_request"
)
tip_request = obj_elem_to_tip_request(self.request_name_elem)
# once we successfully convert the HQ request to the TIP request
# we need to update our audit and tracking params
self.tracking_params.transaction_id = tip_request.transaction_id
self.audit_params.update_from_tip_request(tip_request)
return tip_request
@catch_uncaught_exceptions
def _validate_tip_response(self, tip_response: TIPResponse) -> TIPResponse:
"""
This validates the TIP response to ensure that when it is
converted to an HQ response, that HQ will be able to handle it.
Some potential checks we could do:
- confirm that the decision is TIPStatus.
- We may also want to check that MFARequired is not returned
if the user has already validated MFA
"""
self.logger.info("Validating tip response")
# we want to ensure that all of the value in the TIPResponse are the
# expected type and none of them are None
tip_response.validate_no_optional()
# we want to ensure that tracking_id and transaction_id in the TIPResponse
# match the tracking_id generated and set on self.tracking_params.tracking_id
# and the transaction_id set on self.tracking_params.transaction_id
if tip_response.tracking_id != self.tracking_params.tracking_id:
raise TIPResponseValidationException(
"tip_response.tracking_id != self.tracking_params.tracking_id "
f"{tip_response.tracking_id} != {self.tracking_params.tracking_id}"
)
if tip_response.transaction_id != self.tracking_params.transaction_id:
raise TIPResponseValidationException(
"tip_response.transaction_id != self.tracking_params.transaction_id "
f"{tip_response.transaction_id} != {self.tracking_params.transaction_id}"
)
# we want to ensure that the TIPResponseVersion in the TIPResponse returned
# form decision_transaction is the expected version
if tip_response.version != self.tip_response_version:
raise TIPResponseValidationException(
"tip_response.version != self.tip_response_version "
f"{tip_response.version} != {self.tip_response_version}"
)
return tip_response
@Q2Span.instrument(kind=SpanKind.SERVER)
@async_catch_uncaught_exceptions
async def _send_audit_event(
self, audit_event: TIPAuditEvent, audit_details: dict
) -> None:
try:
Q2Span.add_event(audit_event.value.short_name, audit_details)
await self.db.audit_record.create(
audit_details=json.dumps(audit_details),
session_id=self.audit_params.session_id,
audit_action_short_name=audit_event.value.short_name,
transaction_id=self.audit_params.transaction_id,
user_id=self.audit_params.user_id,
customer_id=self.audit_params.customer_id,
exception_message=self.audit_params.exception_message,
user_logon_id=self.audit_params.user_logon_id,
host_account_id=self.audit_params.host_account_id,
workstation_id=self.audit_params.workstation_id,
)
except Exception as e:
raise TIPAuditEventException(
f"Uncaught exception {e} when sending audit event: {audit_event.value.short_name}"
)
def _build_exception_response_log_metrics(
self, error_type: str, error_description: str
) -> str:
"""
Builds XML response to HQ from a given error_type and an error_description. This utility
method should be called every time we handle an exception.
:param error_type: str representation of the error class
:param error_description: str description of the error
:return: str representation of exception XML to return to HQ
"""
self.logger.exception(error_description)
tip_response = TIPResponse.from_exception(error_description=error_description)
self._add_tip_status_event(tip_response)
self._add_tip_error_event(error_type)
return tip_response.to_xml()
@async_catch_uncaught_exceptions
async def _handle_exception(
self, e: TIPBaseException, audit_event: TIPAuditEvent
) -> str:
# build the response and log the metrics first so that sending the audit event
# does not impact the duration metric calculation.
response = self._build_exception_response_log_metrics(
error_type=e.__class__.__name__, error_description=e.error_description
)
await self._send_audit_event(
audit_event,
audit_details=self.tracking_params.audit_details(
error_msg=e.error_description
),
)
return response
@property
def tip_response_version(self) -> TIPResponseVersion:
return TIPResponseVersion.V0
[docs]
def wrap_soap_response(self, custom_response: str) -> str:
"""
If the incoming request is wrapped in a soap envelope, this prepares
it for return back to HQ.
This overrides the Q2HqRequestHandler version of this method to
provide an ErrorDescription and set the ErrorCode in the response.
:param custom_response: Response to wrap in an appropriate soap envelope
:returns: SOAP wrapped XML body to be sent to HQ
"""
return get_execute_request_as_xml_response_from_tip_response(
str(self.request_name), custom_response, self.hq_response_attrs
)
[docs]
@Q2Span.instrument(kind=SpanKind.SERVER)
async def q2_post(self, *args, **kwargs) -> str:
"""
Core logic for responding to TIP POST requests. This method does the following:
1. Validates request from HQ and converts the XML to a data class
2. Logs an audit event to indicate whether or not the decision has started
- TIPDecisionStarted if validation succeeded
- TIPRequestValidationFailed if validation failed
- TIPDecisionFailed if some other error occurred prior to decision_transaction being called
3. Calls decision_transaction (abstract method)
4. Logs an audit event
- TIPDecisionSucceeded if the decision succeeded
- TIPDecisionFailed if the decision failed
5. Validates returned value from decision_transaction and forms the XML to send back to HQ
:returns: TIPResponse formatted as an XML string
"""
try:
Q2Span.set_attributes(self._get_otel_default_attributes())
self.set_audit_params()
self.set_tracking_params()
if self._tag_name != "ExecuteRequestAsXml":
raise TIPRequestHandlerException(
f"self._tag_name must be ExecuteRequestAsXml but got self._tag_name: {self._tag_name}"
)
# If parsing the hq request fails, we will not have the
# self.tip_request to use for passing fields into an error response.
# Therefore, we send a TIPRequestValidationFailed audit event so we know
# to not expect the tracking parameters in the audit details.
try:
self.tip_request = await self._hq_request_to_tip_request()
except TIPBaseException as e:
return await self._handle_exception(
e, audit_event=TIPAuditEvent.TIPRequestValidationFailed
)
# If generating the tracking id fails, we will not have the
# self.tracking_id to use for passing fields into an error response.
try:
self.logger.info("Generating tracking id")
self.tracking_params.tracking_id = self.generate_tracking_id(
self.tip_request
)
except TIPBaseException as e:
return await self._handle_exception(
e, audit_event=TIPAuditEvent.TIPDecisionFailed
)
# now that we have the parsed the request and created the tracking_id
# successfully, we can send a TIPDecisionStarted audit event and any
# subsequent failures can send a TIPDecisionFailed event
self.logger.info(
f"Starting TIP decision for transaction id: {self.tracking_params.transaction_id} "
f"and tracking id: {self.tracking_params.tracking_id}"
)
# we keep the _send_audit_event outside of the try/except block where
# the exception block calls self._handle_exception because that method
# sends an audit event. We probably don't want to try to send another
# audit event if the first one fails (although I could be convinced otherwise
# in the event that the first audit event is transient)
await self._send_audit_event(
TIPAuditEvent.TIPDecisionStarted, self.tracking_params.audit_details()
)
# all of these method calls are wrapped with either catch_uncaught_exceptions
# or async_catch_uncaught_exceptions, and they all handle the exception the
# same way. That said, we can put them all in one try block and catch the
# TIPBaseException
# decision_start = time.time()
try:
tip_response = await self._call_decision_transaction(self.tip_request)
tip_response = self._validate_tip_response(tip_response)
self.logger.info("Converting TIP response to XML")
xml_response = tip_response.to_xml()
except TIPBaseException as e:
return await self._handle_exception(
e, audit_event=TIPAuditEvent.TIPDecisionFailed
)
# see comment above for keeping self._send_audit_event outside of try/except block
# if we make it here, we know that the decision succeeded and we can send the
# TIPDecisionSucceeded audit event.
await self._send_audit_event(
audit_event=TIPAuditEvent.TIPDecisionSucceeded,
audit_details=self.tracking_params.audit_details(
extra_audit_details=tip_response.to_audit_details()
),
)
self._add_tip_status_event(tip_response)
return xml_response
# This can be hit if we get an exception while handling another exception
# or if the request being sent does not have self._tag_name = ExecuteRequestAsXml
# For example, when we try to send an audit event for the failure and
# sending the audit event fails.
except TIPBaseException as e:
# note that the only difference between this method and _handle_exception
# is that _handle_exception is also calling _send_audit_event. We don't
# want to call _send_audit_event here in case it was _send_audit_event that
# failed and got us here.
return self._build_exception_response_log_metrics(
error_type=e.__class__.__name__,
error_description=e.error_description,
)
# this should be impossible to hit but if all of our exceptions go
# uncaught, we still need to send something to HQ
except Exception as e:
error_description = f"Uncaught error creating TIP response: {e}"
return self._build_exception_response_log_metrics(
error_type=e.__class__.__name__,
error_description=error_description,
)
@Q2Span.instrument(kind=SpanKind.SERVER)
async def _call_decision_transaction(self, tip_request: TIPRequest) -> TIPResponse:
"""
Method that calls the abstract method decision_transaction. The main reason
for having this wrapper is so that we can instrument this call with OpenTelemetry
to better track decision_transaction calls.
"""
Q2Span.set_attributes(self._get_otel_default_attributes())
return await self.decision_transaction(tip_request)
[docs]
@abstractmethod
@catch_uncaught_exceptions
def generate_tracking_id(self, tip_request: TIPRequest) -> str:
"""
Using a TIPRequest as input, generates a tracking id. This will be
used to set the attribute self.tracking_params.tracking_id that will ultimately be
included the the TIPResponse sent back to HQ.
:param tip_request: TIPRequest
:return: string containing the generated tracking id
"""
raise NotImplementedError
[docs]
@abstractmethod
@async_catch_uncaught_exceptions
async def decision_transaction(self, tip_request: TIPRequest) -> TIPResponse:
"""
Using a TIPRequest as input, determines what the status of a transaction
should be and returns it as a TIPResponse.
:param tip_request: TIPRequest
:return: TIPResponse containing the decision for transaction being authorized
"""
raise NotImplementedError