Source code for q2_sdk.hq.models.hq_request.base

from enum import Enum
from timeit import default_timer

from lxml import etree
from q2_sdk.core.opentelemetry.span import Q2Span, Q2SpanAttributes

from q2_sdk.core.prometheus import MetricType, get_metric
from q2_sdk.hq.models.hq_params.base import BaseParameter
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.hq.models.hq_credentials import BackOfficeHqCredentials
from q2_sdk.hq import http
from ..hq_params.base import BaseParamsObj


class BaseRequest:
    """
    All HQ Communication will go through this class

    It supports both SOAP and REST communication styles,
    though ultimately an end user would just call the .execute method
    and the response is turned into an object
    """

    # Reported as the "module" label/attribute by ``execute``; None here.
    HQ_MODULE = None

    def __init__(self, name, use_json=False, **kwargs):
        """
        :param name: Endpoint name; used as the SOAP action element tag and
            as the last path segment of the JSON url
        :param use_json: If True, ``execute`` calls ``get_json`` instead of
            ``get_soap``
        :param kwargs: Only ``bt_handle`` is read; it is forwarded to
            ``http.call_hq`` on every request
        """
        self.name = name
        self.response_class = HqResponse
        self.env_nsmap = None
        self.action_nsmap = None
        self.include_hq_token = False
        self.use_json = use_json
        self.bt_handle = kwargs.get("bt_handle")

    def build_soap(self, params_obj: BaseParamsObj):
        """
        Serialize ``params_obj.request_params`` into a SOAP envelope string

        Parameters whose value is None are left out of the envelope.

        :param params_obj: Object whose ``request_params`` mapping is serialized
        :return: The envelope as a decoded string
        """
        soap_ns = "{http://www.w3.org/2003/05/soap-envelope}"
        root = etree.Element(soap_ns + "Envelope", nsmap=self.env_nsmap)
        body = etree.SubElement(root, soap_ns + "Body")
        action = etree.SubElement(body, self.name, nsmap=self.action_nsmap)

        for name, value in params_obj.request_params.items():
            if value is None:
                continue

            if isinstance(value, BaseParameter):
                action.append(value.serialize_as_xml())
            elif isinstance(value, list):
                # One sibling element per list item, all sharing the same tag
                for item in value:
                    etree.SubElement(action, name).text = str(item)
            elif isinstance(value, Enum):
                etree.SubElement(action, name).text = str(value.value)
            else:
                if isinstance(value, bool):
                    # Booleans are written as 1 / 0 rather than True / False
                    value = int(value)
                etree.SubElement(action, name).text = str(value)

        return etree.tostring(root).decode()

    @staticmethod
    def build_json(params_obj: BaseParamsObj):
        """
        Convert ``params_obj.request_params`` into a dict for a JSON request

        Unlike ``build_soap``, None values are kept in the result.

        :param params_obj: Object whose ``request_params`` mapping is converted
        :return: dict keyed by parameter name
        """
        body = {}
        for name, value in params_obj.request_params.items():
            if isinstance(value, BaseParameter):
                body[name] = value.serialize_as_json()
            elif isinstance(value, Enum):
                body[name] = value.value
            else:
                body[name] = value
        return body

    async def _send_request(self, url, body, params_obj, **kwargs) -> HqResponse:
        """
        Send an already built body to HQ through ``http.call_hq``

        :param url: Full url to call
        :param body: Output of ``build_soap`` or ``build_json``
        :param params_obj: Supplies ``hq_credentials`` and ``logger``
        :param kwargs: Forwarded unchanged to ``http.call_hq``
        :return: Whatever ``http.call_hq`` returns
        :raises Exception: Any error from ``http.call_hq`` is logged and re-raised
        """
        hq_credentials = params_obj.hq_credentials
        # Keep our own reference to the back office credentials. The name
        # ``hq_credentials`` is rebound to the wrapped base credentials below,
        # so the cleanup in ``finally`` could otherwise never recognise them
        # and the auto logoff would silently never run.
        backoffice_credentials = None
        hq_token = None
        backoffice_cookie = None

        if isinstance(hq_credentials, BackOfficeHqCredentials):
            backoffice_credentials = hq_credentials
            backoffice_cookie = await backoffice_credentials.get_cookie()
            hq_credentials = backoffice_credentials.base_hq_credentials

        if self.include_hq_token:
            hq_token = hq_credentials.auth_token

        try:
            response = await http.call_hq(
                body,
                url,
                params_obj.logger,
                response_class=self.response_class,
                hq_token=hq_token,
                bt_handle=self.bt_handle,
                backoffice_cookie=backoffice_cookie,
                **kwargs,
            )
        except Exception:
            params_obj.logger.exception("Failed to send HQ request")
            raise
        else:
            return response
        finally:
            if (
                backoffice_credentials is not None
                and backoffice_credentials.auto_logoff
            ):
                # Turn it off so we don't recursively call logoff
                backoffice_credentials.auto_logoff = False
                await backoffice_credentials.logoff()

    async def get_json(self, params_obj: BaseParamsObj, **kwargs) -> HqResponse:
        """Call HQ using a json structure"""
        url = "{}/{}".format(params_obj.hq_url + ".ashx", self.name)
        body = self.build_json(params_obj)
        return await self._send_request(url, body, params_obj, **kwargs)

    async def get_soap(self, params_obj: BaseParamsObj, **kwargs) -> HqResponse:
        """Call HQ using a soap structure"""
        url = params_obj.hq_url + ".asmx"
        body = self.build_soap(params_obj)
        return await self._send_request(url, body, params_obj, **kwargs)

    @Q2Span.instrument()
    async def execute(self, params_obj: BaseParamsObj, **kwargs) -> HqResponse:
        """
        Call HQ as JSON or SOAP depending on ``self.use_json``, then record
        span attributes and a ``caliper_hq_requests`` histogram observation
        of the elapsed time.

        The span attributes and the metric are only recorded when the call
        returns; an exception from the request propagates before them.

        :param params_obj: Object containing everything necessary to call this HQ endpoint
        :param kwargs: Optional keyword args as defined for q2_requests.post()
            e.g. "timeout"
        """
        start_time = default_timer()
        if self.use_json:
            response = await self.get_json(params_obj, **kwargs)
        else:
            response = await self.get_soap(params_obj, **kwargs)
        end_time = default_timer()

        hq_url = params_obj.hq_credentials.hq_url
        Q2Span.set_attributes({
            Q2SpanAttributes.HQ_MODULE: self.HQ_MODULE,
            Q2SpanAttributes.HQ_ENDPOINT: self.name,
            Q2SpanAttributes.HQ_URL: hq_url,
        })
        get_metric(
            MetricType.Histogram,
            "caliper_hq_requests",
            "Calls to HQ",
            {
                "module": self.HQ_MODULE,
                "endpoint": self.name,
                "hq_url": hq_url,
            },
            chain={"op": "observe", "params": [end_time - start_time]},
        )
        return response