Source code for q2_sdk.core.http_handlers.tip_handler

import json

from abc import abstractmethod
from typing import Optional, Dict, Any
from dataclasses import dataclass
from q2_sdk.models.tip.schemas.base import (
    TIPRequest,
    TIPResponse,
    get_execute_request_as_xml_response_from_tip_response,
)
from q2_sdk.models.tip.schemas.tip_request_factory import obj_elem_to_tip_request
from q2_sdk.models.tip.exceptions import (
    TIPRequestHandlerException,
    TIPAuditEventException,
    TIPBaseException,
    TIPResponseValidationException,
    async_catch_uncaught_exceptions,
    catch_uncaught_exceptions,
)
from q2_sdk.core.http_handlers.hq_handler import Q2HqRequestHandler
from q2_sdk.models.tip.variables import TIPAuditEvent, TIPResponseVersion
from q2_sdk.core.opentelemetry.span import Q2Span, SpanKind
from opentelemetry.util.types import AttributeValue


@dataclass
class TrackingParams:
    """
    Identifiers that tie a TIP response and its audit records to a request.

    Both fields default to ``None`` so an instance can be created before the
    request has been parsed and then filled in as values become available.
    """

    # Tracking id for the request; None until one is assigned.
    tracking_id: Optional[str] = None
    # Transaction id for the request; None until one is assigned.
    transaction_id: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the tracking parameters as a dictionary.

        :return: dict with the keys ``"TransactionId"`` and ``"TrackingId"``;
            a value is ``None`` when the matching field has not been set
        """
        return {
            "TransactionId": self.transaction_id,
            "TrackingId": self.tracking_id,
        }

    def audit_details(
        self,
        error_msg: Optional[str] = None,
        extra_audit_details: Optional[Dict[str, str]] = None,
    ) -> Dict[str, Any]:
        """
        Build the audit details dictionary for these tracking parameters.

        Starts from :meth:`to_dict`, so the values may be ``int`` or ``None``
        as well as ``str`` (hence the ``Dict[str, Any]`` return type).

        :param error_msg: when truthy, stored under the ``"Error"`` key;
            ``None`` and the empty string are both skipped
        :param extra_audit_details: when truthy, merged in last, so its keys
            override any key already present
        :return: a new dict on every call
        """
        details = self.to_dict()
        if error_msg:
            details["Error"] = error_msg

        if extra_audit_details:
            details.update(extra_audit_details)

        return details


@dataclass
class AuditParams:
    """
    Values sent along with every audit record.

    Only ``session_id`` is required. The remaining fields carry placeholder
    defaults (``-1`` for the numeric ids, a "not set" exception message) that
    stay in place until :meth:`update_from_tip_request` replaces them.
    """

    session_id: str
    workstation_id: Optional[str] = None
    customer_id: Optional[int] = -1
    user_id: Optional[int] = -1
    user_logon_id: Optional[int] = -1
    host_account_id: Optional[int] = -1
    transaction_id: Optional[int] = -1
    exception_message: Optional[str] = "Audit Parameters have not been set in TIP Request"
    ui_source: Optional[str] = "OnlineBanking"

    def update_from_tip_request(self, tip_request: TIPRequest) -> None:
        """
        Overwrite the placeholder values with those carried by ``tip_request``.

        Copies ``event_user_id``, ``customer_id``, ``transaction_id``,
        ``session_id`` and ``ui_source`` from the request, then clears
        ``exception_message``. ``workstation_id``, ``user_logon_id`` and
        ``host_account_id`` are left untouched.

        :param tip_request: the TIPRequest to read the values from
        """
        # (attribute on self, attribute on tip_request), applied in this order
        copied_fields = (
            ("user_id", "event_user_id"),
            ("customer_id", "customer_id"),
            ("transaction_id", "transaction_id"),
            ("session_id", "session_id"),
            ("ui_source", "ui_source"),
        )
        for own_name, request_name in copied_fields:
            setattr(self, own_name, getattr(tip_request, request_name))

        # the parameters are now populated, so the "not set" message is dropped
        self.exception_message = ""


class TIPRequestHandler(Q2HqRequestHandler):
    """
    TIP (Transaction Interdiction Protocol)

    This class is a request handler that can respond to Authorize Transaction
    requests from HQ with a decision for how to handle the transaction.
    Responses are defined in q2_sdk.models.tip.schemas.base.TIPResponse

    Subclasses must implement :meth:`generate_tracking_id` (decorated with
    ``@catch_uncaught_exceptions``) and :meth:`decision_transaction`
    (decorated with ``@async_catch_uncaught_exceptions``); the constructor
    raises ``RuntimeError`` when either decorator marker is missing.
    """

    _EXTENSION_TYPE = "tip"
    _EVENT_NAME_TIP_STATUS = "tip_status"
    _EVENT_NAME_TIP_ERROR = "tip_error"

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)

        # check to make sure that sub-classes have applied the correct
        # decorators to abstract methods
        self._validate_required_decorators()

        # populated while the request is processed; the public properties
        # raise TIPRequestHandlerException while these are still None
        self._tip_request: Optional[TIPRequest] = None
        self._audit_params: Optional[AuditParams] = None
        self._tracking_params: Optional[TrackingParams] = None

        # we are returning xml to HQ and not html
        self.return_as_html = False

    def _validate_required_decorators(self):
        """
        Ensure subclass methods have required decorators applied

        :raises RuntimeError: if ``generate_tracking_id`` lacks the
            ``_has_catch_uncaught_decorator`` marker or ``decision_transaction``
            lacks the ``_has_async_catch_uncaught_decorator`` marker
        """
        # Check generate_tracking_id has catch_uncaught_exceptions
        if not getattr(
            self.generate_tracking_id, "_has_catch_uncaught_decorator", False
        ):
            raise RuntimeError(
                f"{self.__class__.__name__}.generate_tracking_id must be decorated "
                f"with @catch_uncaught_exceptions"
            )

        # Check decision_transaction has async_catch_uncaught_exceptions
        if not getattr(
            self.decision_transaction, "_has_async_catch_uncaught_decorator", False
        ):
            raise RuntimeError(
                f"{self.__class__.__name__}.decision_transaction must be decorated "
                f"with @async_catch_uncaught_exceptions"
            )

    def _get_otel_default_attributes(self) -> Dict[str, AttributeValue]:
        """Returns default attributes for otel instrumentation"""
        return {
            "extension_type": self._EXTENSION_TYPE,
            "extension_name": self.extension_name,
            # fall back to "unknown" when no customer key is available
            "customer_key": self.hq_credentials.customer_key or "unknown",
        }

    def _add_tip_status_event(self, tip_response: TIPResponse):
        """Adds a ``tip_status`` span event carrying the status value of tip_response"""
        Q2Span.add_event(
            self._EVENT_NAME_TIP_STATUS, {"value": tip_response.status.value}
        )

    def _add_tip_error_event(self, error_type: str):
        """Adds a ``tip_error`` span event carrying error_type"""
        Q2Span.add_event(self._EVENT_NAME_TIP_ERROR, {"value": error_type})

    @property
    def tip_request(self) -> TIPRequest:
        """
        The parsed TIPRequest.

        :raises TIPRequestHandlerException: if it has not been set yet
        """
        if self._tip_request is None:
            raise TIPRequestHandlerException("self._tip_request has not been set")
        return self._tip_request

    @tip_request.setter
    def tip_request(self, value) -> None:
        self._tip_request = value

    @property
    def audit_params(self) -> AuditParams:
        """
        The AuditParams used when sending audit events.

        :raises TIPRequestHandlerException: if it has not been set yet
        """
        if self._audit_params is None:
            raise TIPRequestHandlerException("self._audit_params has not been set")
        return self._audit_params

    @audit_params.setter
    def audit_params(self, value) -> None:
        self._audit_params = value

    @property
    def tracking_params(self) -> TrackingParams:
        """
        The TrackingParams holding the tracking id and transaction id.

        :raises TIPRequestHandlerException: if it has not been set yet
        """
        if self._tracking_params is None:
            raise TIPRequestHandlerException("self._tracking_params has not been set")
        return self._tracking_params

    @tracking_params.setter
    def tracking_params(self, value) -> None:
        self._tracking_params = value

    def set_audit_params(self) -> None:
        """
        Sets the audit_params attribute to reasonable and best-guess default values.
        This is necessary because if the request is invalid, we may not be able
        to determine all of the audit parameters.
        """
        # set some initial audit parameters so that we can have some values
        # to send for audit events no matter where a failure occurs
        session_id = ""
        workstation_id = None
        if self.online_session:
            if self.online_session.session_id:
                session_id = self.online_session.session_id
            if self.online_session.workstation:
                workstation_id = self.online_session.workstation

        self.audit_params = AuditParams(
            session_id=session_id,
            workstation_id=workstation_id,
        )

    def set_tracking_params(self) -> None:
        """
        Sets the tracking_params attribute to reasonable default values.
        This is necessary because if the request is invalid we may not be able
        to determine all of the tracking parameters.
        """
        # set some initial parameters so that we can have some values
        # to send in response to hq where a failure occurs
        self.tracking_params = TrackingParams()

    @async_catch_uncaught_exceptions
    async def _hq_request_to_tip_request(self) -> TIPRequest:
        """
        The parent class parses the XML Request from HQ and converts it to a
        python object. This method converts that object to a TIPRequest class.
        This provides a standard data class that users of this extension will
        receive in the decision_transaction method.

        :return: the TIPRequest built from self.request_name_elem
        :raises TIPRequestHandlerException: if self.request_name_elem is None
        """
        self.logger.info("Converting hq request to tip request")
        if self.request_name_elem is None:
            raise TIPRequestHandlerException(
                "self.request_name_elem must be set in order to call _hq_request_to_tip_request"
            )
        tip_request = obj_elem_to_tip_request(self.request_name_elem)

        # once we successfully convert the HQ request to the TIP request
        # we need to update our audit and tracking params
        self.tracking_params.transaction_id = tip_request.transaction_id
        self.audit_params.update_from_tip_request(tip_request)
        return tip_request

    @catch_uncaught_exceptions
    def _validate_tip_response(self, tip_response: TIPResponse) -> TIPResponse:
        """
        This validates the TIP response to ensure that when it is converted
        to an HQ response, that HQ will be able to handle it.

        Some potential checks we could do:
            - confirm that the decision is a TIPStatus
            - We may also want to check that MFARequired is not returned
              if the user has already validated MFA

        :param tip_response: TIPResponse returned from decision_transaction
        :return: the same tip_response once every check has passed
        :raises TIPResponseValidationException: if the tracking id, transaction
            id or version does not match the value held by this handler
        """
        self.logger.info("Validating tip response")

        # we want to ensure that all of the values in the TIPResponse are the
        # expected type and none of them are None
        tip_response.validate_no_optional()

        # we want to ensure that tracking_id and transaction_id in the TIPResponse
        # match the tracking_id generated and set on self.tracking_params.tracking_id
        # and the transaction_id set on self.tracking_params.transaction_id
        if tip_response.tracking_id != self.tracking_params.tracking_id:
            raise TIPResponseValidationException(
                "tip_response.tracking_id != self.tracking_params.tracking_id "
                f"{tip_response.tracking_id} != {self.tracking_params.tracking_id}"
            )
        if tip_response.transaction_id != self.tracking_params.transaction_id:
            raise TIPResponseValidationException(
                "tip_response.transaction_id != self.tracking_params.transaction_id "
                f"{tip_response.transaction_id} != {self.tracking_params.transaction_id}"
            )

        # we want to ensure that the TIPResponseVersion in the TIPResponse returned
        # from decision_transaction is the expected version
        if tip_response.version != self.tip_response_version:
            raise TIPResponseValidationException(
                "tip_response.version != self.tip_response_version "
                f"{tip_response.version} != {self.tip_response_version}"
            )

        return tip_response

    @Q2Span.instrument(kind=SpanKind.SERVER)
    @async_catch_uncaught_exceptions
    async def _send_audit_event(
        self, audit_event: TIPAuditEvent, audit_details: dict
    ) -> None:
        """
        Adds a span event for audit_event and creates the matching audit record.

        :param audit_event: the TIPAuditEvent to record
        :param audit_details: details stored (JSON encoded) on the audit record
        :raises TIPAuditEventException: if anything fails while sending; the
            original exception is chained as ``__cause__``
        """
        try:
            Q2Span.add_event(audit_event.value.short_name, audit_details)
            await self.db.audit_record.create(
                audit_details=json.dumps(audit_details),
                session_id=self.audit_params.session_id,
                audit_action_short_name=audit_event.value.short_name,
                transaction_id=self.audit_params.transaction_id,
                user_id=self.audit_params.user_id,
                customer_id=self.audit_params.customer_id,
                exception_message=self.audit_params.exception_message,
                user_logon_id=self.audit_params.user_logon_id,
                host_account_id=self.audit_params.host_account_id,
                workstation_id=self.audit_params.workstation_id,
            )
        except Exception as e:
            # chain the original exception so its traceback is not lost
            raise TIPAuditEventException(
                f"Uncaught exception {e} when sending audit event: {audit_event.value.short_name}"
            ) from e

    def _build_exception_response_log_metrics(
        self, error_type: str, error_description: str
    ) -> str:
        """
        Builds XML response to HQ from a given error_type and an error_description.
        This utility method should be called every time we handle an exception.

        :param error_type: str representation of the error class
        :param error_description: str description of the error
        :return: str representation of exception XML to return to HQ
        """
        self.logger.exception(error_description)
        tip_response = TIPResponse.from_exception(error_description=error_description)
        self._add_tip_status_event(tip_response)
        self._add_tip_error_event(error_type)
        return tip_response.to_xml()

    @async_catch_uncaught_exceptions
    async def _handle_exception(
        self, e: TIPBaseException, audit_event: TIPAuditEvent
    ) -> str:
        """
        Builds the error response for e and then sends audit_event.

        :param e: the TIPBaseException being handled
        :param audit_event: the TIPAuditEvent to send for this failure
        :return: str representation of exception XML to return to HQ
        """
        # build the response and log the metrics first so that sending the audit event
        # does not impact the duration metric calculation.
        response = self._build_exception_response_log_metrics(
            error_type=e.__class__.__name__, error_description=e.error_description
        )
        await self._send_audit_event(
            audit_event,
            audit_details=self.tracking_params.audit_details(
                error_msg=e.error_description
            ),
        )
        return response

    @property
    def tip_response_version(self) -> TIPResponseVersion:
        """The TIPResponseVersion that responses are validated against."""
        return TIPResponseVersion.V0

    def wrap_soap_response(self, custom_response: str) -> str:
        """
        If the incoming request is wrapped in a soap envelope, this prepares it
        for return back to HQ. This overrides the Q2HqRequestHandler version of this
        method to provide an ErrorDescription and set the ErrorCode in the response.

        :param custom_response: Response to wrap in an appropriate soap envelope
        :returns: SOAP wrapped XML body to be sent to HQ
        """
        return get_execute_request_as_xml_response_from_tip_response(
            str(self.request_name), custom_response, self.hq_response_attrs
        )

    @Q2Span.instrument(kind=SpanKind.SERVER)
    async def q2_post(self, *args, **kwargs) -> str:
        """
        Core logic for responding to TIP POST requests. This method does the following:

        1. Validates request from HQ and converts the XML to a data class
        2. Logs an audit event to indicate whether or not the decision has started

            - TIPDecisionStarted if validation succeeded
            - TIPRequestValidationFailed if validation failed
            - TIPDecisionFailed if some other error occurred prior to
              decision_transaction being called

        3. Calls decision_transaction (abstract method)
        4. Logs an audit event

            - TIPDecisionSucceeded if the decision succeeded
            - TIPDecisionFailed if the decision failed

        5. Validates returned value from decision_transaction and forms the
           XML to send back to HQ

        :returns: TIPResponse formatted as an XML string
        """
        try:
            Q2Span.set_attributes(self._get_otel_default_attributes())
            self.set_audit_params()
            self.set_tracking_params()

            if self._tag_name != "ExecuteRequestAsXml":
                raise TIPRequestHandlerException(
                    f"self._tag_name must be ExecuteRequestAsXml but got self._tag_name: {self._tag_name}"
                )

            # If parsing the hq request fails, we will not have the
            # self.tip_request to use for passing fields into an error response.
            # Therefore, we send a TIPRequestValidationFailed audit event so we know
            # to not expect the tracking parameters in the audit details.
            try:
                self.tip_request = await self._hq_request_to_tip_request()
            except TIPBaseException as e:
                return await self._handle_exception(
                    e, audit_event=TIPAuditEvent.TIPRequestValidationFailed
                )

            # If generating the tracking id fails, we will not have the
            # self.tracking_id to use for passing fields into an error response.
            try:
                self.logger.info("Generating tracking id")
                self.tracking_params.tracking_id = self.generate_tracking_id(
                    self.tip_request
                )
            except TIPBaseException as e:
                return await self._handle_exception(
                    e, audit_event=TIPAuditEvent.TIPDecisionFailed
                )

            # now that we have parsed the request and created the tracking_id
            # successfully, we can send a TIPDecisionStarted audit event and any
            # subsequent failures can send a TIPDecisionFailed event
            self.logger.info(
                f"Starting TIP decision for transaction id: {self.tracking_params.transaction_id} "
                f"and tracking id: {self.tracking_params.tracking_id}"
            )

            # we keep the _send_audit_event outside of the try/except block where
            # the exception block calls self._handle_exception because that method
            # sends an audit event. We probably don't want to try to send another
            # audit event if the first one fails (although I could be convinced otherwise
            # in the event that the first audit event failure is transient)
            await self._send_audit_event(
                TIPAuditEvent.TIPDecisionStarted, self.tracking_params.audit_details()
            )

            # all of these method calls are wrapped with either catch_uncaught_exceptions
            # or async_catch_uncaught_exceptions, and they all handle the exception the
            # same way. That said, we can put them all in one try block and catch the
            # TIPBaseException
            try:
                tip_response = await self._call_decision_transaction(self.tip_request)
                tip_response = self._validate_tip_response(tip_response)
                self.logger.info("Converting TIP response to XML")
                xml_response = tip_response.to_xml()
            except TIPBaseException as e:
                return await self._handle_exception(
                    e, audit_event=TIPAuditEvent.TIPDecisionFailed
                )

            # see comment above for keeping self._send_audit_event outside of try/except block
            # if we make it here, we know that the decision succeeded and we can send the
            # TIPDecisionSucceeded audit event.
            await self._send_audit_event(
                audit_event=TIPAuditEvent.TIPDecisionSucceeded,
                audit_details=self.tracking_params.audit_details(
                    extra_audit_details=tip_response.to_audit_details()
                ),
            )
            self._add_tip_status_event(tip_response)
            return xml_response

        # This can be hit if we get an exception while handling another exception
        # or if the request being sent does not have self._tag_name = ExecuteRequestAsXml
        # For example, when we try to send an audit event for the failure and
        # sending the audit event fails.
        except TIPBaseException as e:
            # note that the only difference between this method and _handle_exception
            # is that _handle_exception is also calling _send_audit_event. We don't
            # want to call _send_audit_event here in case it was _send_audit_event that
            # failed and got us here.
            return self._build_exception_response_log_metrics(
                error_type=e.__class__.__name__,
                error_description=e.error_description,
            )

        # this should be impossible to hit but if all of our exceptions go
        # uncaught, we still need to send something to HQ
        except Exception as e:
            error_description = f"Uncaught error creating TIP response: {e}"
            return self._build_exception_response_log_metrics(
                error_type=e.__class__.__name__,
                error_description=error_description,
            )

    @Q2Span.instrument(kind=SpanKind.SERVER)
    async def _call_decision_transaction(self, tip_request: TIPRequest) -> TIPResponse:
        """
        Method that calls the abstract method decision_transaction. The main
        reason for having this wrapper is so that we can instrument this call with
        OpenTelemetry to better track decision_transaction calls.

        :param tip_request: TIPRequest passed through to decision_transaction
        :return: the TIPResponse returned by decision_transaction
        """
        Q2Span.set_attributes(self._get_otel_default_attributes())
        return await self.decision_transaction(tip_request)

    @abstractmethod
    @catch_uncaught_exceptions
    def generate_tracking_id(self, tip_request: TIPRequest) -> str:
        """
        Using a TIPRequest as input, generates a tracking id. This will be used
        to set the attribute self.tracking_params.tracking_id that will ultimately
        be included in the TIPResponse sent back to HQ.

        :param tip_request: TIPRequest
        :return: string containing the generated tracking id
        """
        raise NotImplementedError

    @abstractmethod
    @async_catch_uncaught_exceptions
    async def decision_transaction(self, tip_request: TIPRequest) -> TIPResponse:
        """
        Using a TIPRequest as input, determines what the status of a transaction
        should be and returns it as a TIPResponse.

        :param tip_request: TIPRequest
        :return: TIPResponse containing the decision for transaction being authorized
        """
        raise NotImplementedError