Source code for q2_sdk.models.cores.queries.base_query

import base64
import json
from typing import Optional

from lxml import etree, objectify
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import ConfigurationError
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.hq import http
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.hq.models import xml_helper

from ..models.responses import CoreResponse
from ..models.core_user import CoreUser
from . import api_AdapterPassThru, api_AdapterPassThruZoned, ob_AdapterPassThru


class BaseQuery:
    """
    All queries will inherit from this common base class.

    Subclasses are expected to override :meth:`build` (the raw query sent to
    the Core) and :meth:`mock_response` (the payload returned when calls are
    mocked); both raise ``NotImplementedError`` here.
    """

    def __init__(self, logger: Q2LoggerType, mock_failure=False):
        """
        Object which builds the query to be sent to the Core.
        If server is running debug mode, will return mocked response instead
        of calling the core.

        :param logger: Reference to calling request's logger
            (self.logger in your extension)
        :param mock_failure: If server is running in debug mode, will be used
            while mocking the response
        """
        self.logger = logger
        # NOTE: build() runs before the remaining attributes are assigned, so
        # an override must not rely on ``self.mock_failure`` being set yet.
        self.raw_query = self.build()
        self.core_response_obj: CoreResponse | None = None
        self.raw_core_response = None
        # Kept as an assert so the exception type seen by callers is unchanged.
        assert isinstance(mock_failure, bool)
        self.mock_failure = mock_failure

    def build(self) -> str:
        """Return the raw query for the Core. Must be overridden."""
        raise NotImplementedError

    def mock_response(self):
        """
        When run in DEBUG mode, the SDK will intercept calls before they go to
        HQ's AdapterPassThru endpoint. In these cases, the response will be
        whatever is returned from this function.
        """
        raise NotImplementedError

    @staticmethod
    def encode_pass_thru_data(raw_data):
        """
        Base64-encode pass-thru data.

        :param raw_data: A ``str``, or a ``dict`` which is first serialized
            with ``json.dumps``
        :return: The base64 encoding of the data, as a ``str``
        """
        if isinstance(raw_data, dict):
            raw_data = json.dumps(raw_data)
        return base64.b64encode(raw_data.encode()).decode()

    @staticmethod
    def _get_mock_raw_response(pass_thru_response):
        """
        Build a mocked successful HQ response wrapping ``pass_thru_response``.

        :param pass_thru_response: Text placed inside ``<PassThruData>``
        :return: The result of ``http.normalize_xml_str`` on the filled template
        """
        # Inter-element whitespace is part of the runtime string and is kept
        # exactly as in the original template.
        template = (
            ' <Q2API HqVersion="4.1" HqAssemblyVersion="4.1"'
            ' ServerDateTime="2017-02-17T08:20:00.3376068-08:00">'
            " <Result>"
            ' <ErrorCode ErrorType="Success">0</ErrorCode>'
            " <ErrorDescription />"
            " <HydraErrorReturnCode>0</HydraErrorReturnCode>"
            " </Result>"
            " <Data>"
            " <PassThruResult>"
            " <PassThruResp>"
            " <HostErrorCode>0</HostErrorCode>"
            " <StatusDescription>Success</StatusDescription>"
            " <PassThruData>{pass_thru_response}</PassThruData>"
            " </PassThruResp>"
            " </PassThruResult>"
            " </Data>"
            " </Q2API> "
        )
        raw_response = template.format(pass_thru_response=pass_thru_response)
        return http.normalize_xml_str(raw_response)

    @staticmethod
    def _get_mock_raw_response_failure(pass_thru_response):
        """
        Build a mocked failed HQ response wrapping ``pass_thru_response``.

        :param pass_thru_response: Text placed inside ``<PassThruData>``
        :return: The result of ``http.normalize_xml_str`` on the filled template
        """
        # Inter-element whitespace is part of the runtime string and is kept
        # exactly as in the original template.
        template = (
            ' <Q2API HqVersion="4.1" HqAssemblyVersion="4.1"'
            ' ServerDateTime="2017-02-17T08:20:00.3376068-08:00">'
            " <Result>"
            ' <ErrorCode ErrorType="Failure">-1</ErrorCode>'
            " <ErrorDescription>Mocked failure core response</ErrorDescription>"
            " <HydraErrorReturnCode>0</HydraErrorReturnCode>"
            " </Result>"
            " <Data>"
            " <PassThruResult>"
            " <PassThruResp>"
            " <HostErrorCode>-100</HostErrorCode>"
            " <StatusDescription>Core has failed</StatusDescription>"
            " <PassThruData>{pass_thru_response}</PassThruData>"
            " </PassThruResp>"
            " </PassThruResult>"
            " </Data>"
            " </Q2API> "
        )
        raw_response = template.format(pass_thru_response=pass_thru_response)
        return http.normalize_xml_str(raw_response)

    async def execute(
        self,
        hq_credentials: Optional[HqCredentials] = None,
        # NOTE: this default is read from settings once, when the class is
        # defined; pass ``debug`` explicitly to override it per call.
        debug: bool = settings.MOCK_BRIDGE_CALLS,
        zone_context: Optional[CoreUser] = None,
    ):
        """Call Core with self.raw_query and save the response in
        self.core_response_obj (with the raw text in self.raw_core_response).
        If in debug mode, return mocked response instead of actually calling
        the core.

        :param hq_credentials: If provided, uses an existing HQ Session;
            otherwise ``settings.HQ_CREDENTIALS`` is used
        :param debug: Return a mocked response
        :param zone_context: Optional user context; when it has an
            ``online_user`` and the HQ credentials report being zoned, the
            zoned AdapterPassThru endpoint is used
        """
        if not hq_credentials:
            hq_credentials = settings.HQ_CREDENTIALS

        pass_thru_data = self.raw_query

        if debug:
            # Wrap the subclass-provided mock payload in a canned HQ envelope.
            pass_thru_response = self.encode_pass_thru_data(self.mock_response())
            if not self.mock_failure:
                raw_response = self._get_mock_raw_response(pass_thru_response)
            else:
                raw_response = self._get_mock_raw_response_failure(
                    pass_thru_response
                )
            hq_response = HqResponse(raw_response)
        else:
            pass_thru_module = await self._get_pass_thru_module(
                hq_credentials, zone_context
            )
            b64encoded_pass_thru = self.encode_pass_thru_data(pass_thru_data)
            kwargs = {
                "logger": self.logger,
                "pass_thru_data": b64encoded_pass_thru,
                "hq_credentials": hq_credentials,
            }
            # Modules are singletons, so identity is the right comparison.
            if pass_thru_module is api_AdapterPassThruZoned:
                kwargs.update(self._get_zoned_kwargs(zone_context))
                self.logger.debug("Preferring zoned HQ Call")
            params_obj = pass_thru_module.ParamsObj(**kwargs)
            hq_response = await pass_thru_module.execute(params_obj, use_json=True)

        self.core_response_obj = CoreResponse(hq_response)
        self.raw_core_response = self.core_response_obj.raw_core_response
        self.logger.debug("Core Call: %s", pass_thru_data)
        self.logger.debug("Core Response: %s", self.raw_core_response)

    @staticmethod
    def _get_zoned_kwargs(zone_context: CoreUser):
        """
        Pick the context type/id pair used for a zoned pass-thru call.

        The first truthy identifier found on ``zone_context.online_user`` wins,
        checked in this order: group_id, customer_id, user_id, user_logon_id.

        :param zone_context: User context whose ``online_user`` is inspected
        :return: Dict with ``context_type`` and ``context_id`` keys
        :raises ConfigurationError: If none of the identifiers is set
        """
        context_types = api_AdapterPassThruZoned.ContextType
        order_of_preference = (
            (context_types.GroupId, "group_id"),
            (context_types.CustomerId, "customer_id"),
            (context_types.UserId, "user_id"),
            (context_types.UserLogonId, "user_logon_id"),
        )

        user_vars = vars(zone_context.online_user)
        for context_type, id_property in order_of_preference:
            context_id = user_vars.get(id_property)
            if context_id:
                return {"context_type": context_type, "context_id": context_id}

        raise ConfigurationError("No zoneable information provided in zone_context")

    async def _get_pass_thru_module(
        self, hq_credentials: HqCredentials, zone_context: Optional[CoreUser]
    ):
        """
        Choose which AdapterPassThru module to call.

        ``ob_AdapterPassThru`` is used when the credentials carry an auth
        token, ``api_AdapterPassThru`` otherwise. Either choice is overridden
        by ``api_AdapterPassThruZoned`` when ``zone_context`` has an
        ``online_user`` and ``hq_credentials.is_zoned()`` is truthy.

        :param hq_credentials: Credentials for the HQ call
        :param zone_context: Optional user context for zoned calls
        :return: The selected pass-thru module
        """
        if hq_credentials and hq_credentials.auth_token:
            pass_thru_module = ob_AdapterPassThru
        else:
            pass_thru_module = api_AdapterPassThru

        # Short-circuit order matters: is_zoned() is only awaited when a
        # zone context with an online user was supplied.
        if (
            zone_context
            and zone_context.online_user
            and await hq_credentials.is_zoned()
        ):
            pass_thru_module = api_AdapterPassThruZoned

        return pass_thru_module

    def objectify_xml_response(self) -> objectify.ObjectifiedElement:
        """
        Parse ``self.raw_core_response`` with ``lxml.objectify``.

        The response is encoded to bytes using the encoding detected by
        ``xml_helper.autodetect_xml_encoding`` before parsing.

        :return: The root element produced by ``objectify.fromstring``
        """
        raw_core_response = self.raw_core_response
        try:
            encoding = xml_helper.autodetect_xml_encoding(raw_core_response)
            raw_core_response = raw_core_response.encode(encoding)
        except LookupError:  # pragma: no cover
            # Fall back to UTF-8 when the detected encoding cannot be looked up.
            raw_core_response = raw_core_response.encode("utf-8")
        return objectify.fromstring(raw_core_response)