from typing import Type
from lxml import objectify
from q2_cores.Symitar.models.message import PowerOnMessage
from . import MessageStatus, MessageType, SymConnectMessage
from . import SymXchangeMessageType, SymXchangeMessage
from ..exceptions import SymConnectMessageTypeError
[docs]
def parse(
message: str, unescape: bool = True
) -> SymConnectMessage | Type[SymXchangeMessage]:
"""
Parse a Symitar message into a SymConnectMessage or a SymXchangeMessage object.
:param message: The message to parse
:param unescape: Whether to unescape (xml) the message payload. Only relevant for SymXchange PowerOn message types.
"""
if message is None:
return None
message = message.strip()
# Check for SymXchange messages. These are always XML,
# at which we'll consider the starting char to be '<'
if message.startswith("<"):
return _parse_symxchange_message(message, unescape)
# Check for SymConnect messages
if message.startswith(MessageType.RSIQ.value):
message_type = MessageType.RSIQ
elif message.startswith(MessageType.RSHANDSHAKE.value):
message_type = MessageType.RSHANDSHAKE
elif message.startswith(MessageType.RSRG.value):
message_type = MessageType.RSRG
elif message.startswith(MessageType.RSFM.value):
message_type = MessageType.RSFM
elif message.startswith(MessageType.RSTR.value):
message_type = MessageType.RSTR
elif message.startswith(MessageType.RSWHAT.value):
message_type = MessageType.RSWHAT
else:
raise SymConnectMessageTypeError(
f"Unknown Symitar message type received: {message[:30]}"
)
return _parse_fields(message, message_type)
def _get_q2_payload(message) -> str:
payload = None
data_start = message.find("<data>")
data_end = message.find("</data>")
fm_error_start = message.find("<fmerror>")
fm_error_end = message.find("</fmerror>")
data_section = message[data_start : data_end + len("</data>")]
fm_error_section = message[fm_error_start : fm_error_end + len("</fmerror>")]
if "JRGLINE" not in data_section:
payload = data_section
if "JRGLINE" not in fm_error_section and (payload is None or payload == ""):
payload = fm_error_section
return payload
def _parse_fields(message: str, message_type: MessageType) -> SymConnectMessage:
results = message.split("~")
guid = None
error_number = 0
error_description = None
status = MessageStatus.Error
lines = []
fields = {}
payload = _get_q2_payload(message)
if message_type == MessageType.RSHANDSHAKE:
status = MessageStatus.Success
elif message_type != MessageType.RSWHAT:
i = 0
count = len(results)
while i < count:
field = results[i]
if field == message_type.value:
i += 1
guid = results[i]
elif field.startswith("K"):
if "K0" not in field:
split = field[1:].split(":")
error_number = int(split[0])
error_description = split[1]
status = (
MessageStatus.Warning
if error_number <= 10000
else MessageStatus.Error
)
else:
status = MessageStatus.Success
elif not payload and field.startswith("JRGLINE="):
# Spliting on entire JRGLINE= as there could be an '=' within the
# value e.g. ~JRGLINE=<Node Type='31'>
split = field.split("JRGLINE=")
lines.append(split[1])
else:
# Everything else should be in the format of key=value
split = field.split("=")
fields[split[0]] = split[1]
i += 1
if not payload:
payload = "".join(lines)
payload = _escape_payload(payload)
return SymConnectMessage(
message,
guid,
message_type,
status,
payload,
fields,
error_number,
error_description,
)
def _parse_symxchange_message(message: str, unescape: bool) -> Type[SymXchangeMessage]:
"""Convert a SymXchange message into a SymXchangeMessage object"""
symxchange_message: SymXchangeMessage
element = objectify.fromstring(message)
match element.find(".//*/*").tag: # noqa: E999
case SymXchangeMessageType.POWERON.value:
symxchange_message = PowerOnMessage(message, element, unescape)
case _:
symxchange_message = SymXchangeMessage(
message, element, SymXchangeMessageType.GENERIC
)
return symxchange_message
def _escape_payload(payload: str) -> str:
payload = payload.replace("&", "&") # Ensure we don't double escape things
payload = payload.replace("&", "&")
return payload