from enum import Enum
from timeit import default_timer
from lxml import etree
from q2_sdk.core.opentelemetry.span import Q2Span, Q2SpanAttributes
from q2_sdk.core.prometheus import MetricType, get_metric
from q2_sdk.hq.models.hq_params.base import BaseParameter
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.hq.models.hq_credentials import BackOfficeHqCredentials
from q2_sdk.hq import http
from ..hq_params.base import BaseParamsObj
class BaseRequest:
    """
    All HQ Communication will go through this class

    It supports both SOAP and REST communication styles, though
    ultimately an end user would just call the .execute method
    and the response is turned into an object
    """

    # Label reported as the "module" span attribute / metric label in execute().
    # The base class leaves it unset.
    HQ_MODULE = None

    def __init__(self, name, use_json=False, **kwargs):
        """
        :param name: HQ endpoint name. Used as the SOAP action element tag and
            as the last path segment of the JSON url
        :param use_json: If True, .execute sends the request through get_json
            instead of get_soap
        :param kwargs: Only ``bt_handle`` is read; it is forwarded to http.call_hq
        """
        self.name = name
        self.response_class = HqResponse
        self.env_nsmap = None
        self.action_nsmap = None
        self.include_hq_token = False
        self.use_json = use_json
        self.bt_handle = kwargs.get("bt_handle")

    def build_soap(self, params_obj: BaseParamsObj):
        """
        Serialize ``params_obj.request_params`` into a SOAP envelope string

        Parameters whose value is None are omitted. BaseParameter values are
        appended via their own ``serialize_as_xml``; lists produce one element
        per item; Enums are written by their ``.value``; bools are written as
        ``1``/``0``; everything else is written with ``str()``.

        :param params_obj: Object whose ``request_params`` mapping holds the
            name/value pairs to send
        :return: The envelope as a decoded string
        """
        soap_ns = "http://www.w3.org/2003/05/soap-envelope"
        root = etree.Element(f"{{{soap_ns}}}Envelope", nsmap=self.env_nsmap)
        body = etree.SubElement(root, f"{{{soap_ns}}}Body")
        action = etree.SubElement(body, self.name, nsmap=self.action_nsmap)

        for name, value in params_obj.request_params.items():
            if value is None:
                continue
            if isinstance(value, BaseParameter):
                action.append(value.serialize_as_xml())
            elif isinstance(value, list):
                for item in value:
                    etree.SubElement(action, name).text = str(item)
            elif isinstance(value, Enum):
                etree.SubElement(action, name).text = str(value.value)
            else:
                if isinstance(value, bool):
                    # Emit 1/0 rather than "True"/"False"
                    value = int(value)
                etree.SubElement(action, name).text = str(value)

        return etree.tostring(root).decode()

    @staticmethod
    def build_json(params_obj: BaseParamsObj):
        """
        Convert ``params_obj.request_params`` into a plain dict for a JSON call

        BaseParameter values are replaced by their ``serialize_as_json()``
        result and Enums by their ``.value``; all other values (including
        None) are passed through unchanged.

        :param params_obj: Object whose ``request_params`` mapping holds the
            name/value pairs to send
        :return: dict keyed by parameter name
        """
        body = {}
        for name, value in params_obj.request_params.items():
            if isinstance(value, BaseParameter):
                body[name] = value.serialize_as_json()
            elif isinstance(value, Enum):
                body[name] = value.value
            else:
                body[name] = value
        return body

    async def _send_request(self, url, body, params_obj, **kwargs) -> HqResponse:
        """
        Send an already-built request body to HQ through http.call_hq

        :param url: Full HQ url to call
        :param body: Payload produced by build_soap or build_json
        :param params_obj: Supplies ``hq_credentials`` and ``logger``
        :param kwargs: Forwarded untouched to http.call_hq
        :return: Whatever http.call_hq returns
        :raises Exception: Any error from http.call_hq is logged and re-raised
        """
        credentials = params_obj.hq_credentials
        backoffice_credentials = None
        backoffice_cookie = None
        if isinstance(credentials, BackOfficeHqCredentials):
            # Keep a handle on the back office wrapper: `credentials` is about
            # to be swapped for the underlying HQ credentials, and the cleanup
            # in `finally` must still be able to see the wrapper.
            backoffice_credentials = credentials
            backoffice_cookie = await backoffice_credentials.get_cookie()
            credentials = backoffice_credentials.base_hq_credentials

        hq_token = credentials.auth_token if self.include_hq_token else None

        try:
            return await http.call_hq(
                body,
                url,
                params_obj.logger,
                response_class=self.response_class,
                hq_token=hq_token,
                bt_handle=self.bt_handle,
                backoffice_cookie=backoffice_cookie,
                **kwargs,
            )
        except Exception:
            params_obj.logger.exception("Failed to send HQ request")
            raise
        finally:
            # Previously this check ran against the unwrapped base credentials,
            # so the auto logoff could never trigger.
            if (
                backoffice_credentials is not None
                and backoffice_credentials.auto_logoff
            ):
                # Turn it off so we don't recursively call logoff
                backoffice_credentials.auto_logoff = False
                await backoffice_credentials.logoff()

    async def get_json(self, params_obj: BaseParamsObj, **kwargs) -> HqResponse:
        """Call HQ using a json structure"""
        url = f"{params_obj.hq_url}.ashx/{self.name}"
        body = self.build_json(params_obj)
        return await self._send_request(url, body, params_obj, **kwargs)

    async def get_soap(self, params_obj: BaseParamsObj, **kwargs) -> HqResponse:
        """Call HQ using a soap structure"""
        url = params_obj.hq_url + ".asmx"
        body = self.build_soap(params_obj)
        return await self._send_request(url, body, params_obj, **kwargs)

    @Q2Span.instrument()
    async def execute(self, params_obj: BaseParamsObj, **kwargs) -> HqResponse:
        """
        :param params_obj: Object containing everything necessary to call this HQ endpoint
        :param kwargs: Optional keyword args as defined for q2_requests.post() e.g. "timeout"
        """
        start_time = default_timer()
        if self.use_json:
            response = await self.get_json(params_obj, **kwargs)
        else:
            response = await self.get_soap(params_obj, **kwargs)
        end_time = default_timer()

        # Span attributes and the timing histogram are only recorded when the
        # call returned; an exception above propagates before reaching here.
        Q2Span.set_attributes({
            Q2SpanAttributes.HQ_MODULE: self.HQ_MODULE,
            Q2SpanAttributes.HQ_ENDPOINT: self.name,
            Q2SpanAttributes.HQ_URL: params_obj.hq_credentials.hq_url,
        })
        get_metric(
            MetricType.Histogram,
            "caliper_hq_requests",
            "Calls to HQ",
            {
                "module": self.HQ_MODULE,
                "endpoint": self.name,
                "hq_url": params_obj.hq_credentials.hq_url,
            },
            chain={"op": "observe", "params": [end_time - start_time]},
        )
        return response