Source code for q2_cores.Symitar.models.parser

from typing import Type

from lxml import objectify
from q2_cores.Symitar.models.message import PowerOnMessage

from . import MessageStatus, MessageType, SymConnectMessage
from . import SymXchangeMessageType, SymXchangeMessage
from ..exceptions import SymConnectMessageTypeError


def parse(
    message: str, unescape: bool = True
) -> SymConnectMessage | Type[SymXchangeMessage]:
    """
    Turn a raw Symitar message into a SymConnectMessage or SymXchangeMessage.

    Leading and trailing whitespace is stripped before the message is
    inspected. Returns ``None`` when ``message`` is ``None``.

    :param message: The message to parse
    :param unescape: Whether to unescape (xml) the message payload. Only
        relevant for SymXchange PowerOn message types.
    :raises SymConnectMessageTypeError: if the message is not XML and does not
        start with one of the known SymConnect message type values.
    """
    if message is None:
        return None

    stripped = message.strip()

    # SymXchange messages are always XML, so a leading '<' is taken as the
    # marker for them.
    if stripped.startswith("<"):
        return _parse_symxchange_message(stripped, unescape)

    # SymConnect messages start with their type value. The candidates are
    # tried in this fixed order and the first matching prefix wins.
    symconnect_types = (
        MessageType.RSIQ,
        MessageType.RSHANDSHAKE,
        MessageType.RSRG,
        MessageType.RSFM,
        MessageType.RSTR,
        MessageType.RSWHAT,
    )
    for candidate in symconnect_types:
        if stripped.startswith(candidate.value):
            return _parse_fields(stripped, candidate)

    raise SymConnectMessageTypeError(
        f"Unknown Symitar message type received: {stripped[:30]}"
    )
def _get_q2_payload(message) -> str: payload = None data_start = message.find("<data>") data_end = message.find("</data>") fm_error_start = message.find("<fmerror>") fm_error_end = message.find("</fmerror>") data_section = message[data_start : data_end + len("</data>")] fm_error_section = message[fm_error_start : fm_error_end + len("</fmerror>")] if "JRGLINE" not in data_section: payload = data_section if "JRGLINE" not in fm_error_section and (payload is None or payload == ""): payload = fm_error_section return payload def _parse_fields(message: str, message_type: MessageType) -> SymConnectMessage: results = message.split("~") guid = None error_number = 0 error_description = None status = MessageStatus.Error lines = [] fields = {} payload = _get_q2_payload(message) if message_type == MessageType.RSHANDSHAKE: status = MessageStatus.Success elif message_type != MessageType.RSWHAT: i = 0 count = len(results) while i < count: field = results[i] if field == message_type.value: i += 1 guid = results[i] elif field.startswith("K"): if "K0" not in field: split = field[1:].split(":") error_number = int(split[0]) error_description = split[1] status = ( MessageStatus.Warning if error_number <= 10000 else MessageStatus.Error ) else: status = MessageStatus.Success elif not payload and field.startswith("JRGLINE="): # Spliting on entire JRGLINE= as there could be an '=' within the # value e.g. 
~JRGLINE=<Node Type='31'> split = field.split("JRGLINE=") lines.append(split[1]) else: # Everything else should be in the format of key=value split = field.split("=") fields[split[0]] = split[1] i += 1 if not payload: payload = "".join(lines) payload = _escape_payload(payload) return SymConnectMessage( message, guid, message_type, status, payload, fields, error_number, error_description, ) def _parse_symxchange_message(message: str, unescape: bool) -> Type[SymXchangeMessage]: """Convert a SymXchange message into a SymXchangeMessage object""" symxchange_message: SymXchangeMessage element = objectify.fromstring(message) match element.find(".//*/*").tag: # noqa: E999 case SymXchangeMessageType.POWERON.value: symxchange_message = PowerOnMessage(message, element, unescape) case _: symxchange_message = SymXchangeMessage( message, element, SymXchangeMessageType.GENERIC ) return symxchange_message def _escape_payload(payload: str) -> str: payload = payload.replace("&amp;", "&") # Ensure we don't double escape things payload = payload.replace("&", "&amp;") return payload