Source code for q2_sdk.models.tip.schemas.base

import json
from typing import (
    Any,
    Dict,
    get_origin,
    get_args,
    List,
    Union,
    Set,
    Optional,
    Tuple,
    get_type_hints,
    Callable,
    TypeVar,
)
from datetime import datetime
from lxml import objectify, etree
from abc import ABC, abstractmethod
from dataclasses import dataclass, fields, Field, field
from q2_sdk.models.tip.variables import (
    TIPStatus,
    TransactionTypeEnum,
    TIPResponseVersion,
)
from q2_sdk.models.tip.exceptions import (
    TIPModelValidationFromLxmlObjectifyException,
    TIPModelValidationException,
    TIPModelParsingException,
    catch_uncaught_exceptions,
)
from q2_sdk.models.tip.schemas import descriptions
from q2_sdk.tools.utils import to_bool
from q2_sdk.hq.http import XML_PARSER


@dataclass
class TIPModel(ABC):
    """
    Abstract base class for TIP dataclass models.

    Provides three groups of functionality to subclasses:

    * static ``parse_*`` helpers that convert values read from lxml objectify
      elements into plain Python types, raising ``TIPModelParsingException``
      on failure;
    * runtime validation of every dataclass field against its annotation,
      triggered automatically from ``__post_init__``;
    * generation of a "data dictionary" describing each field's type,
      description and whether it is required.
    """

    DesiredTipType = TypeVar("DesiredTipType")
    ParserType = TypeVar("ParserType")

    def __post_init__(self):
        """
        Post-initialization hook that triggers validation.
        """
        # None values are accepted for Optional fields unless a caller
        # temporarily flips this flag (see subclasses).
        self._allow_optional = True
        self.validate()

    @staticmethod
    def try_getattr_from_elem(obj_elem: etree.Element, attr: str):
        """
        Safely get an attribute from an etree Element, raising TIPModelParsingException if not found.

        :param obj_elem: The etree Element to get the attribute from
        :param attr: The attribute name to retrieve
        :return: The attribute value
        :raises TIPModelParsingException: If the attribute is not found
        """
        try:
            return getattr(obj_elem, attr)
        except AttributeError as e:
            raise TIPModelParsingException(
                f"Missing required attribute '{attr}' in obj_elem {obj_elem}"
            ) from e

    @staticmethod
    def parse_optional(
        elem: Optional[Any], parser_func: Callable[[str], ParserType]
    ) -> Optional[ParserType]:
        """
        Generic method to parse optional fields with any parser function.

        :param elem: The raw value; ``None`` and ``""`` are treated as missing
        :param parser_func: Parser applied to ``elem`` when it is present
        :return: ``None`` when the value is missing, otherwise ``parser_func(elem)``
        """
        if elem is None or elem == "":
            return None
        return parser_func(elem)

    @staticmethod
    def parse_datetime(
        elem: Union[
            str, datetime, objectify.StringElement, objectify.ObjectifiedElement
        ],
    ) -> datetime:
        """
        Convert a date string from elem.text that should be in iso format to datetime object.

        :param elem: The date string (or an already parsed datetime, returned unchanged)
        :return: Parsed datetime object
        :raises TIPModelParsingException: If the date string cannot be parsed
        """
        if isinstance(elem, datetime):
            return elem
        elif isinstance(elem, (objectify.StringElement, objectify.ObjectifiedElement)):
            # objectify elements are converted to their string form before parsing
            elem = str(elem)
        try:
            return datetime.fromisoformat(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem_text: {elem} as datetime with Exception {e}"
            ) from e

    @staticmethod
    def parse_datetime_optional(
        elem: Optional[
            Union[str, datetime, objectify.StringElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[datetime]:
        """
        Parse optional datetime field.

        :param elem: The date string
        :return: Parsed Optional[datetime]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_datetime)

    @staticmethod
    def parse_str(elem: Optional[str]) -> str:
        """
        Ensure that a str from elem.text is a str and not empty.

        :param elem: The Optional[str] from elem.text
        :return: Parsed str
        :raises TIPModelParsingException: If elem_text is None or empty string
        """
        if not elem:
            raise TIPModelParsingException(
                f"failed to parse elem_text: {elem} as str. elem_text cannot be None or ''."
            )
        return str(elem)

    @staticmethod
    def parse_str_optional(elem: Optional[str]) -> Optional[str]:
        """
        Parse optional str field.

        :param elem: The Optional[str] from elem.text
        :return: Parsed Optional[str]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_str)

    @staticmethod
    def parse_int(
        elem: Union[str, int, objectify.IntElement, objectify.ObjectifiedElement],
    ) -> int:
        """
        Convert the value coming from element to an integer type.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed int
        :raises TIPModelParsingException: If the elem cannot be converted to an int
        """
        try:
            if not isinstance(elem, (str, int, objectify.IntElement)):
                # raised inside the try on purpose: it is re-wrapped below so
                # every failure surfaces with the same "failed to parse" prefix
                raise TIPModelParsingException(
                    f"Cannot convert elem {elem} of type {type(elem)} to int"
                )
            return int(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem: {elem} as int with Exception {e}"
            ) from e

    @staticmethod
    def parse_int_optional(
        elem: Optional[
            Union[str, int, objectify.IntElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[int]:
        """
        Parse optional int field.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[int]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_int)

    @staticmethod
    def parse_int_id_optional(
        elem: Optional[
            Union[str, int, objectify.IntElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[int]:
        """
        Parse optional int id field.

        In TIP Models, we have some ID fields which are optional. In our stored procedures,
        if a field is NULL, its xml node is not included in the xml request body. To make
        the NULL more explicit, we set this value to -1. But here we want to set this -1
        to None to be more clear to users of TIP that the value is missing.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[int]
        """
        int_id = TIPModel.parse_optional(elem, TIPModel.parse_int)
        # if the value is -1 we want to parse it as None
        if int_id == -1:
            return None
        return int_id

    @staticmethod
    def parse_bool(
        elem: Union[bool, str, int, objectify.IntElement, objectify.ObjectifiedElement],
    ) -> bool:
        """
        Convert the value coming from element to a bool type

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed bool
        :raises TIPModelParsingException: If the elem cannot be converted to a bool
        """
        try:
            if not isinstance(elem, (bool, str, int, objectify.IntElement)):
                raise TIPModelParsingException(
                    f"Cannot convert elem {elem} of type {type(elem)} to bool"
                )
            return to_bool(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem: {elem} as bool with Exception {e}"
            ) from e

    @staticmethod
    def parse_bool_optional(
        elem: Optional[
            Union[bool, str, int, objectify.IntElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[bool]:
        """
        Parse optional bool field.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[bool]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_bool)

    @staticmethod
    def parse_float(
        elem: Union[
            str,
            int,
            float,
            objectify.IntElement,
            objectify.FloatElement,
            objectify.ObjectifiedElement,
        ],
    ) -> float:
        """
        Convert the value coming from element to a float type

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed float
        :raises TIPModelParsingException: If the elem cannot be converted to a float
        """
        try:
            if not isinstance(
                elem, (str, int, float, objectify.IntElement, objectify.FloatElement)
            ):
                raise TIPModelParsingException(
                    f"Cannot convert elem {elem} of type {type(elem)} to float"
                )
            return float(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem: {elem} as float with Exception {e}"
            ) from e

    @staticmethod
    def parse_float_optional(
        elem: Optional[Union[str, objectify.FloatElement, float]],
    ) -> Optional[float]:
        """
        Parse optional float field.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[float]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_float)

    def validate(self) -> None:
        """
        registers all validation checks, ensures all succeed

        :returns: None
        :raises TIPModelValidationException: If any validation checks fail.
        """
        # structuring it in this way to allow for additional validation checks
        # to be included down the road
        validation_errors: List[str] = [
            *self.validate_types(),
        ]
        if validation_errors:
            raise TIPModelValidationException("\n".join(validation_errors))

    def validate_type(
        self,
        f: Field,
        field_val: Any,
        field_types: Set[type],
        is_optional: bool,
        validation_errors: List[str],
        parent_field_str: str,
    ) -> None:
        """
        Validate a single field's type against expected types.

        :param f: The dataclass field being validated
        :param field_val: The value of the field
        :param field_types: Set of expected types for the field
        :param is_optional: Whether the field is optional (can be None)
        :param validation_errors: List to append validation errors to
        :param parent_field_str: String prefix for nested field names
        :returns: None
        """
        # every message uses the fully qualified name so that errors coming
        # from nested models can be traced back to their parent field
        qualified_name = f"{parent_field_str}{f.name}"

        # if the value is None, check if it has an Optional type
        if field_val is None:
            if not self._allow_optional:
                validation_errors.append(
                    f"{qualified_name} has value: {field_val} but optional values are not allowed."
                )
            elif not is_optional:
                # if it does not have an Optional type, we need to add a validation error
                validation_errors.append(
                    f"{qualified_name} has value: {field_val} but does not have an Optional type: {field_types}"
                )
            # nothing else to check for a None value; move to the next field
            return None

        # if the field_val being validated is a sub-class of TIPModel, we need to
        # recursively validate its fields
        if isinstance(field_val, TIPModel):
            field_val.validate_types(
                validation_errors=validation_errors,
                parent_field=qualified_name,
            )

        # if the field_val is not one of its expected types, we need to add a validation error
        if not isinstance(field_val, tuple(field_types)):
            validation_errors.append(
                f"{qualified_name} has value {field_val} with type {type(field_val)} which is not one of its expected types: {field_types}"
            )

    def validate_types(
        self, validation_errors: Optional[List] = None, parent_field: str = ""
    ) -> List[str]:
        """
        Validate all field types in this model.

        :param validation_errors: Optional list to append errors to
        :param parent_field: Parent field name for nested validation
        :return: List of validation error messages
        :raises TIPModelValidationException: If any field type is a dict
        """
        parent_field_str = "" if parent_field == "" else f"{parent_field}."
        if validation_errors is None:
            validation_errors = []

        for f in fields(self):
            field_types, is_optional = self._get_expected_types(f.type, f.name)
            field_val = getattr(self, f.name)

            if isinstance(field_val, (list, set)):
                # for containers, field_types holds the element types, so each
                # element is validated individually
                for fv in field_val:
                    self.validate_type(
                        f,
                        fv,
                        field_types,
                        is_optional,
                        validation_errors,
                        parent_field_str,
                    )
            elif isinstance(field_val, dict):
                raise TIPModelValidationException(
                    f"field {f.name} has value {field_val} with expected types: {field_types}; however, dict types are currently not supported by TIPModel"
                )
            else:
                self.validate_type(
                    f,
                    field_val,
                    field_types,
                    is_optional,
                    validation_errors,
                    parent_field_str,
                )

        return validation_errors

    @classmethod
    def _type_to_set(cls, field_type: Union[type, Set[type]]) -> Set[type]:
        """
        Convert a type or set of types to a set.

        :param field_type: Type or set of types to convert
        :return: Set containing the type(s)
        """
        if isinstance(field_type, set):
            return field_type
        return {field_type}

    @classmethod
    def _handle_forward_references(
        cls, field_type: Any, field_name: str
    ) -> Tuple[Set[type], bool]:
        """
        Resolve a string (forward reference) annotation to its expected types.

        :param field_type: The unresolved annotation, used for error messages
        :param field_name: Name of the field whose annotation is being resolved
        :return: Tuple of (expected types, is_optional)
        :raises TIPModelValidationException: If the annotation cannot be resolved
        """
        try:
            # Get type hints for this class to resolve forward references
            type_hints = get_type_hints(cls)

            # Find the field with this type annotation
            try:
                resolved_type = type_hints[field_name]
            except KeyError as e:
                raise TIPModelValidationException(
                    f"Unable to find type hint forward reference {field_type} for field {field_name}"
                ) from e

            return cls._get_expected_types(resolved_type, field_name)
        except TIPModelValidationException:
            # already a precise validation error; do not mask it with the
            # generic message below
            raise
        except Exception as e:
            raise TIPModelValidationException(
                f"Unable to resolve forward reference {field_type} when validating type"
            ) from e

    @classmethod
    def _handle_union_type(cls, args: Tuple[Any, ...]) -> Tuple[Set[type], bool]:
        """
        Split the members of a Union into its non-None types and an optional flag.

        :param args: The type arguments of the Union
        :return: Tuple of (non-None member types, whether NoneType was a member)
        """
        non_none_types = [arg for arg in args if arg is not type(None)]
        is_optional = type(None) in args
        return set(non_none_types), is_optional

    @classmethod
    def _handle_type_with_origin(cls, field_type: Any) -> Tuple[Set[type], bool]:
        """
        Resolve a (possibly generic) annotation to its expected types.

        :param field_type: The annotation to resolve
        :return: Tuple of (expected types, is_optional). For list/set annotations
            the expected types are the element types.
        """
        origin = get_origin(field_type)

        if origin:
            args = get_args(field_type)
            if origin in (list, List, set, Set):
                return set(args), False
            elif origin in (Union,):
                # Handle Optional[T] which is Union[T, None]
                return cls._handle_union_type(args)

        return cls._type_to_set(field_type), False

    @classmethod
    def _get_expected_types(
        cls, field_type: Any, field_name: str
    ) -> Tuple[Set[type], bool]:
        """Resolve various type annotations to find the actual type."""
        if isinstance(field_type, str):
            return cls._handle_forward_references(
                field_name=field_name, field_type=field_type
            )
        return cls._handle_type_with_origin(field_type)

    @classmethod
    def _get_expected_types_for_data_dictionary(
        cls, field_type: Any, field_name: str
    ) -> Tuple[Set[type], bool, Optional[type]]:
        """
        Resolve various type annotations to find the actual type.

        :return: Tuple of (expected types, is_optional, origin of the annotation)
        """
        if isinstance(field_type, str):
            tup = cls._handle_forward_references(
                field_name=field_name, field_type=field_type
            )
        else:
            tup = cls._handle_type_with_origin(field_type)
        # NOTE(review): the origin is taken from the raw annotation, so for a
        # string (forward reference) annotation get_origin returns None here.
        return tup[0], tup[1], get_origin(field_type)

    def to_kwargs(self) -> Dict[str, Any]:
        """
        Convert this model to a dictionary of field names and values.

        :return: Dictionary mapping field names to their values
        """
        return {f.name: getattr(self, f.name) for f in fields(self)}

    @classmethod
    @abstractmethod
    def model_validate_from_elem(cls, data: etree.Element) -> "TIPModel":
        """Abstract method for converting lxml objectify data to TIPModel"""
        pass

    @classmethod
    def _format_for_data_dictionary(
        cls, f: Field, field_types: Set[type], is_optional: bool
    ) -> Dict[str, Any]:
        """
        Build the data dictionary entry for a field that is not a nested model.

        :param f: The dataclass field being described
        :param field_types: Set of expected types for the field
        :param is_optional: Whether the field is optional
        :return: Dict with ``description``, ``type`` and ``required`` keys
        """
        # sorted so the output does not depend on set iteration order; the
        # fallback covers members that do not expose __name__
        type_names = sorted(str(getattr(ft, "__name__", ft)) for ft in field_types)
        return {
            "description": f.metadata.get("description", "No description provided"),
            "type": ", ".join(type_names),
            "required": str(not is_optional),
        }

    @classmethod
    def _field_to_data_dictionary(
        cls, f: Field, field_types: Set[type], is_optional: bool
    ) -> Dict[str, Any]:
        """
        Build the data dictionary entry for a single field.

        If any expected type is a TIPModel subclass, the entry describes that
        model's own fields; otherwise a flat entry is produced.

        :param f: The dataclass field being described
        :param field_types: Set of expected types for the field
        :param is_optional: Whether the field is optional
        :return: Dict describing the field
        """
        for ft in field_types:
            # isinstance guard: issubclass raises TypeError for members that
            # are not classes (e.g. parameterized generics)
            if isinstance(ft, type) and issubclass(ft, TIPModel):
                return {
                    "type": str(ft.__name__),
                    "description": "Data class containing the fields described in fields property",
                    "fields": cls._fields_to_data_dictionary(fields(ft)),
                }
        return cls._format_for_data_dictionary(f, field_types, is_optional)

    @classmethod
    def _fields_to_data_dictionary(
        cls, dc_fields: Tuple[Field, ...]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Build data dictionary entries for a sequence of dataclass fields.

        :param dc_fields: The dataclass fields to describe
        :return: Dict mapping each field name to its data dictionary entry
        """
        dd = dict()
        for f in dc_fields:
            field_types, is_optional, origin = (
                cls._get_expected_types_for_data_dictionary(f.type, f.name)
            )
            if origin in (list, List, set, Set):
                str_type = str(origin.__name__).title()
                dd[f.name] = {
                    "type": str_type,
                    "description": f"{str_type} type with elements described in elements property",
                    "elements": cls._field_to_data_dictionary(
                        f, field_types, is_optional
                    ),
                }
            else:
                dd[f.name] = cls._field_to_data_dictionary(f, field_types, is_optional)
        return dd

    @classmethod
    def data_dictionary(cls) -> Dict[str, Any]:
        """
        Generates a data dictionary as a dict type.

        :return: dict containing all fields with their data types and descriptions
        """
        return cls._fields_to_data_dictionary(fields(cls))

    @classmethod
    def data_dictionary_json(cls) -> str:
        """
        Generates a data dictionary as a JSON string.

        :return: JSON string containing all fields with their data types and descriptions
        """
        return json.dumps(cls.data_dictionary())
@dataclass
class TIPRequest(TIPModel):
    """
    Data class that contains fields that are shared across transaction types
    and should be included in all TIP requests.
    """

    customer_key: str = field(
        metadata={
            "description": descriptions.CUSTOMER_KEY_DESCRIPTION,
        }
    )
    event_user_id: int = field(
        metadata={
            "description": descriptions.EVENT_USER_ID_DESCRIPTION,
        }
    )
    session_id: str = field(
        metadata={
            "description": descriptions.SESSION_ID_DESCRIPTION,
        }
    )
    ui_source: str = field(
        metadata={
            "description": descriptions.UI_SOURCE_DESCRIPTION,
        }
    )
    transaction_id: int = field(
        metadata={
            "description": descriptions.TRANSACTION_ID_DESCRIPTION,
        }
    )
    audit_id: int = field(
        metadata={
            "description": descriptions.AUDIT_ID_DESCRIPTION,
        }
    )
    event_date_time: datetime = field(
        metadata={
            "description": descriptions.EVENT_DATE_TIME_DESCRIPTION,
        }
    )
    customer_id: int = field(
        metadata={
            "description": descriptions.CUSTOMER_ID_DESCRIPTION,
        }
    )
    group_id: int = field(
        metadata={
            "description": descriptions.GROUP_ID_DESCRIPTION,
        }
    )
    transaction_type: TransactionTypeEnum = field(
        metadata={
            "description": f"{descriptions.TRANSACTION_TYPE_DESCRIPTION} Will be one of the following: {', '.join(TransactionTypeEnum.valid_names())}",
        }
    )
    gt_type_short_name: str = field(
        metadata={
            "description": descriptions.GT_TYPE_SHORT_NAME_DESCRIPTION,
        }
    )
    transaction_type_version: str = field(
        metadata={
            "description": descriptions.TRANSACTION_TYPE_VERSION_DESCRIPTION,
        }
    )
    create_date: datetime = field(
        metadata={
            "description": descriptions.CREATE_DATE_DESCRIPTION,
        }
    )
    # NOTE(review): this field reuses AUTHORIZED_DATE_DESCRIPTION, which looks
    # like a copy-paste slip -- confirm whether a transaction-amount
    # description constant exists in the descriptions module and use it here.
    transaction_amount: float = field(
        metadata={
            "description": descriptions.AUTHORIZED_DATE_DESCRIPTION,
        }
    )
    login_audit_id: int = field(
        metadata={
            "description": descriptions.LOGIN_AUDIT_ID_DESCRIPTION,
        }
    )
    user_logon_id: int = field(
        metadata={
            "description": descriptions.USER_LOGON_ID_DESCRIPTION,
        }
    )
    created_user_id: Optional[int] = field(
        metadata={
            "description": descriptions.CREATED_USER_ID_DESCRIPTION,
        }
    )
    authorized_user_id: Optional[int] = field(
        metadata={
            "description": descriptions.AUTHORIZED_USER_ID_DESCRIPTION,
        }
    )
    modified_user_id: Optional[int] = field(
        metadata={
            "description": descriptions.MODIFIED_USER_ID_DESCRIPTION,
        }
    )

    @classmethod
    def model_validate_from_elem(cls, data: etree.Element) -> "TIPRequest":
        """
        Converts lxml objectify data to a TIPRequest model.

        :param data: Input data as etree.Element
        :return: The populated TIPRequest
        :raises TIPModelValidationFromLxmlObjectifyException: If data is not an
            objectify.ObjectifiedElement
        :raises TIPModelParsingException: If a required node is missing or a
            value cannot be parsed
        """
        if not isinstance(data, objectify.ObjectifiedElement):
            raise TIPModelValidationFromLxmlObjectifyException(cls.__name__, type(data))

        try:
            # For the optional fields, there is the possibility that the node is not present
            # in the xml at all. In this case, accessing the attribute on the objectified
            # element raises an AttributeError. But since the field is optional, we do not
            # want to fail and should instead set it to None (getattr's default does that).
            created_user_id = getattr(data, "CreatedUserID", None)
            authorized_user_id = getattr(data, "AuthorizedUserID", None)
            modified_user_id = getattr(data, "ModifiedUserID", None)

            return cls(
                # parsed differently because it comes from attributes in the root XML node
                customer_key=TIPModel.parse_str(data.get("InsightCustomerKey")),
                event_user_id=TIPModel.parse_int(data.EventUserID),
                session_id=TIPModel.parse_str(data.SessionID.text),
                ui_source=TIPModel.parse_str(data.UISource.text),
                transaction_id=TIPModel.parse_int(data.TransactionID),
                audit_id=TIPModel.parse_int(data.AuditID),
                event_date_time=TIPModel.parse_datetime(data.EventDateTime),
                customer_id=TIPModel.parse_int(data.CustomerID),
                group_id=TIPModel.parse_int(data.GroupID),
                transaction_type=TransactionTypeEnum.from_elem_str(
                    TIPModel.parse_str(data.TransactionType.text)
                ),
                gt_type_short_name=TIPModel.parse_str(data.GTTypeShortName.text),
                transaction_type_version=TIPModel.parse_str(
                    data.TransactionTypeVersion.text
                ),
                create_date=TIPModel.parse_datetime(data.CreateDate),
                transaction_amount=TIPModel.parse_float(data.TransactionAmount),
                login_audit_id=TIPModel.parse_int(data.LoginAuditID),
                user_logon_id=TIPModel.parse_int(data.UserLogonID),
                created_user_id=TIPModel.parse_int_id_optional(created_user_id),
                authorized_user_id=TIPModel.parse_int_id_optional(
                    authorized_user_id
                ),
                modified_user_id=TIPModel.parse_int_id_optional(modified_user_id),
            )
        except AttributeError as e:
            # a required node was missing; keep the original error as the cause
            raise TIPModelParsingException(
                f"Missing required attribute in obj_elem: {str(e)}"
            ) from e

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert this request to a dictionary keyed by PascalCase field names.

        :return: Dictionary of the request's values
        """
        # NOTE(review): gt_type_short_name has no entry here -- confirm
        # whether that omission is intentional.
        return {
            "CustomerKey": self.customer_key,
            "EventUserId": self.event_user_id,
            "CreatedUserId": self.created_user_id,
            "AuthorizedUserId": self.authorized_user_id,
            "ModifiedUserId": self.modified_user_id,
            "UISource": self.ui_source,
            "SessionId": self.session_id,
            "TransactionId": self.transaction_id,
            "AuditId": self.audit_id,
            "EventDateTime": self.event_date_time,
            "CustomerId": self.customer_id,
            "GroupId": self.group_id,
            "TransactionType": self.transaction_type,
            "TransactionTypeVersion": self.transaction_type_version,
            "CreateDate": self.create_date,
            "TransactionAmount": self.transaction_amount,
            "LoginAuditId": self.login_audit_id,
            "UserLogonId": self.user_logon_id,
        }
@dataclass
class TIPResponse(TIPModel):
    """
    Data class used to build the TIP response XML returned to HQ
    """

    status: TIPStatus
    version: TIPResponseVersion = TIPResponseVersion.latest_version()
    # these fields are optional because the TIP handler still need to send a response
    # to HQ even if it fails before it knows all of these fields
    tracking_id: Optional[str] = None
    transaction_id: Optional[int] = None
    # defaults describe a successful response; from_exception overrides them
    error_type: str = "Success"
    error_code: str = "0"
    error_description: str = ""

    def to_audit_details(self) -> Dict[str, str]:
        """
        Convert this response to a flat dictionary of audit details.

        Missing (falsy) tracking and transaction ids are reported as empty strings.

        :return: Dictionary of audit detail names to values
        """
        return {
            "TrackingId": self.tracking_id or "",
            "TransactionId": str(self.transaction_id or ""),
            "Status": self.status.value,
            "ErrorType": self.error_type,
            "ErrorCode": self.error_code,
            "ErrorDescription": self.error_description,
            "Version": self.version,
        }

    @catch_uncaught_exceptions
    def to_xml(self) -> str:
        """
        Convert this TIPResponse to XML formatted string.

        :return: XML string representation of the response
        """
        root = etree.Element("TIP")

        # <Result> carries the error information and the response version
        result_elem = etree.SubElement(root, "Result")
        error_code_elem = etree.SubElement(
            result_elem, "ErrorCode", ErrorType=self.error_type
        )
        error_code_elem.text = self.error_code
        etree.SubElement(result_elem, "ErrorDescription").text = self.error_description
        etree.SubElement(result_elem, "Version").text = self.version

        # <Data> carries the status plus whichever ids are known
        data_elem = etree.SubElement(root, "Data")
        etree.SubElement(data_elem, "Status").text = self.status.value
        if self.tracking_id is not None:
            etree.SubElement(data_elem, "TrackingID").text = str(self.tracking_id)
        if self.transaction_id is not None:
            etree.SubElement(data_elem, "TransactionID").text = str(self.transaction_id)

        return etree.tostring(root).decode("utf8")

    @classmethod
    def from_exception(
        cls,
        error_description: str,
        version: TIPResponseVersion = TIPResponseVersion.latest_version(),
        tracking_id: Optional[str] = None,
        transaction_id: Optional[int] = None,
    ) -> "TIPResponse":
        """
        Converts an error description along with other optional parameters to a TIPResponse

        The resulting response has status ``TIPStatus.Exception``, error type
        ``"Failure"`` and error code ``"1"``.

        :param error_description: Text placed in the response's error description
        :param version: Response version to report
        :param tracking_id: Tracking id to include, if known
        :param transaction_id: Transaction id to include, if known
        :return: The failure TIPResponse
        """
        return cls(
            tracking_id=tracking_id,
            version=version,
            error_description=error_description,
            transaction_id=transaction_id,
            status=TIPStatus.Exception,
            error_type="Failure",
            error_code="1",
        )

    @classmethod
    def model_validate_from_elem(cls, data: etree.Element) -> "TIPResponse":
        """
        We don't need this method implemented but we still want to inherit from TIPModel
        so that we can get type validation
        """
        raise NotImplementedError()

    def validate_no_optional(self):
        """
        This is the same method as super().validate() except we temporarily set
        self._allow_optional to False so that we force all the fields in this class
        to be required.

        :raises TIPModelValidationException: If any field is None or has the wrong type
        """
        self._allow_optional = False
        try:
            self.validate()
        finally:
            # we need to ensure that even when an exception is raised,
            # we still restore the _allow_optional attribute
            self._allow_optional = True
def get_execute_request_as_xml_response_from_tip_response(
    root_node: str,
    xml_str: str,
    attrs: dict,
) -> str:
    """
    Builds a soap envelope to respond to an "ExecuteRequestAsXML" request from HQ

    :param root_node: Request nodename provided by HQ (not used when building the envelope)
    :param xml_str: XML placed inside the ExecuteRequestAsXmlResult node
    :param attrs: Attributes to add to the interior Data node, passed through to HQ
    :return: The serialized soap envelope
    """
    envelope_text = f""" <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <soap:Body> <ExecuteRequestAsXmlResponse xmlns="http://tempuri.org/"> <ExecuteRequestAsXmlResult> {xml_str} </ExecuteRequestAsXmlResult> </ExecuteRequestAsXmlResponse> </soap:Body> </soap:Envelope> """
    envelope = etree.XML(envelope_text, parser=XML_PARSER)

    # the first Data node in any namespace receives the pass-through attributes
    data_node = envelope.find(".//{*}Data")
    if data_node is None:
        return etree.tostring(envelope).decode()

    for attr_name, attr_value in sorted(attrs.items()):
        data_node.set(attr_name, str(attr_value))
    return etree.tostring(envelope).decode()