from functools import cached_property
from typing import Any, List, Optional
from q2_sdk.hq.models import xml_helper
from . import MessageStatus, SymXchangeMessageType
[docs]
class SymConnectMessage:
    """Data structure of a SymConnect message"""

    def __init__(
        self,
        raw,
        guid,
        message_type,
        status,
        payload: Optional[str] = None,
        fields: Optional[dict] = None,
        error_number: Optional[int] = None,
        error_message: Optional[str] = None,
    ):
        """
        Initialize a SymConnectMessage object

        :param raw: The raw message; this is what ``str(message)`` returns
        :param guid: Stored unchanged on ``self.guid``
        :param message_type: Stored unchanged on ``self.message_type``
        :param status: Compared against ``MessageStatus`` members by
            ``has_error`` and ``has_warning``
        :param payload: Optional payload, ``None`` when not supplied
        :param fields: Optional dict of fields, ``None`` when not supplied
        :param error_number: Optional error number, ``None`` when not supplied
        :param error_message: Optional error message, ``None`` when not supplied
        """
        self.raw = raw
        self.guid = guid
        self.message_type = message_type
        self.status = status
        self.payload = payload
        self.fields = fields
        self.error_number = error_number
        self.error_message = error_message

    def __str__(self):
        return self.raw

    def has_error(self):
        """Evaluates if the response is considered an error"""
        # The identity comparison already yields a bool.
        return self.status is MessageStatus.Error

    def has_warning(self):
        """Evaluates if the response has a warning code"""
        return self.status is MessageStatus.Warning
[docs]
class SymXchangeMessage:
    """Data structure of a SymXchangeResponse message"""

    def __init__(self, raw: str, element: Any, message_type: SymXchangeMessageType):
        """
        Initialize a SymXchangeMessage object

        :param raw: The raw xml message (str)
        :param element: The xml element as a lxml object
        :param message_type: The SymXchange message type
        """
        self.raw = raw
        self._element = element
        self._message_type = message_type

    @property
    def status(self) -> MessageStatus:
        """
        Map ``status_code`` onto a ``MessageStatus``

        ``0`` is ``Success``, any other code up to and including ``10000``
        is ``Warning``, and everything above that is ``Error``.
        """
        code = self.status_code
        if code == 0:
            return MessageStatus.Success
        if code <= 10000:
            return MessageStatus.Warning
        return MessageStatus.Error

    @cached_property
    def status_code(self) -> Optional[int]:
        """Integer at ``//Response/Body/StatusCode``; ``0`` is passed as the default"""
        return xml_helper.find_with_default(
            self.element, "//Response/Body/StatusCode", default=0, data_type=int
        )

    @cached_property
    def status_message(self) -> Optional[str]:
        """Text looked up at ``//Response/Body/StatusMessage``"""
        return xml_helper.find_with_default(
            self.element, "//Response/Body/StatusMessage", data_type=str
        )

    @cached_property
    def raw_message_type(self) -> Optional[str]:
        """
        Tag of the first element matched by ``.//*/*``

        :return: The tag, or ``None`` when the lookup finds nothing
        """
        node = self.element.find(".//*/*")
        # Compare with ``is None`` rather than truthiness: an element object
        # may evaluate as falsy even though it was found.
        if node is None:
            return None
        return node.tag

    @property
    def message_type(self) -> SymXchangeMessageType:
        """The message type given at construction"""
        return self._message_type

    @property
    def element(self) -> Any:
        """The xml element given at construction"""
        return self._element

    def has_error(self):
        """Evaluates if the response is considered an error"""
        return self.status is MessageStatus.Error

    def has_warning(self):
        """Evaluates if the response has a warning code"""
        return self.status is MessageStatus.Warning

    def xml_unescape(self, data: str) -> str:
        """Unescape &amp;, &apos;, &quot;, &lt;, and &gt; in a string of data."""
        data = data.replace("&lt;", "<")
        data = data.replace("&gt;", ">")
        data = data.replace("&apos;", "'")
        data = data.replace("&quot;", '"')
        # The ampersand must be decoded last; doing it first would turn an
        # escaped entity such as "&amp;lt;" into "<" instead of "&lt;".
        return data.replace("&amp;", "&")

    def __str__(self):
        return self.raw
[docs]
class PowerOnMessage(SymXchangeMessage):
    """Data structure of a PowerOnResponse message"""

    def __init__(self, raw: str, element: Any, unescape: bool):
        """
        Initialize a PowerOnMessage object

        :param raw: The raw xml message (str)
        :param element: The xml element the lookups below are run against
        :param unescape: When true, ``payload`` runs ``xml_unescape`` on the
            RGLines text
        """
        super().__init__(raw, element, SymXchangeMessageType.POWERON)
        self._unescape = unescape

    @cached_property
    def user_defined_chr_parameters(self) -> List[str]:
        """Returns the user defined character parameters"""
        chr_values_path = "//Response/Body/UserDefinedParameters/RGUserChr/Value"
        return self.element.xpath(chr_values_path)

    @cached_property
    def user_defined_num_parameters(self) -> List[int]:
        """Returns a list of user defined numeric parameters"""
        num_values_path = "//Response/Body/UserDefinedParameters/RGUserNum/Value"
        # Each XPath result is passed through int().
        return [int(value) for value in self.element.xpath(num_values_path)]

    @cached_property
    def message_id(self) -> str:
        """Unique Message ID assigned during the request"""
        header_path = "//Response/Header/MessageID"
        return xml_helper.find_with_default(self.element, header_path, data_type=str)

    @cached_property
    def rg_state(self) -> str:
        """RGState value from the PowerOnResponse"""
        header_path = "//Response/Header/RGState"
        return xml_helper.find_with_default(self.element, header_path, data_type=str)

    @cached_property
    def payload(self) -> Optional[str]:
        """
        Returns the string within the <RGLines> node

        When unescaping is enabled and the node has content, the text is run
        through ``xml_unescape`` and, if both a ``<data>`` and a ``</data>``
        marker are present, trimmed to the span from the first ``<data>``
        through the first ``</data>``.
        """
        rg_lines = xml_helper.find_with_default(
            self.element, "//Response/Body/RGLines", data_type=str
        )
        if not (rg_lines and self._unescape):
            return rg_lines

        unescaped = self.xml_unescape(rg_lines)

        # This trimming really shouldn't be necessary, as it should have been
        # done at a layer above this, but it is kept for backwards
        # compatibility.
        # NOTE(review): the trimming is treated as part of the unescape branch
        # only - confirm against callers.
        opening, closing = "<data>", "</data>"
        first_open = unescaped.find(opening)
        first_close = unescaped.find(closing)
        if first_open == -1 or first_close == -1:
            return unescaped
        return unescaped[first_open : first_close + len(closing)]