import json
from typing import (
Any,
Dict,
get_origin,
get_args,
List,
Union,
Set,
Optional,
Tuple,
get_type_hints,
Callable,
TypeVar,
)
from datetime import datetime
from lxml import objectify, etree
from abc import ABC, abstractmethod
from dataclasses import dataclass, fields, Field, field
from q2_sdk.models.tip.variables import (
TIPStatus,
TransactionTypeEnum,
TIPResponseVersion,
)
from q2_sdk.models.tip.exceptions import (
TIPModelValidationFromLxmlObjectifyException,
TIPModelValidationException,
TIPModelParsingException,
catch_uncaught_exceptions,
)
from q2_sdk.models.tip.schemas import descriptions
from q2_sdk.tools.utils import to_bool
from q2_sdk.hq.http import XML_PARSER
[docs]
@dataclass
class TIPModel(ABC):
    """
    Abstract base class for TIP dataclass models.

    Provides static helpers for parsing values read from lxml elements,
    type validation of the dataclass fields (run from ``__post_init__``)
    and generation of a data dictionary describing a model's fields.
    """

    # Type variables used in the signatures of the generic helpers below.
    # They are deliberately un-annotated so that they are not dataclass fields.
    DesiredTipType = TypeVar("DesiredTipType")
    ParserType = TypeVar("ParserType")

    def __post_init__(self):
        """
        Post-initialization hook that triggers validation.
        """
        # validate_type() reports every None value when this flag is False
        self._allow_optional = True
        self.validate()

    @staticmethod
    def try_getattr_from_elem(obj_elem: etree.Element, attr: str):
        """
        Safely get an attribute from an etree Element, raising TIPModelParsingException if not found.

        :param obj_elem: The etree Element to get the attribute from
        :param attr: The attribute name to retrieve
        :return: The attribute value
        :raises TIPModelParsingException: If the attribute is not found
        """
        try:
            return getattr(obj_elem, attr)
        except AttributeError as e:
            raise TIPModelParsingException(
                f"Missing required attribute '{attr}' in obj_elem {obj_elem}"
            ) from e

    @staticmethod
    def parse_optional(
        elem: Optional[Any], parser_func: Callable[[str], ParserType]
    ) -> Optional[ParserType]:
        """
        Generic method to parse optional fields with any parser function.

        :param elem: The value to parse; ``None`` and ``""`` are treated as missing
        :param parser_func: Parser applied to ``elem`` when it is not missing
        :return: ``None`` for a missing value, otherwise ``parser_func(elem)``
        """
        if elem is None or elem == "":
            return None
        return parser_func(elem)

    @staticmethod
    def parse_datetime(
        elem: Union[
            str, datetime, objectify.StringElement, objectify.ObjectifiedElement
        ],
    ) -> datetime:
        """
        Convert a date string that should be in iso format to a datetime object.

        :param elem: The date string (or an element holding it); a datetime
            instance is returned unchanged
        :return: Parsed datetime object
        :raises TIPModelParsingException: If the date string cannot be parsed
        """
        if isinstance(elem, datetime):
            return elem
        elif isinstance(elem, (objectify.StringElement, objectify.ObjectifiedElement)):
            # objectify elements are converted to their string form first
            elem = str(elem)
        try:
            return datetime.fromisoformat(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem_text: {elem} as datetime with Exception {e}"
            ) from e

    @staticmethod
    def parse_datetime_optional(
        elem: Optional[
            Union[str, datetime, objectify.StringElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[datetime]:
        """
        Parse optional datetime field.

        :param elem: The date string
        :return: Parsed Optional[datetime]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_datetime)

    @staticmethod
    def parse_str(elem: Optional[str]) -> str:
        """
        Ensure that a str from elem.text is a str and not empty.

        :param elem: The Optional[str] from elem.text
        :return: Parsed str
        :raises TIPModelParsingException: If elem_text is None or empty string
        """
        if not elem:
            raise TIPModelParsingException(
                f"failed to parse elem_text: {elem} as str. elem_text cannot be None or ''."
            )
        return str(elem)

    @staticmethod
    def parse_str_optional(elem: Optional[str]) -> Optional[str]:
        """
        Parse optional str field.

        :param elem: The Optional[str] from elem.text
        :return: Parsed Optional[str]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_str)

    @staticmethod
    def parse_int(
        elem: Union[str, int, objectify.IntElement, objectify.ObjectifiedElement],
    ) -> int:
        """
        Convert the value coming from element to an integer type.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed int
        :raises TIPModelParsingException: If the elem cannot be converted to an int
        """
        try:
            # only str, int and IntElement values are accepted; the rejection
            # below is re-wrapped by the except clause like any other failure
            if not isinstance(elem, (str, int, objectify.IntElement)):
                raise TIPModelParsingException(
                    f"Cannot convert elem {elem} of type {type(elem)} to int"
                )
            return int(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem: {elem} as int with Exception {e}"
            ) from e

    @staticmethod
    def parse_int_optional(
        elem: Optional[
            Union[str, int, objectify.IntElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[int]:
        """
        Parse optional int field.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[int]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_int)

    @staticmethod
    def parse_int_id_optional(
        elem: Optional[
            Union[str, int, objectify.IntElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[int]:
        """
        Parse optional int id field. In TIP Models, we have some ID fields
        which are optional. In our stored procedures, if a field is NULL, its xml
        node is not included in the xml request body. To make the NULL more
        explicit, we set this value to -1. But here we want to set this -1 to None
        to be more clear to users of TIP that the value is missing.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[int]
        """
        int_id = TIPModel.parse_optional(elem, TIPModel.parse_int)
        # if the value is -1 we want to parse it as None
        if int_id == -1:
            return None
        return int_id

    @staticmethod
    def parse_bool(
        elem: Union[bool, str, int, objectify.IntElement, objectify.ObjectifiedElement],
    ) -> bool:
        """
        Convert the value coming from element to a bool type.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed bool
        :raises TIPModelParsingException: If the elem cannot be converted to a bool
        """
        try:
            if not isinstance(elem, (bool, str, int, objectify.IntElement)):
                raise TIPModelParsingException(
                    f"Cannot convert elem {elem} of type {type(elem)} to bool"
                )
            return to_bool(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem: {elem} as bool with Exception {e}"
            ) from e

    @staticmethod
    def parse_bool_optional(
        elem: Optional[
            Union[bool, str, int, objectify.IntElement, objectify.ObjectifiedElement]
        ],
    ) -> Optional[bool]:
        """
        Parse optional bool field.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[bool]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_bool)

    @staticmethod
    def parse_float(
        elem: Union[
            str,
            int,
            float,
            objectify.IntElement,
            objectify.FloatElement,
            objectify.ObjectifiedElement,
        ],
    ) -> float:
        """
        Convert the value coming from element to a float type.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed float
        :raises TIPModelParsingException: If the elem cannot be converted to a float
        """
        try:
            if not isinstance(
                elem, (str, int, float, objectify.IntElement, objectify.FloatElement)
            ):
                raise TIPModelParsingException(
                    f"Cannot convert elem {elem} of type {type(elem)} to float"
                )
            return float(elem)
        except Exception as e:
            raise TIPModelParsingException(
                f"failed to parse elem: {elem} as float with Exception {e}"
            ) from e

    @staticmethod
    def parse_float_optional(
        elem: Optional[Union[str, objectify.FloatElement, float]],
    ) -> Optional[float]:
        """
        Parse optional float field.

        :param elem: One element inside an objectify.ObjectifiedElement
        :return: Parsed Optional[float]
        """
        return TIPModel.parse_optional(elem, TIPModel.parse_float)

    def validate(self) -> None:
        """
        Registers all validation checks, ensures all succeed.

        :returns: None
        :raises TIPModelValidationException: If any validation checks fail.
        """
        # structuring it in this way to allow for additional validation checks
        # to be included down the road
        validation_errors: List[str] = [
            *self.validate_types(),
        ]
        if validation_errors:
            errors_str = "\n".join(validation_errors)
            raise TIPModelValidationException(errors_str)

    def validate_type(
        self,
        f: Field,
        field_val: Any,
        field_types: Set[type],
        is_optional: bool,
        validation_errors: List[str],
        parent_field_str: str,
    ) -> None:
        """
        Validate a single value against the expected types of its field.

        :param f: The dataclass field being validated
        :param field_val: The value of the field (or one element of a list/set field)
        :param field_types: Set of expected types for the field
        :param is_optional: Whether the field is optional (can be None)
        :param validation_errors: List to append validation errors to
        :param parent_field_str: String prefix for nested field names
        :returns: None
        """
        # if the value is None, check if it has an Optional type
        if field_val is None:
            if not self._allow_optional:
                validation_errors.append(
                    f"{f.name} has value: {field_val} but optional values are not allowed."
                )
                return None
            # if it does not have an Optional type, we need to add a validation error
            if not is_optional:
                validation_errors.append(
                    f"{f.name} has value: {field_val} but does not have an Optional type: {field_types}"
                )
                return None
            # if validation check pass move to the next field
            return None
        # if the field_val being validated is a sub-class of TIPModel, we need to
        # recursively validate its fields
        if isinstance(field_val, TIPModel):
            field_val.validate_types(
                validation_errors=validation_errors,
                parent_field=f"{parent_field_str}{f.name}",
            )
        # if the field_val is not one of its expected types, we need to add a validation error
        if not isinstance(field_val, tuple(field_types)):
            validation_errors.append(
                f"{parent_field_str}{f.name} has value {field_val} with type {type(field_val)} which is not one of its expected types: {field_types}"
            )

    def validate_types(
        self, validation_errors: Optional[List] = None, parent_field: str = ""
    ) -> List[str]:
        """
        Validate all field types in this model.

        :param validation_errors: Optional list to append errors to
        :param parent_field: Parent field name for nested validation
        :return: List of validation error messages
        :raises TIPModelValidationException: If any field value is a dict
        """
        parent_field_str = "" if parent_field == "" else f"{parent_field}."
        if validation_errors is None:
            validation_errors = []
        for f in fields(self):
            field_types, is_optional = self._get_expected_types(f.type, f.name)
            field_val = getattr(self, f.name)
            if isinstance(field_val, (list, set)):
                # each element is checked against the element types of the field.
                # NOTE(review): for an annotation such as Optional[List[X]] the
                # expected types are {List[X]}, and isinstance() rejects
                # subscripted generics with a TypeError - confirm whether such
                # annotations are used by any model.
                for fv in field_val:
                    self.validate_type(
                        f,
                        fv,
                        field_types,
                        is_optional,
                        validation_errors,
                        parent_field_str,
                    )
            elif isinstance(field_val, dict):
                raise TIPModelValidationException(
                    f"field {f.name} has value {field_val} with expected types: {field_types}; however, dict types are currently not supported by TIPModel"
                )
            else:
                self.validate_type(
                    f,
                    field_val,
                    field_types,
                    is_optional,
                    validation_errors,
                    parent_field_str,
                )
        return validation_errors

    @classmethod
    def _type_to_set(cls, field_type: Union[type, Set[type]]) -> Set[type]:
        """
        Convert a type or set of types to a set.

        :param field_type: Type or set of types to convert
        :return: Set containing the type(s)
        """
        if isinstance(field_type, set):
            return field_type
        return {field_type}

    @classmethod
    def _handle_forward_references(
        cls, field_type: Any, field_name: str
    ) -> Tuple[Set[type], bool]:
        """
        Resolve a string (forward reference) annotation and return its expected types.

        :param field_type: The unresolved annotation, used in error messages
        :param field_name: Name of the field whose type hint is looked up on ``cls``
        :return: Tuple of (expected types, is_optional)
        :raises TIPModelValidationException: If the reference cannot be resolved
        """
        try:
            # Get type hints for this class to resolve forward references
            type_hints = get_type_hints(cls)
            # Find the field with this type annotation
            try:
                resolved_type = type_hints[field_name]
            except KeyError as e:
                raise TIPModelValidationException(
                    f"Unable to find type hint forward reference {field_type} for field {field_name}"
                ) from e
            return cls._get_expected_types(resolved_type, field_name)
        except Exception as e:
            # every failure, including the KeyError case above, surfaces with
            # this message; the original error stays attached as the cause
            raise TIPModelValidationException(
                f"Unable to resolve forward reference {field_type} when validating type"
            ) from e

    @classmethod
    def _handle_union_type(cls, args: Tuple[Any, ...]) -> Tuple[Set[type], bool]:
        """
        Split the members of a Union into its non-None types and an optional flag.

        :param args: The type arguments of the Union
        :return: Tuple of (non-None member types, whether NoneType was a member)
        """
        non_none_types = [arg for arg in args if arg is not type(None)]
        is_optional = type(None) in args
        return set(non_none_types), is_optional

    @classmethod
    def _handle_type_with_origin(cls, field_type: Any) -> Tuple[Set[type], bool]:
        """
        Determine the expected types of a resolved (non-string) annotation.

        :param field_type: The annotation to inspect
        :return: Tuple of (expected types, is_optional). For list/set
            annotations the expected types are the element types.
        """
        # local import keeps this fix self-contained; types.UnionType only
        # exists on Python 3.10+, hence the getattr fallback below
        import types

        origin = get_origin(field_type)
        if origin:
            args = get_args(field_type)
            if origin in (list, List, set, Set):
                return set(args), False
            # typing.Union covers Optional[T] / Union[T, None];
            # types.UnionType covers the PEP 604 spelling "T | None"
            union_origins = (Union, getattr(types, "UnionType", Union))
            if origin in union_origins:
                return cls._handle_union_type(args)
        return cls._type_to_set(field_type), False

    @classmethod
    def _get_expected_types(
        cls, field_type: Any, field_name: str
    ) -> Tuple[Set[type], bool]:
        """
        Resolve various type annotations to find the actual type.

        :param field_type: The annotation of the field (may be a string)
        :param field_name: Name of the field, used to resolve string annotations
        :return: Tuple of (expected types, is_optional)
        """
        if isinstance(field_type, str):
            return cls._handle_forward_references(
                field_name=field_name, field_type=field_type
            )
        return cls._handle_type_with_origin(field_type)

    @classmethod
    def _get_expected_types_for_data_dictionary(
        cls, field_type: Any, field_name: str
    ) -> Tuple[Set[type], bool, Optional[type]]:
        """
        Resolve various type annotations to find the actual type and its origin.

        :param field_type: The annotation of the field (may be a string)
        :param field_name: Name of the field, used to resolve string annotations
        :return: Tuple of (expected types, is_optional, origin of the annotation)
        """
        if isinstance(field_type, str):
            expected, is_optional = cls._handle_forward_references(
                field_name=field_name, field_type=field_type
            )
            # the origin has to come from the resolved hint: get_origin() of the
            # raw string is always None. The lookup cannot fail here because the
            # call above already resolved this field successfully.
            field_type = get_type_hints(cls)[field_name]
        else:
            expected, is_optional = cls._handle_type_with_origin(field_type)
        return expected, is_optional, get_origin(field_type)

    def to_kwargs(self) -> Dict[str, Any]:
        """
        Convert this model to a dictionary of field names and values.

        :return: Dictionary mapping field names to their values
        """
        return {f.name: getattr(self, f.name) for f in fields(self)}

    @classmethod
    @abstractmethod
    def model_validate_from_elem(cls, data: etree.Element) -> "TIPModel":
        """Abstract method for converting lxml objectify data to TIPModel"""

    @classmethod
    def _format_for_data_dictionary(
        cls, f: Field, field_types: Set[type], is_optional: bool
    ) -> Dict[str, Any]:
        """
        Build the data dictionary entry of a field that is not a TIPModel.

        :param f: The dataclass field; its metadata supplies the description
        :param field_types: Expected types of the field
        :param is_optional: Whether the field is optional
        :return: Dict with "description", "type" and "required" entries
        """
        # sorted so that multi-type fields are rendered in a stable order;
        # str(ft) is the fallback for entries that carry no __name__
        type_names = sorted(getattr(ft, "__name__", str(ft)) for ft in field_types)
        return {
            "description": f.metadata.get("description", "No description provided"),
            "type": ", ".join(type_names),
            "required": str(not is_optional),
        }

    @classmethod
    def _field_to_data_dictionary(
        cls, f: Field, field_types: Set[type], is_optional: bool
    ) -> Dict[str, Any]:
        """
        Build the data dictionary entry for one field (or list/set element).

        :param f: The dataclass field being described
        :param field_types: Expected types of the field
        :param is_optional: Whether the field is optional
        :return: A nested description when one of the expected types is a
            TIPModel subclass, otherwise the plain description of the field
        """
        for ft in field_types:
            # isinstance guard: issubclass() raises TypeError for non-classes
            if isinstance(ft, type) and issubclass(ft, TIPModel):
                return {
                    "type": str(ft.__name__),
                    "description": "Data class containing the fields described in fields property",
                    # described through ft so that string annotations of the
                    # nested model are resolved against the nested model
                    "fields": ft._fields_to_data_dictionary(fields(ft)),
                }
        return cls._format_for_data_dictionary(f, field_types, is_optional)

    @classmethod
    def _fields_to_data_dictionary(
        cls, dc_fields: Tuple[Field, ...]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Build the data dictionary for a collection of dataclass fields.

        :param dc_fields: The dataclass fields to describe
        :return: Dict mapping each field name to its description
        """
        dd = dict()
        for f in dc_fields:
            field_types, is_optional, origin = (
                cls._get_expected_types_for_data_dictionary(f.type, f.name)
            )
            if origin in (list, List, set, Set):
                # collections are wrapped; their element type is described
                # under the "elements" key
                str_type = str(origin.__name__).title()
                dd[f.name] = {
                    "type": str_type,
                    "description": f"{str_type} type with elements described in elements property",
                    "elements": cls._field_to_data_dictionary(
                        f, field_types, is_optional
                    ),
                }
            else:
                dd[f.name] = cls._field_to_data_dictionary(f, field_types, is_optional)
        return dd

    @classmethod
    def data_dictionary(cls) -> Dict[str, Any]:
        """
        Generates a data dictionary as a dict type.

        :return: dict containing all fields with their data types and descriptions
        """
        return cls._fields_to_data_dictionary(fields(cls))

    @classmethod
    def data_dictionary_json(cls) -> str:
        """
        Generates a data dictionary as a JSON string.

        :return: JSON string containing all fields with their data types and descriptions
        """
        return json.dumps(cls.data_dictionary())
[docs]
@dataclass
class TIPRequest(TIPModel):
    """
    Data class that contains fields that are shared across transaction
    types and should be included in all TIP requests.
    """

    customer_key: str = field(
        metadata={
            "description": descriptions.CUSTOMER_KEY_DESCRIPTION,
        }
    )
    event_user_id: int = field(
        metadata={
            "description": descriptions.EVENT_USER_ID_DESCRIPTION,
        }
    )
    session_id: str = field(
        metadata={
            "description": descriptions.SESSION_ID_DESCRIPTION,
        }
    )
    ui_source: str = field(
        metadata={
            "description": descriptions.UI_SOURCE_DESCRIPTION,
        }
    )
    transaction_id: int = field(
        metadata={
            "description": descriptions.TRANSACTION_ID_DESCRIPTION,
        }
    )
    audit_id: int = field(
        metadata={
            "description": descriptions.AUDIT_ID_DESCRIPTION,
        }
    )
    event_date_time: datetime = field(
        metadata={
            "description": descriptions.EVENT_DATE_TIME_DESCRIPTION,
        }
    )
    customer_id: int = field(
        metadata={
            "description": descriptions.CUSTOMER_ID_DESCRIPTION,
        }
    )
    group_id: int = field(
        metadata={
            "description": descriptions.GROUP_ID_DESCRIPTION,
        }
    )
    transaction_type: TransactionTypeEnum = field(
        metadata={
            "description": f"{descriptions.TRANSACTION_TYPE_DESCRIPTION} Will be one of the following: {', '.join(TransactionTypeEnum.valid_names())}",
        }
    )
    gt_type_short_name: str = field(
        metadata={
            "description": descriptions.GT_TYPE_SHORT_NAME_DESCRIPTION,
        }
    )
    transaction_type_version: str = field(
        metadata={
            "description": descriptions.TRANSACTION_TYPE_VERSION_DESCRIPTION,
        }
    )
    create_date: datetime = field(
        metadata={
            "description": descriptions.CREATE_DATE_DESCRIPTION,
        }
    )
    # NOTE(review): this field is described with AUTHORIZED_DATE_DESCRIPTION,
    # which looks like a copy/paste slip - confirm whether the descriptions
    # module has a dedicated transaction amount description to use instead.
    transaction_amount: float = field(
        metadata={
            "description": descriptions.AUTHORIZED_DATE_DESCRIPTION,
        }
    )
    login_audit_id: int = field(
        metadata={
            "description": descriptions.LOGIN_AUDIT_ID_DESCRIPTION,
        }
    )
    user_logon_id: int = field(
        metadata={
            "description": descriptions.USER_LOGON_ID_DESCRIPTION,
        }
    )
    created_user_id: Optional[int] = field(
        metadata={
            "description": descriptions.CREATED_USER_ID_DESCRIPTION,
        }
    )
    authorized_user_id: Optional[int] = field(
        metadata={
            "description": descriptions.AUTHORIZED_USER_ID_DESCRIPTION,
        }
    )
    modified_user_id: Optional[int] = field(
        metadata={
            "description": descriptions.MODIFIED_USER_ID_DESCRIPTION,
        }
    )

    @classmethod
    def model_validate_from_elem(cls, data: etree.Element) -> "TIPRequest":
        """
        Converts lxml objectify data to a TIPRequest model.

        :param data: Input data as etree.Element
        :return: The TIPRequest built from the child nodes and root attributes of ``data``
        :raises TIPModelValidationFromLxmlObjectifyException: If ``data`` is not
            an objectify.ObjectifiedElement
        :raises TIPModelParsingException: If a required node is missing or a
            value cannot be parsed
        """
        if not isinstance(data, objectify.ObjectifiedElement):
            raise TIPModelValidationFromLxmlObjectifyException(cls.__name__, type(data))
        try:
            # For the optional fields, there is the possibility that the node is not present
            # in the xml at all. In this case, accessing the attribute on the objectified
            # element raises an AttributeError. But since the field is optional, we do not
            # want to fail and instead fall back to None.
            created_user_id = getattr(data, "CreatedUserID", None)
            authorized_user_id = getattr(data, "AuthorizedUserID", None)
            modified_user_id = getattr(data, "ModifiedUserID", None)
            return cls(
                # parsed differently because it comes from attributes in the root XML node
                customer_key=TIPModel.parse_str(data.get("InsightCustomerKey")),
                event_user_id=TIPModel.parse_int(data.EventUserID),
                session_id=TIPModel.parse_str(data.SessionID.text),
                ui_source=TIPModel.parse_str(data.UISource.text),
                transaction_id=TIPModel.parse_int(data.TransactionID),
                audit_id=TIPModel.parse_int(data.AuditID),
                event_date_time=TIPModel.parse_datetime(data.EventDateTime),
                customer_id=TIPModel.parse_int(data.CustomerID),
                group_id=TIPModel.parse_int(data.GroupID),
                transaction_type=TransactionTypeEnum.from_elem_str(
                    TIPModel.parse_str(data.TransactionType.text)
                ),
                gt_type_short_name=TIPModel.parse_str(data.GTTypeShortName.text),
                transaction_type_version=TIPModel.parse_str(
                    data.TransactionTypeVersion.text
                ),
                create_date=TIPModel.parse_datetime(data.CreateDate),
                transaction_amount=TIPModel.parse_float(data.TransactionAmount),
                login_audit_id=TIPModel.parse_int(data.LoginAuditID),
                user_logon_id=TIPModel.parse_int(data.UserLogonID),
                created_user_id=TIPModel.parse_int_id_optional(created_user_id),
                authorized_user_id=TIPModel.parse_int_id_optional(
                    authorized_user_id
                ),
                modified_user_id=TIPModel.parse_int_id_optional(modified_user_id),
            )
        except AttributeError as e:
            # a required node was not present on the objectified element;
            # chain the cause like the other parsing helpers in this module
            raise TIPModelParsingException(
                f"Missing required attribute in obj_elem: {str(e)}"
            ) from e

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert this request to a dictionary keyed by its external field names.

        Values are returned as stored on the model (no string conversion).

        :return: Dictionary mapping external field names to the field values
        """
        # NOTE(review): gt_type_short_name is the only field without an entry
        # here - confirm whether that omission is intentional.
        return {
            "CustomerKey": self.customer_key,
            "EventUserId": self.event_user_id,
            "CreatedUserId": self.created_user_id,
            "AuthorizedUserId": self.authorized_user_id,
            "ModifiedUserId": self.modified_user_id,
            "UISource": self.ui_source,
            "SessionId": self.session_id,
            "TransactionId": self.transaction_id,
            "AuditId": self.audit_id,
            "EventDateTime": self.event_date_time,
            "CustomerId": self.customer_id,
            "GroupId": self.group_id,
            "TransactionType": self.transaction_type,
            "TransactionTypeVersion": self.transaction_type_version,
            "CreateDate": self.create_date,
            "TransactionAmount": self.transaction_amount,
            "LoginAuditId": self.login_audit_id,
            "UserLogonId": self.user_logon_id,
        }
[docs]
@dataclass
class TIPResponse(TIPModel):
    """
    Data class used to build the TIP response XML returned to HQ
    """

    status: TIPStatus
    version: TIPResponseVersion = TIPResponseVersion.latest_version()
    # these fields are optional because the TIP handler still need to send a response
    # to HQ even if it fails before it knows all of these fields
    tracking_id: Optional[str] = None
    transaction_id: Optional[int] = None
    error_type: str = "Success"
    error_code: str = "0"
    error_description: str = ""

    def to_audit_details(self) -> Dict[str, str]:
        """
        Convert this response to a flat dictionary of audit details.

        :return: Dictionary of audit details; a missing tracking id or
            transaction id is rendered as an empty string
        """
        return {
            "TrackingId": self.tracking_id or "",
            # explicit None check so that a transaction id of 0 is kept as "0",
            # matching the "is not None" test used by to_xml()
            "TransactionId": ""
            if self.transaction_id is None
            else str(self.transaction_id),
            "Status": self.status.value,
            "ErrorType": self.error_type,
            "ErrorCode": self.error_code,
            "ErrorDescription": self.error_description,
            "Version": self.version,
        }

    @catch_uncaught_exceptions
    def to_xml(self) -> str:
        """
        Convert this TIPResponse to XML formatted string.

        The document has a ``TIP`` root with a ``Result`` node (error code,
        error description, version) and a ``Data`` node (status plus the
        tracking and transaction ids when they are set).

        :return: XML string representation of the response
        """
        root = etree.Element("TIP")
        result_elem = etree.SubElement(root, "Result")
        error_code_elem = etree.SubElement(
            result_elem, "ErrorCode", ErrorType=self.error_type
        )
        error_code_elem.text = self.error_code
        etree.SubElement(result_elem, "ErrorDescription").text = self.error_description
        etree.SubElement(result_elem, "Version").text = self.version
        data_elem = etree.SubElement(root, "Data")
        etree.SubElement(data_elem, "Status").text = self.status.value
        # the id nodes are only emitted when a value is available
        if self.tracking_id is not None:
            etree.SubElement(data_elem, "TrackingID").text = str(self.tracking_id)
        if self.transaction_id is not None:
            etree.SubElement(data_elem, "TransactionID").text = str(self.transaction_id)
        return etree.tostring(root).decode("utf8")

    @classmethod
    def from_exception(
        cls,
        error_description: str,
        version: TIPResponseVersion = TIPResponseVersion.latest_version(),
        tracking_id: Optional[str] = None,
        transaction_id: Optional[int] = None,
    ) -> "TIPResponse":
        """
        Build a failure TIPResponse from an error description.

        The response has status ``TIPStatus.Exception``, error type
        ``"Failure"`` and error code ``"1"``.

        :param error_description: Text stored as the error description of the response
        :param version: Version stored on the response
        :param tracking_id: Tracking id to include, if known
        :param transaction_id: Transaction id to include, if known
        :return: The failure TIPResponse
        """
        return cls(
            tracking_id=tracking_id,
            version=version,
            error_description=error_description,
            transaction_id=transaction_id,
            status=TIPStatus.Exception,
            error_type="Failure",
            error_code="1",
        )

    @classmethod
    def model_validate_from_elem(cls, data: etree.Element) -> "TIPResponse":
        """
        We don't need this method implemented but we still want to inherit
        from TIPModel so that we can get type validation

        :raises NotImplementedError: Always
        """
        raise NotImplementedError()

    def validate_no_optional(self):
        """
        This is the same method as super().validate() except we temporarily
        set self._allow_optional to False
        so that we force all the fields in this class to be required.

        :raises TIPModelValidationException: If any validation checks fail
        """
        self._allow_optional = False
        try:
            self.validate()
        # we need to ensure that even when an exception is raised,
        # we still restore the _allow_optional attribute
        finally:
            self._allow_optional = True
[docs]
def get_execute_request_as_xml_response_from_tip_response(
    root_node: str,
    xml_str: str,
    attrs: dict,
) -> str:
    """
    Builds a soap envelope to respond to an "ExecuteRequestAsXML" request from HQ

    :param root_node: Request nodename provided by HQ (not used when building the envelope)
    :param xml_str: XML text placed inside the ExecuteRequestAsXmlResult node
    :param attrs: Attributes to add to the first Data node found in the envelope
    :return: The serialized soap envelope
    """
    soap_text = f"""
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Body>
<ExecuteRequestAsXmlResponse xmlns="http://tempuri.org/">
<ExecuteRequestAsXmlResult>
{xml_str}
</ExecuteRequestAsXmlResult>
</ExecuteRequestAsXmlResponse>
</soap:Body>
</soap:Envelope>
"""
    envelope = etree.XML(soap_text, parser=XML_PARSER)
    # "{*}" matches the Data node regardless of its namespace
    target = envelope.find(".//{*}Data")
    if target is None:
        # nothing to decorate: return the envelope unchanged
        return etree.tostring(envelope).decode()
    # attributes are applied in sorted key order
    for attr_name in sorted(attrs):
        target.set(attr_name, str(attrs[attr_name]))
    return etree.tostring(envelope).decode()