Source code for q2_cores.FiservCommunicatorSignature.mappers.initial_search_mapper

from typing import List

from lxml import objectify
from q2_cores.exceptions import CoreException
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery

from ..queries.initial_search_query import InitialSearchQuery


class InitialSearchMapper(BaseMapper):
    """Mapper that pulls the customer's permanent ID out of an initial search response."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> str:
        """
        Handles the demographic information response from the core.

        If ``THROW_ERROR_ON_MULTIPLE_CUSTPERM_ID`` is set to True on the
        ``configs`` object stored in the query context, a CoreException is
        raised when more than one ``CustBasic`` entry is returned from the core.

        :param list_of_queries: must contain exactly one ``InitialSearchQuery``
            whose ``raw_core_response`` holds the core's XML response
        :return: a string of customer's permanent ID (CIF)
        :raises AssertionError: ``list_of_queries`` does not hold exactly one
            ``InitialSearchQuery``
        :raises CoreException: There was a core error
        :raises CoreException: Unexpected multiple CustPermId returned
        """
        # Raise explicitly instead of using ``assert`` statements: asserts are
        # stripped when Python runs with -O, which would let malformed input
        # through. AssertionError is kept so existing callers see the same type.
        if len(list_of_queries) != 1:
            raise AssertionError(
                f"Expected exactly one query, got {len(list_of_queries)}"
            )
        query = list_of_queries[0]
        if not isinstance(query, InitialSearchQuery):
            raise AssertionError(
                "Query must be an instance of q2_cores.FiservCommunicatorSignature.queries.initial_search_query"
            )

        root = objectify.fromstring(query.raw_core_response)

        # The core reports failures through an ``Error`` child under ``Status``.
        if hasattr(root.Status, "Error"):
            raise CoreException(
                f"Core Error: {root.Status.StatusDesc.text}, {root.Status.Error.ErrDesc.text}"
            )

        # ``configs`` may be absent from the context; getattr on None then
        # falls back to False, so the multiple-ID check is opt-in.
        configs = query.context.get("configs")
        throw_error_mult_cust = getattr(
            configs, "THROW_ERROR_ON_MULTIPLE_CUSTPERM_ID", False
        )

        customer_list = root.CIFSvcRs.CustListInqRs
        if throw_error_mult_cust and len(customer_list.findall(".CustBasic")) > 1:
            raise CoreException("Unexpected multiple CustPermId returned")

        # Attribute access yields the first ``CustBasic`` entry when the check
        # above is disabled and several were returned.
        return customer_list.CustBasic.CustId.CustPermId.text