Source code for q2_cores.XP2.mappers.initial_search_mapper

from typing import List

from q2_sdk.models.cores.queries.base_query import BaseQuery

from q2_cores.exceptions import CoreException
from q2_cores.XP2.queries.initial_search_query import (
    InitialSearchQuery,
    InitialSearchSSNQuery,
)
from .xp2_mapper import XP2Mapper


class InitialSearchMapper(XP2Mapper):
    """Mapper that pulls ``PrimaryIndividualId`` out of an initial search response."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> str:
        """Return the ``PrimaryIndividualId`` text from a single executed query.

        :param list_of_queries: Must contain exactly one ``InitialSearchQuery``
            whose ``raw_core_response`` has been populated.
        :return: ``root.PrimaryIndividualId.text`` of the object produced by
            ``unpack_return_mesage``.
        :raises AssertionError: If the list does not hold exactly one
            ``InitialSearchQuery``.
        :raises CoreException: If the unpacked response exposes an
            ``ErrorMessage`` attribute.
        """
        # Raised explicitly rather than via ``assert`` so the validation is
        # not stripped when Python runs with -O; the exception type is kept
        # so existing callers see the same failure.
        if len(list_of_queries) != 1:
            raise AssertionError(
                f"Expected exactly one query, got {len(list_of_queries)}"
            )
        query = list_of_queries[0]
        if not isinstance(query, InitialSearchQuery):
            raise AssertionError(
                "Query must be an instance of XP2.queries.InitialSearchQuery"
            )

        # Zero-argument super() is unavailable inside a staticmethod, hence
        # the explicit two-argument form. ``unpack_return_mesage`` (sic) is
        # the inherited helper's actual name.
        root = super(InitialSearchMapper, InitialSearchMapper).unpack_return_mesage(
            query.raw_core_response
        )
        if hasattr(root, "ErrorMessage"):
            raise CoreException(
                f"Core returned unsuccessful status: {root.ErrorMessage.Message.text}"
            )
        return root.PrimaryIndividualId.text


class InitialSearchSSNMapper(XP2Mapper):
    """Mapper that pulls ``IndividualId`` out of an initial SSN search response."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> str:
        """Return the ``IndividualId`` text from a single executed query.

        :param list_of_queries: Must contain exactly one
            ``InitialSearchSSNQuery`` whose ``raw_core_response`` has been
            populated.
        :return: ``root.IndividualLookupData.IndividualId.text`` of the object
            produced by ``unpack_return_mesage``.
        :raises AssertionError: If the list does not hold exactly one
            ``InitialSearchSSNQuery``.
        :raises CoreException: If the unpacked response exposes an
            ``ErrorMessage`` attribute.
        """
        # Raised explicitly rather than via ``assert`` so the validation is
        # not stripped when Python runs with -O; the exception type is kept
        # so existing callers see the same failure.
        if len(list_of_queries) != 1:
            raise AssertionError(
                f"Expected exactly one query, got {len(list_of_queries)}"
            )
        query = list_of_queries[0]
        if not isinstance(query, InitialSearchSSNQuery):
            raise AssertionError(
                "Query must be an instance of XP2.queries.InitialSearchSSNQuery"
            )

        # Zero-argument super() is unavailable inside a staticmethod, hence
        # the explicit two-argument form. ``unpack_return_mesage`` (sic) is
        # the inherited helper's actual name.
        root = super(
            InitialSearchSSNMapper, InitialSearchSSNMapper
        ).unpack_return_mesage(query.raw_core_response)
        if hasattr(root, "ErrorMessage"):
            raise CoreException(
                f"Core returned unsuccessful status: {root.ErrorMessage.Message.text}"
            )
        return root.IndividualLookupData.IndividualId.text