from typing import List
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores.exceptions import CoreException
from q2_cores.XP2.queries.initial_search_query import (
InitialSearchQuery,
InitialSearchSSNQuery,
)
from .xp2_mapper import XP2Mapper
[docs]
class InitialSearchMapper(XP2Mapper):
    """Mapper that reads the primary individual id from an initial-search response."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> str:
        """Return the ``PrimaryIndividualId`` text from the single query's response.

        :param list_of_queries: Must contain exactly one ``InitialSearchQuery``;
            its ``raw_core_response`` is unpacked and inspected.
        :return: ``root.PrimaryIndividualId.text`` of the unpacked response.
        :raises AssertionError: If the list does not contain exactly one
            ``InitialSearchQuery`` instance.
        :raises CoreException: If the unpacked response carries an
            ``ErrorMessage`` element.
        """
        # Explicit raises rather than ``assert`` so the validation is not
        # stripped under ``python -O``. AssertionError is kept on purpose so
        # callers observe the same exception type as before.
        if len(list_of_queries) != 1:
            raise AssertionError(
                f"Expected exactly one query, got {len(list_of_queries)}"
            )
        query = list_of_queries[0]
        if not isinstance(query, InitialSearchQuery):
            raise AssertionError(
                "Query must be an instance of XP2.queries.InitialSearchQuery"
            )

        # Zero-argument super() is unavailable inside a staticmethod, hence the
        # explicit two-argument form. The helper name is spelled as the parent
        # class exposes it.
        root = super(InitialSearchMapper, InitialSearchMapper).unpack_return_mesage(
            query.raw_core_response
        )
        if hasattr(root, "ErrorMessage"):
            raise CoreException(
                f"Core returned unsuccessful status: {root.ErrorMessage.Message.text}"
            )
        return root.PrimaryIndividualId.text
[docs]
class InitialSearchSSNMapper(XP2Mapper):
    """Mapper that reads the individual id from an SSN-based initial-search response."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> str:
        """Return the ``IndividualId`` text from the single query's response.

        :param list_of_queries: Must contain exactly one
            ``InitialSearchSSNQuery``; its ``raw_core_response`` is unpacked
            and inspected.
        :return: ``root.IndividualLookupData.IndividualId.text`` of the
            unpacked response.
        :raises AssertionError: If the list does not contain exactly one
            ``InitialSearchSSNQuery`` instance.
        :raises CoreException: If the unpacked response carries an
            ``ErrorMessage`` element.
        """
        # Explicit raises rather than ``assert`` so the validation is not
        # stripped under ``python -O``. AssertionError is kept on purpose so
        # callers observe the same exception type as before.
        if len(list_of_queries) != 1:
            raise AssertionError(
                f"Expected exactly one query, got {len(list_of_queries)}"
            )
        query = list_of_queries[0]
        if not isinstance(query, InitialSearchSSNQuery):
            raise AssertionError(
                "Query must be an instance of XP2.queries.InitialSearchSSNQuery"
            )

        # Zero-argument super() is unavailable inside a staticmethod, hence the
        # explicit two-argument form. The helper name is spelled as the parent
        # class exposes it.
        root = super(
            InitialSearchSSNMapper, InitialSearchSSNMapper
        ).unpack_return_mesage(query.raw_core_response)
        if hasattr(root, "ErrorMessage"):
            raise CoreException(
                f"Core returned unsuccessful status: {root.ErrorMessage.Message.text}"
            )
        return root.IndividualLookupData.IndividualId.text