import asyncio
import json
import logging
from collections.abc import Coroutine
from typing import Optional
from xml.sax import saxutils

from lxml import etree
from lxml.builder import E
from q2_sdk.hq import http
from q2_sdk.core.q2_requests import Q2RequestInterface
from q2_sdk.core import q2_requests
from q2_sdk.core.configuration import settings
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.hq.models.hq_response import HqResponse
from q2_sdk.models.cores.models.responses import CoreResponse
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery

from .error_codes import ERROR_CODE_MAP
# Module-wide default for the ``debug`` parameter of the execute methods.
# An explicit MOCK_BRIDGE_CALLS entry on the settings object wins; when the
# settings object carries no such entry, the DEBUG flag decides instead.
if "MOCK_BRIDGE_CALLS" not in vars(settings):
    MOCK_BRIDGE_CALLS = settings.DEBUG  # pragma: no cover
else:
    MOCK_BRIDGE_CALLS = settings.MOCK_BRIDGE_CALLS
class CoreProBaseQuery(BaseQuery):
    """Single REST call to CorePro, sent via an adapter PassThru or directly.

    ``build`` renders the call as a ``<PassThru>`` XML element for the
    bridge-based ``execute`` path; ``execute_no_bridge`` issues the HTTP
    request itself and then wraps the JSON answer in an HQ-shaped envelope.
    """

    def __init__(
        self,
        logger: logging.Logger,
        url_path: str,
        verb: str,
        body: Optional[dict] = None,
        mock_response: Optional[dict] = None,
        mock_error_code: Optional[int] = None,
    ):
        """
        Generic call builder for CorePro as per the docs: https://docs.corepro.io/api/overview

        :param logger: Reference to calling request's logger (self.logger in your extension)
        :param url_path: URL according to CorePro docs (/customer/get/id for instance)
        :param verb: GET, POST, PUT, etc.
        :param body: (Optional) JSON payload
        :param mock_response: (Optional) The response to emulate what CorePro would respond with
        :param mock_error_code: (Optional) If provided, will return an error state for the mock
        :raises AssertionError: if both mock_response and mock_error_code are truthy
        """
        assert not (mock_response and mock_error_code), (
            "Only one of mock_response or mock_error_code should be provided"
        )
        # NOTE(review): the attributes are set before the base initializer
        # runs; presumably BaseQuery.__init__ may rely on them -- keep this order.
        self.url_path = url_path
        self.verb = verb
        self.body = body
        self.mock_response_dict = mock_response
        self.mock_error_code = mock_error_code
        super().__init__(logger)

    def build(self) -> str:
        """Build the adapter PassThru XML string.

        When a body is present it is JSON-serialized, escaped with
        ``saxutils.escape`` and used as the PassThru element content.

        Expected PassThru format:
            With a body:
                <PassThru urlpath="/rest/path" method="POST">{ "json":"body" }</PassThru>
            Without a body:
                <PassThru urlpath="/rest/path" method="GET"/>

        :return: PassThru XML string
        """
        if self.body:
            # NOTE(review): the text is escaped here and lxml escapes element
            # text again when serializing, so &, < and > inside the JSON come
            # out double-escaped -- presumably what the receiver expects; confirm.
            body = saxutils.escape(json.dumps(self.body))
            xml = E.PassThru(body, urlpath=self.url_path, method=self.verb)
        else:
            # Unfortunately, this doesn't work the same by passing None
            # to the first parameter, so we need this else block
            xml = E.PassThru(urlpath=self.url_path, method=self.verb)
        return etree.tostring(xml).decode()

    def mock_response(self):
        """Return the payload that stands in for a real CorePro answer.

        Without a mock error code this is the ``mock_response`` dict given
        to the constructor (possibly ``None``). With one, an error payload is
        assembled from the matching ``ERROR_CODE_MAP`` entry, falling back to
        the map's ``"default"`` entry for unknown codes.

        :return: dict shaped like a CorePro response, or ``None``
        """
        if not self.mock_error_code:
            return self.mock_response_dict

        error = ERROR_CODE_MAP.get(
            int(self.mock_error_code), ERROR_CODE_MAP["default"]
        )
        return {
            "errors": [{"code": self.mock_error_code, "message": error["Message"]}],
            "requestId": "54128665-e29d-4934-97fe-bc2912c146d8",
            "status": error["http_error_code"],
        }

    def _build_json(self, full_url_path, api_key, api_secret) -> Coroutine:
        """Create, without awaiting, the HTTP request for this query.

        GET and POST (compared case-insensitively) go through the
        ``q2_requests`` helpers; any other verb is sent through
        ``Q2RequestInterface.call`` with the verb exactly as given. GET sends
        no body; every other verb sends ``self.body`` as JSON.

        :param full_url_path: Complete URL of the CorePro endpoint
        :param api_key: Used as the first element of the ``auth`` tuple
        :param api_secret: Used as the second element of the ``auth`` tuple
        :return: Un-awaited coroutine; the caller schedules and awaits it
        """
        verb = self.verb.upper()
        auth = (api_key, api_secret)
        if verb == "GET":
            return q2_requests.get(
                self.logger,
                full_url_path,
                auth=auth,
                verify_whitelist=False,
            )
        if verb == "POST":
            return q2_requests.post(
                self.logger,
                full_url_path,
                json=self.body,
                auth=auth,
                verify_whitelist=False,
            )
        # Only uncommon verbs need the generic interface, so build it lazily.
        return Q2RequestInterface(self.logger).call(
            full_url_path,
            self.verb,
            json=self.body,
            auth=auth,
            verify_whitelist=False,
        )

    async def execute_no_bridge(
        self,
        base_url: str,
        api_key: str,
        api_secret: str,
        debug: bool = MOCK_BRIDGE_CALLS,
    ):
        """
        Unlike most cores, CorePro does not need a Q2Bridge as a
        translation layer. To use the Bridge variant, look at .execute

        :param base_url: Root URL of the CorePro API; a trailing slash is tolerated
        :param api_key: CorePro API key
        :param api_secret: CorePro API secret
        :param debug: When true, skip the network and use ``mock_response()``

        Afterwards ``raw_core_response`` holds the response object of the
        HTTP call (or the mock dict when ``debug`` is true) and
        ``core_response_obj`` holds the HQ-shaped wrapper.
        """
        # Strip slashes on both sides of the join so a base URL ending in "/"
        # does not produce "//" in the request path.
        full_url_path = f"{base_url.rstrip('/')}/{self.url_path.lstrip('/')}"
        # Log before the request so a call that raises still leaves a trace.
        self.logger.debug(
            "Core Call: %s %s Body: %s", self.verb, full_url_path, self.body
        )

        if debug:
            result = self.mock_response()
            payload = result
        else:
            request = self._build_json(full_url_path, api_key, api_secret)
            result = await asyncio.create_task(request)
            payload = result.json()

        self.raw_core_response = result
        self.logger.debug("Core Response: %s", payload)
        self.simulate_hq_response(payload)

    def simulate_hq_response(self, result_str):
        """
        The rest of the SDK framework expects an HQ shaped response.
        This coerces the raw response from CorePro into an HQ shape.

        The envelope always carries success codes (0); the CorePro payload,
        including any errors it reports, travels inside ``PassThruData``.
        The result is stored on ``self.core_response_obj``.

        :param result_str: JSON-serializable CorePro payload (despite the
            name, callers in this class pass the decoded object, not a string)
        """
        # encode_pass_thru_data is not defined in this class -- presumably
        # inherited from BaseQuery; verify there how the JSON text is encoded.
        pass_thru_response = self.encode_pass_thru_data(json.dumps(result_str))
        # The XML literals below are kept flush-left on purpose so their
        # whitespace stays exactly as written.
        result_node = """
<Result>
<ErrorCode ErrorType="Success">0</ErrorCode>
<ErrorDescription />
<HydraErrorReturnCode>0</HydraErrorReturnCode>
</Result>
"""
        # NOTE(review): StatusDescription reads "Core has failed" although
        # every code in this envelope is 0 -- confirm whether consumers read it.
        raw_response = """
<Q2API HqVersion="4.1" HqAssemblyVersion="4.1" ServerDateTime="2017-02-17T08:20:00.3376068-08:00">
{result_node}
<Data>
<PassThruResult>
<PassThruResp>
<HostErrorCode>0</HostErrorCode>
<StatusDescription>Core has failed</StatusDescription>
<PassThruData>{pass_thru_response}</PassThruData>
</PassThruResp>
</PassThruResult>
</Data>
</Q2API>
""".format(result_node=result_node, pass_thru_response=pass_thru_response)
        raw_response = http.normalize_xml_str(raw_response)
        hq_response = HqResponse(raw_response)
        self.core_response_obj = CoreResponse(hq_response)

    async def execute(
        self,
        hq_credentials: Optional[HqCredentials] = None,
        debug: bool = MOCK_BRIDGE_CALLS,
        zone_context: Optional[CoreUser] = None,
    ):
        """Run the query through the base class, then decode the raw response.

        :param hq_credentials: Forwarded unchanged to ``BaseQuery.execute``
        :param debug: Forwarded unchanged to ``BaseQuery.execute``
        :param zone_context: Forwarded unchanged to ``BaseQuery.execute``
        """
        await super().execute(
            hq_credentials=hq_credentials, debug=debug, zone_context=zone_context
        )
        # Assumes the base class leaves JSON text in raw_core_response -- TODO confirm.
        self.raw_core_response = json.loads(self.raw_core_response)