Source code for q2_cores.CorePro.mappers.base_mapper

import json
from typing import List, Optional

from q2_sdk.core.configuration import Configuration, settings
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.models.core_user import CoreUser

from q2_cores import exceptions
from q2_cores.CorePro.models import CoreProResponse
from ...CorePro.queries import CoreProBaseQuery


DEPLOY_ENV_MAP = {
    "PROD": "https://api.corepro.io",
    "STG": "https://sandbox-api.corepro.io",
    "DEV": "https://sandbox-api.corepro.io",
}


[docs] class CoreProBaseMapper(BaseMapper): def __init__( self, list_of_queries: List[CoreProBaseQuery], core_config: Configuration, hq_credentials: Optional[HqCredentials] = None, zone_context: Optional[CoreUser] = None, ): super().__init__( list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context ) self.use_bridge = core_config.USE_BRIDGE self.api_key = core_config.API_KEY self.api_secret = core_config.API_SECRET base_url = core_config.BASE_URL_OVERRIDE if not base_url: base_url = DEPLOY_ENV_MAP[settings.DEPLOY_ENV] self.base_url = base_url.rstrip("/")
[docs] @staticmethod def parse_returned_queries( list_of_queries: List[CoreProBaseQuery], ) -> CoreProResponse: assert len(list_of_queries) == 1 raw_response = list_of_queries[0].raw_core_response if isinstance(raw_response, dict): response = raw_response elif isinstance(raw_response, str): response = json.loads(raw_response) else: response = raw_response.json() response_obj = CoreProResponse(response) if response_obj.errors: raise exceptions.CoreException( f"The core returned an error: {response_obj.errors}" ) return response_obj
async def _run_queries(self): if self.use_bridge: await super()._run_queries() else: for query in self.list_of_queries: await query.execute_no_bridge( self.base_url, self.api_key, self.api_secret, )
[docs] async def execute(self) -> CoreProResponse: return await super().execute()