import re
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ...exceptions import CoreException, CoreValidationException
from ..queries.initial_search_query import InitialSearchQuery
[docs]
class InitialSearchMapper(BaseMapper):
    """Extract the customer key from the core's reply to an ``InitialSearchQuery``."""

    # Success shape: capture everything between the ``HEDS001`` marker and the
    # trailing ``~DE`` terminator.
    _RM01_FOUND_RE = re.compile(r"^HSRM01.+HEDS001(.+)~DE$")
    # ``HEDS000`` immediately followed by the terminator: reported as "no user".
    _RM01_NOT_FOUND_RE = re.compile(r"^HSRM01.+HEDS0\d+~(.+)~DE$".replace(r"HEDS0\d+~(.+)~DE$", "HEDS000~DE$"))
    # Any other ``HEDS0<digits>`` marker followed by a payload: reported as
    # "multiple users". Only consulted after the success shape has failed.
    _RM01_MULTIPLE_RE = re.compile(r"^HSRM01.+HEDS0\d+~(.+)~DE$")

    def __init__(
        self, list_of_queries: list[BaseQuery], hq_credentials: HqCredentials = None
    ):
        """
        Forward the queries and optional HQ credentials to ``BaseMapper``.

        :param list_of_queries: Queries this mapper is responsible for
        :param hq_credentials: Optional HQ credentials, passed through unchanged
        """
        super().__init__(list_of_queries, hq_credentials=hq_credentials)

    def parse_returned_queries(self, list_of_queries: list[BaseQuery]) -> str:
        """
        Parses a list of queries to return the customer's key.

        The list must hold exactly one ``InitialSearchQuery``. Its
        ``raw_core_response`` is matched against the RM01 response shapes, and
        the second ``~``-separated field of the matched payload is returned
        with leading zeros stripped.

        :param list_of_queries: A single-element list containing the executed
            ``InitialSearchQuery``
        :return: Unique customer identifier
        :raises CoreValidationException: No user, or more than one user, was
            found with the provided data points
        :raises CoreException: A general error was returned from the core, or
            the response could not be interpreted
        """
        # Caller-contract checks; kept as assertions so misuse still surfaces
        # as AssertionError exactly as before.
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], InitialSearchQuery), (
            "Query must be instance of FISHorizon.queries.InitialSearchQuery"
        )

        data = list_of_queries[0].raw_core_response
        # A missing or non-text response cannot be pattern-matched; report it
        # as a core failure instead of letting ``re`` raise TypeError.
        if not isinstance(data, str):
            raise CoreException(f"Couldn't find a valid RM01 response: {data}")

        rm01_match = self._RM01_FOUND_RE.match(data)
        if not rm01_match:
            if self._RM01_NOT_FOUND_RE.match(data):
                raise CoreValidationException(
                    "No user was found with the provided information"
                )
            if self._RM01_MULTIPLE_RE.match(data):
                raise CoreValidationException(
                    "Multiple users were found with the provided information"
                )
            # Report the response itself: the match object is always None on
            # this path and says nothing about what the core returned.
            # NOTE(review): confirm an unrecognised response cannot carry
            # sensitive customer data before this message reaches logs.
            raise CoreException(f"Couldn't find a valid RM01 response: {data}")

        split_response = rm01_match.group(1).split("~")
        # The key is read from the second field; a payload without a
        # separator would otherwise escape as a bare IndexError.
        if len(split_response) < 2:
            raise CoreException(
                f"RM01 response did not contain a customer key: {data}"
            )
        return split_response[1].lstrip("0")