Source code for q2_cores.CorePro.queries.base_query

import asyncio
import json
import logging
from typing import Optional
from xml.sax import saxutils

from lxml import etree
from lxml.builder import E

from q2_sdk.hq import http
from q2_sdk.core.q2_requests import Q2RequestInterface
from q2_sdk.core import q2_requests
from q2_sdk.core.configuration import settings
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.models.cores.models.responses import CoreResponse

from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery
from .error_codes import ERROR_CODE_MAP

if "MOCK_BRIDGE_CALLS" in vars(settings):
    MOCK_BRIDGE_CALLS = settings.MOCK_BRIDGE_CALLS
else:  # pragma: no cover
    MOCK_BRIDGE_CALLS = settings.DEBUG


[docs] class CoreProBaseQuery(BaseQuery): def __init__( self, logger: logging.Logger, url_path: str, verb: str, body: Optional[dict] = None, mock_response: Optional[dict] = None, mock_error_code: Optional[int] = None, ): """ Generic call builder for CorePro as per the docs: https://docs.corepro.io/api/overview :param logger: Reference to calling request's logger (self.logger in your extension) :param url_path: URL according to CorePro docs (/customer/get/id for instance) :verb: GET, POST, PUT, etc. :body: (Optional) JSON payload :mock_response: (Optional) The response to emulate what CorePro would respond with :mock_error_code: (Optional) If provided, will return an error state for the mock """ assert not (mock_response and mock_error_code), ( "Only one of mock_response or mock_error_code should be provided" ) self.url_path = url_path self.verb = verb self.body = body self.mock_response_dict = mock_response self.mock_error_code = mock_error_code super().__init__(logger)
[docs] def build(self) -> str: """build Build passthru string Builds the adapter passthru xml string. For POST passthru requests, request body is json serialized and dumped as PassThru element content. Expected PassThru format: POST: <PassThru urlpath="/rest/path" method="POST">{ "json":"body" }</PassThru> GET: <PassThru urlpath="/rest/path" method="GET"/> :return: Passthru xml string """ if self.body: body = saxutils.escape(json.dumps(self.body)) xml = E.PassThru(body, urlpath=self.url_path, method=self.verb) else: # Unfortunately, this doesn't work the same by passing None # to the first parameter, so we need this else block xml = E.PassThru(urlpath=self.url_path, method=self.verb) xml_as_bytes = etree.tostring(xml) return xml_as_bytes.decode()
[docs] def mock_response(self): mock_response = self.mock_response_dict if self.mock_error_code: error = ERROR_CODE_MAP.get( int(self.mock_error_code), ERROR_CODE_MAP["default"] ) mock_response = { "errors": [{"code": self.mock_error_code, "message": error["Message"]}], "requestId": "54128665-e29d-4934-97fe-bc2912c146d8", "status": error["http_error_code"], } return mock_response
def _build_json(self, full_url_path, api_key, api_secret) -> str: request_obj = Q2RequestInterface(self.logger) if self.verb.upper() == "GET": request = q2_requests.get( self.logger, full_url_path, auth=(api_key, api_secret), verify_whitelist=False, ) elif self.verb.upper() == "POST": request = q2_requests.post( self.logger, full_url_path, json=self.body, auth=(api_key, api_secret), verify_whitelist=False, ) else: request = request_obj.call( full_url_path, self.verb, json=self.body, auth=(api_key, api_secret), verify_whitelist=False, ) return request
[docs] async def execute_no_bridge( self, base_url: str, api_key: str, api_secret: str, debug: bool = MOCK_BRIDGE_CALLS, ): """ Unlike most cores, CorePro does not need a Q2Bridge as a translation layer. To use the Bridge variant, look at .execute """ full_url_path = f"{base_url}/{self.url_path.lstrip('/')}" if not debug: request = self._build_json(full_url_path, api_key, api_secret) result = await asyncio.create_task(request) result_str = result.json() else: result = self.mock_response() result_str = result self.raw_core_response = result self.simulate_hq_response(result_str) self.logger.debug( "Core Call: %s %s Body: %s", self.verb, full_url_path, self.body ) self.logger.debug("Core Response: %s", result_str)
[docs] def simulate_hq_response(self, result_str): """ The rest of the SDK framework expects an HQ shaped response. This coerces the raw response from CorePro into an HQ shape. """ pass_thru_response = self.encode_pass_thru_data(json.dumps(result_str)) result_node = """ <Result> <ErrorCode ErrorType="Success">0</ErrorCode> <ErrorDescription /> <HydraErrorReturnCode>0</HydraErrorReturnCode> </Result> """ raw_response = """ <Q2API HqVersion="4.1" HqAssemblyVersion="4.1" ServerDateTime="2017-02-17T08:20:00.3376068-08:00"> {result_node} <Data> <PassThruResult> <PassThruResp> <HostErrorCode>0</HostErrorCode> <StatusDescription>Core has failed</StatusDescription> <PassThruData>{pass_thru_response}</PassThruData> </PassThruResp> </PassThruResult> </Data> </Q2API> """.format(result_node=result_node, pass_thru_response=pass_thru_response) raw_response = http.normalize_xml_str(raw_response) hq_response = HqResponse(raw_response) self.core_response_obj = CoreResponse(hq_response)
[docs] async def execute( self, hq_credentials: Optional[HqCredentials] = None, debug: bool = MOCK_BRIDGE_CALLS, zone_context: Optional[CoreUser] = None, ): await super().execute( hq_credentials=hq_credentials, debug=debug, zone_context=zone_context ) self.raw_core_response = json.loads(self.raw_core_response)