import base64
from datetime import datetime
import json
import logging
from typing import Optional, List, Union, Type
import zlib
import dateutil
import requests
from lxml import etree, objectify
from lxml.objectify import ObjectifiedElement
from q2_sdk.hq.db.representation_row_base import RepresentationRowBase
from q2_sdk.hq.models import xml_helper
from q2_sdk.hq.table_row import TableRow
DEFAULT_LOGGER = logging.getLogger()
[docs]
class HqResponse:
"""
Wraps a response from HQ up as a handy object to work with
"""
def __init__(self, response: requests.Response, logger=None):
"""
:param response: Body of HQ response
"""
self.error_message = None
self.error_code = None
self.success = True
self.headers = {}
self.result_node: Optional[etree.Element] = None
legacy_types = (str, dict, bytes)
if isinstance(response, legacy_types): # Backwards compatibility
raw_response = response
else:
raw_response = self.get_response_body(response)
self.headers = response.headers
self.raw_response = raw_response
if not logger:
logger = DEFAULT_LOGGER
self.logger = logger
# On some errors the raw response comes in as a dict, this needs to be checked before anything else
if isinstance(raw_response, dict):
message = str(raw_response)
if raw_response.get("ExceptionMessage"):
message = raw_response["ExceptionMessage"]
if raw_response.get("RawResponse"):
self.raw_response = raw_response.get("RawResponse")
self.logger.error(message)
self.success = False
self.error_message = message
return
response_xml = self._objectify_response(raw_response)
if "soap" in response_xml.nsmap:
result = response_xml.xpath("//*[contains(name(), 'Result')]")
if result:
result = result[0]
inner_response = result
if result.text:
inner_response = self._objectify_response(result.text)
else:
inner_response = response_xml
inner_response = self._decode_compressed(inner_response)
self.result_node = inner_response
self.success = self.is_success(inner_response)
if not self.success:
self.error_code = self.get_error_code(inner_response)
error_message = self.get_hq_return_error(inner_response)
if error_message is not None:
self.error_message = error_message
def _objectify_response(self, raw_response):
return objectify.fromstring(
raw_response, parser=objectify.makeparser(huge_tree=True)
)
def _decode_compressed(self, response_xml):
if response_xml.tag == "cmp":
try:
response_xml = self._objectify_response(
zlib.decompress(base64.b64decode(response_xml.text))
)
except Exception as err:
self.logger.error("Error decompressing very large HQ response: %s", err)
return response_xml
@staticmethod
def get_response_body(response: requests.Response):
response.encoding = "utf-8"
try:
response_body = response.json()
except ValueError:
response_body = response.text.encode()
if isinstance(response_body, str):
response_body = response_body.encode()
return response_body
@staticmethod
def is_success(inner_response):
return inner_response.Result.ErrorCode.pyval == 0
@staticmethod
def get_error_code(inner_response):
return inner_response.Result.ErrorCode.pyval
[docs]
@staticmethod
def get_hq_return_error(inner_response: etree.Element):
"""
Parses HQ response and returns an error message if one exists
:param inner_response: Result Node from HQ response
"""
result_node = inner_response.find(".//Table/ResultInfo//Result")
if result_node is None:
result_node = inner_response.Result
if result_node.ErrorCode.pyval != 0:
error_desc = result_node.findtext(".//ErrorDescription")
# Pass thru error messages are just whatever the Core tells us
pass_thru_result = inner_response.findtext(
".//PassThruResp/StatusDescription"
)
if pass_thru_result:
error_desc = f"{error_desc} CoreError: {pass_thru_result}".strip()
return error_desc
return None
@property
def server_date_time(self) -> datetime:
return dateutil.parser.parse(self.result_node.attrib["ServerDateTime"])
@property
def audit_id(self) -> Optional[int]:
if self.result_node is not None:
return int(self.result_node.attrib.get("AuditId", 0)) or None
return None
[docs]
def parse_sproc_return(
self,
specific_table: str = "Table",
representation_row_class: Type[RepresentationRowBase] | None = None,
) -> Union[List[TableRow], List[ObjectifiedElement]]:
"""
If the response from the db is sufficiently large, it is split up among
multiple Table elements. As such, we return a list of these Table elements
"""
lxml_elements: List[ObjectifiedElement] = self.result_node.Data.findall(
f".//{specific_table}"
)
if representation_row_class is None:
return lxml_elements
return [
TableRow(lxml_element, representation_row_class)
for lxml_element in lxml_elements
]
[docs]
def json(self) -> dict:
"""Convert the result to a dictionary"""
if self.result_node is None:
raise AttributeError("result_node of HqResponse is None")
return xml_helper.xml_to_dict(self.result_node)
def __repr__(self):
try:
json_dict = self.json()
json_repr = json.dumps(json_dict, indent=2)
except AttributeError:
json_repr = "Invalid Response"
default_return = super().__repr__()
return f"{default_return}:\n{json_repr}"