Source code for q2_cores.FISHorizon.mappers.initial_search_mapper

import re

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery

from ...exceptions import CoreException, CoreValidationException
from ..queries.initial_search_query import InitialSearchQuery


class InitialSearchMapper(BaseMapper):
    """Extract the customer key from an RM01 initial-search core response."""

    # Patterns are compiled once at class creation instead of on every call.
    # The HEDS001 form carries the payload we parse; the HEDS000 form is
    # reported as "no user found" and any other HEDS0<digits> form as
    # "multiple users found" (see the exceptions raised below).
    _SINGLE_RESULT = re.compile(r"^HSRM01.+HEDS001(.+)~DE$")
    _NO_RESULT = re.compile(r"^HSRM01.+HEDS000~DE$")
    _MULTIPLE_RESULTS = re.compile(r"^HSRM01.+HEDS0\d+~(.+)~DE$")

    def __init__(
        self, list_of_queries: list[BaseQuery], hq_credentials: HqCredentials = None
    ):
        super().__init__(list_of_queries, hq_credentials=hq_credentials)

    def parse_returned_queries(self, list_of_queries: list[BaseQuery]) -> str:
        """
        Parses a list of queries to return the customer's key.

        :param list_of_queries: Must contain exactly one ``InitialSearchQuery``
            whose ``raw_core_response`` holds the core's reply
        :return: Unique customer identifier, with leading zeros stripped
        :raises CoreValidationException: No user, or more than one user, was
            found with the provided data points
        :raises CoreException: The response was not a recognizable RM01 reply,
            or its payload did not contain a customer key field
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], InitialSearchQuery), (
            "Query must be instance of FISHorizon.queries.InitialSearchQuery"
        )

        data = list_of_queries[0].raw_core_response

        rm01_match = self._SINGLE_RESULT.match(data)
        if not rm01_match:
            if self._NO_RESULT.match(data):
                raise CoreValidationException(
                    "No user was found with the provided information"
                )
            if self._MULTIPLE_RESULTS.match(data):
                raise CoreValidationException(
                    "Multiple users were found with the provided information"
                )
            # Report the response itself: the match object is always None in
            # this branch, so interpolating it told the reader nothing.
            # NOTE(review): the raw response may contain customer data --
            # confirm it is acceptable for it to reach logs via this message.
            raise CoreException(f"Couldn't find a valid RM01 response: {data!r}")

        # The payload is "~"-delimited and the customer key is its second field.
        fields = rm01_match.group(1).split("~")
        if len(fields) < 2:
            # Without a second field there is no key to return; surface this
            # as a core error rather than an unexplained IndexError.
            raise CoreException(
                f"RM01 response did not contain a customer key: {data!r}"
            )

        return fields[1].lstrip("0")