import base64
from datetime import datetime
import json
import logging
from typing import Optional, List, Union, Type
import zlib
import dateutil
import requests
from lxml import etree, objectify
from lxml.objectify import ObjectifiedElement
from q2_sdk.hq.models import xml_helper
from q2_sdk.hq.table_row import TableRow
# Fallback logger (the root logger) used by HqResponse when the caller does
# not supply one of its own.
DEFAULT_LOGGER = logging.getLogger()
class HqResponse:
    """
    Wraps a response from HQ up as a handy object to work with

    After construction the following attributes are populated:

    - ``success``: ``False`` when the HTTP status is not ok, when the body is
      an error dict, when a SOAP envelope carries no Result element, or when
      the parsed ``Result/ErrorCode`` is non-zero
    - ``error_code``: value of ``Result/ErrorCode`` when it is non-zero,
      otherwise ``None``
    - ``error_message``: error text extracted from the response, if any
    - ``headers``: HTTP headers (empty dict for legacy ``str``/``dict``/``bytes``
      input)
    - ``raw_response``: the body as received (or the ``RawResponse`` entry of an
      error dict)
    - ``result_node``: the parsed (and, if needed, decompressed) result XML, or
      ``None`` when parsing was never reached
    - ``http_response``: the object that was passed in as ``response``
    """

    def __init__(self, response: requests.Response, logger=None):
        """
        :param response: HQ response. Normally a ``requests.Response``; a
            ``str``, ``dict`` or ``bytes`` body is also accepted for backwards
            compatibility
        :param logger: Logger used to report errors; falls back to
            ``DEFAULT_LOGGER`` when omitted
        """
        self.error_message = None
        self.error_code = None
        self.success = True
        self.headers = {}
        self.result_node: Optional[etree.Element] = None
        self.http_response: requests.Response = response
        self.logger = logger or DEFAULT_LOGGER

        is_legacy = isinstance(response, (str, dict, bytes))  # Backwards compatibility
        if is_legacy:
            raw_response = response
        else:
            raw_response = self.get_response_body(response)
            self.headers = response.headers
        self.raw_response = raw_response

        # If this HQ response is not in the ok range then it is treated as a
        # failure and the body is not parsed any further
        if (
            not is_legacy
            and isinstance(response, requests.Response)
            and not response.ok
        ):
            self.success = False
            return

        # On some errors the raw response comes in as a dict, this needs to be
        # checked before anything else
        if isinstance(raw_response, dict):
            if raw_response.get("RawResponse"):
                self.raw_response = raw_response.get("RawResponse")
            self._fail(raw_response.get("ExceptionMessage") or str(raw_response))
            return

        response_xml = self._objectify_response(raw_response)
        if "soap" in response_xml.nsmap:
            matches = response_xml.xpath("//*[contains(name(), 'Result')]")
            if not matches:
                # Without this guard the empty list would be used as a node
                # below and blow up with an unrelated AttributeError
                self._fail("SOAP response from HQ does not contain a Result element")
                return
            inner_response = matches[0]
            # The Result element may carry the real payload as escaped XML text
            if inner_response.text:
                inner_response = self._objectify_response(inner_response.text)
        else:
            inner_response = response_xml

        inner_response = self._decode_compressed(inner_response)
        self.result_node = inner_response
        self.success = self.is_success(inner_response)
        if not self.success:
            self.error_code = self.get_error_code(inner_response)
        error_message = self.get_hq_return_error(inner_response)
        if error_message is not None:
            self.error_message = error_message

    def _fail(self, message):
        """Log ``message`` as an error and mark this response as unsuccessful"""
        self.logger.error(message)
        self.success = False
        self.error_message = message

    def _objectify_response(self, raw_response):
        """Parse ``raw_response`` with lxml objectify using a ``huge_tree`` parser"""
        return objectify.fromstring(
            raw_response, parser=objectify.makeparser(huge_tree=True)
        )

    def _decode_compressed(self, response_xml):
        """
        Expand a ``cmp`` node into the XML document it contains

        If the node's tag is ``cmp`` its text is base64-decoded,
        zlib-decompressed and parsed as XML. Any other node is returned as is.

        :param response_xml: Parsed response node
        :return: The decompressed document, or ``response_xml`` unchanged when
            it is not a ``cmp`` node or decoding fails
        """
        if response_xml.tag != "cmp":
            return response_xml
        try:
            return self._objectify_response(
                zlib.decompress(base64.b64decode(response_xml.text))
            )
        except Exception as err:
            # Deliberately best-effort: log and fall back to the original node
            self.logger.error("Error decompressing very large HQ response: %s", err)
            return response_xml

    @staticmethod
    def get_response_body(response: requests.Response):
        """
        Extract the body of ``response``

        The response encoding is forced to utf-8. A body that decodes as JSON
        is returned as the decoded object (a JSON string is re-encoded to
        bytes); anything else is returned as the encoded response text.

        :param response: HTTP response received from HQ
        """
        response.encoding = "utf-8"
        try:
            response_body = response.json()
        except ValueError:
            # Not JSON, hand back the text as bytes
            return response.text.encode()
        if isinstance(response_body, str):
            return response_body.encode()
        return response_body

    @staticmethod
    def is_success(inner_response):
        """Return True when ``Result/ErrorCode`` of ``inner_response`` equals 0"""
        return inner_response.Result.ErrorCode.pyval == 0

    @staticmethod
    def get_error_code(inner_response):
        """Return the value of ``Result/ErrorCode`` of ``inner_response``"""
        return inner_response.Result.ErrorCode.pyval

    @staticmethod
    def get_hq_return_error(inner_response: etree.Element):
        """
        Parses HQ response and returns an error message if one exists

        The ``Result`` found under ``Table/ResultInfo`` is preferred; when there
        is none the top level ``Result`` is used.

        :param inner_response: Result Node from HQ response
        :return: The error description (with any pass thru status appended as
            ``CoreError: ...``), or ``None`` when the error code is 0 or no
            description is available
        """
        result_node = inner_response.find(".//Table/ResultInfo//Result")
        if result_node is None:
            result_node = inner_response.Result
        if result_node.ErrorCode.pyval == 0:
            return None

        error_desc = result_node.findtext(".//ErrorDescription")
        # Pass thru error messages are just whatever the Core tells us
        pass_thru_result = inner_response.findtext(
            ".//PassThruResp/StatusDescription"
        )
        if pass_thru_result:
            # ``error_desc`` is None when there is no ErrorDescription element;
            # do not let that leak into the message as the text "None"
            error_desc = f"{error_desc or ''} CoreError: {pass_thru_result}".strip()
        return error_desc

    @property
    def server_date_time(self) -> datetime:
        """
        ``ServerDateTime`` attribute of the result node parsed into a datetime

        :raises AttributeError: if there is no result node
        """
        # ``import dateutil`` on its own does not guarantee that the ``parser``
        # submodule is loaded, so import it explicitly where it is used
        from dateutil import parser as date_parser

        if self.result_node is None:
            raise AttributeError("result_node of HqResponse is None")
        return date_parser.parse(self.result_node.attrib["ServerDateTime"])

    @property
    def audit_id(self) -> Optional[int]:
        """
        ``AuditId`` attribute of the result node as an int

        :return: The audit id, or None when there is no result node or the
            attribute is missing or 0
        """
        if self.result_node is None:
            return None
        return int(self.result_node.attrib.get("AuditId", 0)) or None

    def _result_child(self, child_name: str):
        """
        Look up a child of the ``Result`` element of the result node

        :param child_name: Name of the child to fetch
        :return: The child, or None when the result node, its ``Result``
            element or the child itself is not available
        """
        if self.result_node is None:
            return None
        try:
            return getattr(self.result_node.Result, child_name, None)
        except (AttributeError, TypeError):
            return None

    def _result_child_text(self, child_name: str) -> Optional[str]:
        """
        Text of a child of the ``Result`` element

        :param child_name: Name of the child to read
        :return: The text, or None when the child is missing or empty
        """
        child = self._result_child(child_name)
        if child is None:
            return None
        try:
            return child.text or None
        except (AttributeError, TypeError):
            return None

    @property
    def hydra_error_return_code(self) -> Optional[int]:
        """
        Retrieves the Hydra error return code from the response.
        :return: The Hydra error return code or None if not present.
        """
        child = self._result_child("HydraErrorReturnCode")
        if child is None:
            return None
        try:
            # A code of 0 is reported as None, same as a missing code
            return int(child.pyval) or None
        except (AttributeError, ValueError, TypeError):
            return None

    @property
    def hydra_error_return_code_description(self) -> Optional[str]:
        """
        Retrieves the Hydra error return code description from the response.
        :return: The Hydra error return code description or None if not present.
        """
        return self._result_child_text("HydraErrorReturnCodeDescription")

    @property
    def hydra_exception_message(self) -> Optional[str]:
        """
        Retrieves the Hydra exception message from the response.
        :return: The Hydra exception message or None if not present.
        """
        return self._result_child_text("HydraExceptionMessage")

    def parse_sproc_return(
        self,
        specific_table: str = "Table",
        representation_row_class: Type[TableRow] | None = None,
    ) -> Union[List[TableRow], List[ObjectifiedElement]]:
        """
        If the response from the db is sufficiently large, it is split up among
        multiple Table elements. As such, we return a list of these Table elements

        :param specific_table: Tag name of the elements to collect from under
            the ``Data`` element of the result node
        :param representation_row_class: When given, every element is wrapped
            by calling this class with the element as its only argument
        :return: The matching elements, wrapped if a row class was supplied
        """
        lxml_elements: List[ObjectifiedElement] = self.result_node.Data.findall(
            f".//{specific_table}"
        )
        if representation_row_class is None:
            return lxml_elements
        return [
            representation_row_class(lxml_element) for lxml_element in lxml_elements
        ]

    def json(self) -> dict:
        """
        Convert the result to a dictionary

        :raises AttributeError: if there is no result node
        """
        if self.result_node is None:
            raise AttributeError("result_node of HqResponse is None")
        return xml_helper.xml_to_dict(self.result_node)

    def __repr__(self):
        """Default object repr followed by the result rendered as indented JSON"""
        try:
            json_repr = json.dumps(self.json(), indent=2)
        except (AttributeError, TypeError, ValueError):
            # repr() must not blow up on a response that cannot be rendered
            json_repr = "Invalid Response"
        default_return = super().__repr__()
        return f"{default_return}:\n{json_repr}"