"""Source code for q2_cores.Symitar.models.message."""

from functools import cached_property
from typing import Any, List, Optional
from q2_sdk.hq.models import xml_helper
from . import MessageStatus, SymXchangeMessageType


class SymConnectMessage:
    """Data structure of a SymConnect message.

    :param raw: The raw message; returned unchanged by ``str()``
    :param guid: Identifier stored on the instance as ``guid``
    :param message_type: Message type stored on the instance as ``message_type``
    :param status: Status value; compared by identity against ``MessageStatus``
        members in :meth:`has_error` and :meth:`has_warning`
    :param payload: Optional payload string
    :param fields: Optional dict of message fields
    :param error_number: Optional error number
    :param error_message: Optional error message
    """

    def __init__(
        self,
        raw,
        guid,
        message_type,
        status,
        payload: Optional[str] = None,
        fields: Optional[dict] = None,
        error_number: Optional[int] = None,
        error_message: Optional[str] = None,
    ):
        self.raw = raw
        self.guid = guid
        self.message_type = message_type
        self.status = status
        self.payload = payload
        self.fields = fields
        self.error_number = error_number
        self.error_message = error_message

    def __str__(self):
        return self.raw

    def has_error(self):
        """Evaluates if the response is considered an error"""
        # The identity test already yields a bool; no ternary needed.
        return self.status is MessageStatus.Error

    def has_warning(self):
        """Evaluates if the response has a warning code"""
        return self.status is MessageStatus.Warning
[docs] class SymXchangeMessage: """Data structure of a SymXchangeResponse message""" def __init__(self, raw: str, element: Any, message_type: SymXchangeMessageType): """ Initialize a SymXchangeMessage object :param raw: The raw xml message (str) :param element: The xml element as a lxml object :param message_type: The SymXchange message type """ self.raw = raw self._element = element self._message_type = message_type @property def status(self) -> MessageStatus: match self.status_code: # noqa: E999 case 0: return MessageStatus.Success case code if code <= 10000: return MessageStatus.Warning case _: return MessageStatus.Error
[docs] @cached_property def status_code(self) -> Optional[int]: return xml_helper.find_with_default( self.element, "//Response/Body/StatusCode", default=0, data_type=int )
[docs] @cached_property def status_message(self) -> Optional[int]: return xml_helper.find_with_default( self.element, "//Response/Body/StatusMessage", data_type=str )
[docs] @cached_property def raw_message_type(self) -> Optional[str]: return self.element.find(".//*/*").tag
@property def message_type(self) -> SymXchangeMessageType: return self._message_type @property def element(self) -> Any: return self._element
[docs] def has_error(self): """Evaluates if the response is considered an error""" return True if self.status is MessageStatus.Error else False
[docs] def has_warning(self): """Evaluates if the response has a warning code""" return True if self.status is MessageStatus.Warning else False
[docs] def xml_unescape(self, data: str) -> str: """Unescape &amp;, &apos;, &quot;, &lt;, and &gt; in a string of data.""" data = data.replace("&lt;", "<") data = data.replace("&gt;", ">") data = data.replace("&apos;", "'") data = data.replace("&quot;", '"') return data.replace("&amp;", "&")
def __str__(self): return self.raw
class PowerOnMessage(SymXchangeMessage):
    """Data structure of a PowerOnResponse message"""

    def __init__(self, raw: str, element: Any, unescape: bool):
        """
        Initialize a PowerOnMessage object

        :param raw: The raw xml message (str)
        :param element: The xml element as a lxml object
        :param unescape: Whether :attr:`payload` runs ``xml_unescape`` on the
            RGLines text
        """
        super().__init__(raw, element, SymXchangeMessageType.POWERON)
        self._unescape = unescape

    @cached_property
    def user_defined_chr_parameters(self) -> List[str]:
        """Returns the user defined character parameters"""
        chr_values_xpath = "//Response/Body/UserDefinedParameters/RGUserChr/Value"
        return self.element.xpath(chr_values_xpath)

    @cached_property
    def user_defined_num_parameters(self) -> List[int]:
        """Returns a list of user defined numeric parameters"""
        num_values_xpath = "//Response/Body/UserDefinedParameters/RGUserNum/Value"
        return [int(value) for value in self.element.xpath(num_values_xpath)]

    @cached_property
    def message_id(self) -> str:
        """Unique Message ID assigned during the request"""
        message_id_xpath = "//Response/Header/MessageID"
        return xml_helper.find_with_default(
            self.element, message_id_xpath, data_type=str
        )

    @cached_property
    def rg_state(self) -> str:
        """RGState value from the PowerOnResponse"""
        rg_state_xpath = "//Response/Header/RGState"
        return xml_helper.find_with_default(
            self.element, rg_state_xpath, data_type=str
        )

    @cached_property
    def payload(self) -> Optional[str]:
        """Returns the string within the <RGLines> node"""
        rg_lines = xml_helper.find_with_default(
            self.element, "//Response/Body/RGLines", data_type=str
        )
        # Nothing to post-process: either there is no text or unescaping is off.
        if not (rg_lines and self._unescape):
            return rg_lines

        unescaped = self.xml_unescape(rg_lines)

        # Trimming to the <data>...</data> span really shouldn't be necessary,
        # since a layer above this one should already have done it; it is kept
        # for backwards compatibility.
        # NOTE(review): the trimming is treated as part of the unescape branch;
        # confirm the nesting against the upstream module.
        closing_tag = "</data>"
        first = unescaped.find("<data>")
        last = unescaped.find(closing_tag)
        if first == -1 or last == -1:
            return unescaped
        return unescaped[first : last + len(closing_tag)]