import base64
import json
from typing import Optional
from lxml import etree, objectify
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import ConfigurationError
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.hq import http
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.hq.models import xml_helper
from ..models.responses import CoreResponse
from ..models.core_user import CoreUser
from . import api_AdapterPassThru, api_AdapterPassThruZoned, ob_AdapterPassThru
[docs]
class BaseQuery:
"""
All queries will inherit from this common base class.
"""
def __init__(self, logger: Q2LoggerType, mock_failure=False):
"""
Object which builds the query to be send to the Core.
If server is running debug mode, will return mocked response instead of calling the core.
:param logger: Reference to calling request's logger (self.logger in your extension)
:param mock_failure: If server is running in debug mode, will be used while mocking the response
"""
self.logger = logger
self.raw_query = self.build()
self.core_response_obj: CoreResponse | None = None
self.raw_core_response = None
assert isinstance(mock_failure, bool)
self.mock_failure = mock_failure
def build(self) -> str:
raise NotImplementedError
[docs]
def mock_response(self):
"""
When run in DEBUG mode, the SDK will intercept calls
before they go to HQ's AdapterPassThru endpoint.
In these cases, the response will be whatever is returned
from this function.
"""
raise NotImplementedError
@staticmethod
def encode_pass_thru_data(raw_data):
if isinstance(raw_data, dict):
raw_data = json.dumps(raw_data)
return base64.b64encode(raw_data.encode()).decode()
@staticmethod
def _get_mock_raw_response(pass_thru_response):
raw_response = """
<Q2API HqVersion="4.1" HqAssemblyVersion="4.1" ServerDateTime="2017-02-17T08:20:00.3376068-08:00">
<Result>
<ErrorCode ErrorType="Success">0</ErrorCode>
<ErrorDescription />
<HydraErrorReturnCode>0</HydraErrorReturnCode>
</Result>
<Data>
<PassThruResult>
<PassThruResp>
<HostErrorCode>0</HostErrorCode>
<StatusDescription>Success</StatusDescription>
<PassThruData>{pass_thru_response}</PassThruData>
</PassThruResp>
</PassThruResult>
</Data>
</Q2API>
""".format(pass_thru_response=pass_thru_response)
raw_response = http.normalize_xml_str(raw_response)
return raw_response
@staticmethod
def _get_mock_raw_response_failure(pass_thru_response):
raw_response = """
<Q2API HqVersion="4.1" HqAssemblyVersion="4.1" ServerDateTime="2017-02-17T08:20:00.3376068-08:00">
<Result>
<ErrorCode ErrorType="Failure">-1</ErrorCode>
<ErrorDescription>Mocked failure core response</ErrorDescription>
<HydraErrorReturnCode>0</HydraErrorReturnCode>
</Result>
<Data>
<PassThruResult>
<PassThruResp>
<HostErrorCode>-100</HostErrorCode>
<StatusDescription>Core has failed</StatusDescription>
<PassThruData>{pass_thru_response}</PassThruData>
</PassThruResp>
</PassThruResult>
</Data>
</Q2API>
""".format(pass_thru_response=pass_thru_response)
raw_response = http.normalize_xml_str(raw_response)
return raw_response
[docs]
async def execute(
self,
hq_credentials: Optional[HqCredentials] = None,
debug: bool = settings.MOCK_BRIDGE_CALLS,
zone_context: Optional[CoreUser] = None,
):
"""Call Core with self.raw_query and save the response in self.core_response. If in debug mode,
return mocked response instead of actually calling the core.
:param hq_credentials: If provided, uses an existing HQ Session
:param debug: Return a mocked response
"""
if not hq_credentials:
hq_credentials = settings.HQ_CREDENTIALS
pass_thru_data = self.raw_query
if debug:
pass_thru_response = self.encode_pass_thru_data(self.mock_response())
if not self.mock_failure:
raw_response = self._get_mock_raw_response(pass_thru_response)
else:
raw_response = self._get_mock_raw_response_failure(pass_thru_response)
hq_response = HqResponse(raw_response)
else:
pass_thru_module = await self._get_pass_thru_module(
hq_credentials, zone_context
)
b64encoded_pass_thru = self.encode_pass_thru_data(pass_thru_data)
kwargs = {
"logger": self.logger,
"pass_thru_data": b64encoded_pass_thru,
"hq_credentials": hq_credentials,
}
if pass_thru_module == api_AdapterPassThruZoned:
kwargs.update(self._get_zoned_kwargs(zone_context))
self.logger.debug("Preferring zoned HQ Call")
params_obj = pass_thru_module.ParamsObj(**kwargs)
hq_response = await pass_thru_module.execute(params_obj, use_json=True)
self.core_response_obj = CoreResponse(hq_response)
self.raw_core_response = self.core_response_obj.raw_core_response
self.logger.debug("Core Call: %s", pass_thru_data)
self.logger.debug("Core Response: %s", self.raw_core_response)
@staticmethod
def _get_zoned_kwargs(zone_context: CoreUser):
order_of_preference = [
{
"name": "group",
"type": api_AdapterPassThruZoned.ContextType.GroupId,
"id_property": "group_id",
},
{
"name": "customer_id",
"type": api_AdapterPassThruZoned.ContextType.CustomerId,
"id_property": "customer_id",
},
{
"name": "user_id",
"type": api_AdapterPassThruZoned.ContextType.UserId,
"id_property": "user_id",
},
{
"name": "user_logon_id",
"type": api_AdapterPassThruZoned.ContextType.UserLogonId,
"id_property": "user_logon_id",
},
]
context_type = None
context_id = None
user_vars = vars(zone_context.online_user)
for i in order_of_preference:
id_property = i["id_property"]
if user_vars.get(id_property):
context_type = i["type"]
context_id = user_vars[id_property]
break
if context_type is None:
raise ConfigurationError("No zoneable information provided in zone_context")
return {"context_type": context_type, "context_id": context_id}
async def _get_pass_thru_module(
self, hq_credentials: HqCredentials, zone_context: Optional[CoreUser]
):
if hq_credentials and hq_credentials.auth_token:
pass_thru_module = ob_AdapterPassThru
else:
pass_thru_module = api_AdapterPassThru
if zone_context:
if zone_context.online_user and await hq_credentials.is_zoned():
pass_thru_module = api_AdapterPassThruZoned
return pass_thru_module
def objectify_xml_response(self) -> etree:
raw_core_response = self.raw_core_response
try:
encoding = xml_helper.autodetect_xml_encoding(raw_core_response)
raw_core_response = raw_core_response.encode(encoding)
except LookupError: # pragma: no cover
raw_core_response = raw_core_response.encode("utf-8")
return objectify.fromstring(raw_core_response)