Source code for q2_cores.Symitar.mappers.base_mapper

from typing import List

from q2_sdk.models.cores.mappers.base_mapper import BaseMapper

from ...Symitar.models import SymConnectMessage, SymXchangeMessage, parser
from ...Symitar.queries import SymitarBaseQuery


class SymitarBaseMapper(BaseMapper):
    """Mapper base class whose parsed result is annotated as ``SymConnectMessage``."""

    @staticmethod
    def parse_returned_queries(
        list_of_queries: List[SymitarBaseQuery],
    ) -> SymConnectMessage:
        """Parse the raw core response of the only query in the list.

        :param list_of_queries: Queries to read the response from; must
            contain exactly one entry.
        :return: Whatever ``parser.parse`` produces for that query's
            ``raw_core_response``.
        :raises AssertionError: If the list does not hold exactly one query.
        """
        # Raised explicitly rather than through an ``assert`` statement so the
        # check still runs when Python is started with -O (asserts stripped).
        if len(list_of_queries) != 1:
            raise AssertionError(
                f"Expected exactly one query, got {len(list_of_queries)}"
            )
        return parser.parse(list_of_queries[0].raw_core_response)

    async def execute(self) -> SymConnectMessage:
        """Await the parent ``execute`` and return its result unchanged.

        The override adds no logic; it only declares the return type as
        ``SymConnectMessage``.
        """
        return await super().execute()
class SymXchangeBaseMapper(BaseMapper):
    """Mapper base class whose parsed result is annotated as ``SymXchangeMessage``."""

    @staticmethod
    def parse_returned_queries(
        list_of_queries: List[SymitarBaseQuery],
    ) -> SymXchangeMessage:
        """Parse the raw core response of the only query in the list.

        :param list_of_queries: Queries to read the response from; must
            contain exactly one entry.
        :return: Whatever ``parser.parse`` produces for that query's
            ``raw_core_response``.
        :raises AssertionError: If the list does not hold exactly one query.
        """
        # Raised explicitly rather than through an ``assert`` statement so the
        # check still runs when Python is started with -O (asserts stripped).
        if len(list_of_queries) != 1:
            raise AssertionError(
                f"Expected exactly one query, got {len(list_of_queries)}"
            )
        return parser.parse(list_of_queries[0].raw_core_response)

    async def execute(self) -> SymXchangeMessage:
        """Await the parent ``execute`` and return its result unchanged.

        The override adds no logic; it only declares the return type as
        ``SymXchangeMessage``.
        """
        return await super().execute()