import json
from typing import List, Optional
from q2_sdk.core.configuration import Configuration, settings
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_cores import exceptions
from q2_cores.CorePro.models import CoreProResponse
from ...CorePro.queries import CoreProBaseQuery
DEPLOY_ENV_MAP = {
"PROD": "https://api.corepro.io",
"STG": "https://sandbox-api.corepro.io",
"DEV": "https://sandbox-api.corepro.io",
}
[docs]
class CoreProBaseMapper(BaseMapper):
def __init__(
self,
list_of_queries: List[CoreProBaseQuery],
core_config: Configuration,
hq_credentials: Optional[HqCredentials] = None,
zone_context: Optional[CoreUser] = None,
):
super().__init__(
list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context
)
self.use_bridge = core_config.USE_BRIDGE
self.api_key = core_config.API_KEY
self.api_secret = core_config.API_SECRET
base_url = core_config.BASE_URL_OVERRIDE
if not base_url:
base_url = DEPLOY_ENV_MAP[settings.DEPLOY_ENV]
self.base_url = base_url.rstrip("/")
[docs]
@staticmethod
def parse_returned_queries(
list_of_queries: List[CoreProBaseQuery],
) -> CoreProResponse:
assert len(list_of_queries) == 1
raw_response = list_of_queries[0].raw_core_response
if isinstance(raw_response, dict):
response = raw_response
elif isinstance(raw_response, str):
response = json.loads(raw_response)
else:
response = raw_response.json()
response_obj = CoreProResponse(response)
if response_obj.errors:
raise exceptions.CoreException(
f"The core returned an error: {response_obj.errors}"
)
return response_obj
async def _run_queries(self):
if self.use_bridge:
await super()._run_queries()
else:
for query in self.list_of_queries:
await query.execute_no_bridge(
self.base_url,
self.api_key,
self.api_secret,
)
[docs]
async def execute(self) -> CoreProResponse:
return await super().execute()