from typing import List
from lxml import objectify
from q2_cores.exceptions import CoreException
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ..queries.initial_search_query import InitialSearchQuery
[docs]
class InitialSearchMapper(BaseMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> str:
"""
Handles the demographic information response from the core
if ``THROW_ERROR_ON_MULTIPLE_CUSTPERM_ID`` is set to True, a CoreException error will be raised if multiple customer permanent IDs are returned from the core.
:return: a string of customer's permanent ID (CIF)
:raises CoreException: There was a core error
:raises CoreException: Unexpected multiple CustPermId returned
"""
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], InitialSearchQuery), (
"Query must be an instance of q2_cores.FiservCommunicatorSignature.queries.initial_search_query"
)
root = objectify.fromstring(list_of_queries[0].raw_core_response)
if hasattr(root.Status, "Error"):
raise CoreException(
f"Core Error: {root.Status.StatusDesc.text}, {root.Status.Error.ErrDesc.text}"
)
configs = list_of_queries[0].context.get("configs")
throw_error_mult_cust = getattr(
configs, "THROW_ERROR_ON_MULTIPLE_CUSTPERM_ID", False
)
if throw_error_mult_cust:
if len(root.CIFSvcRs.CustListInqRs.findall(".CustBasic")) > 1:
raise CoreException("Unexpected multiple CustPermId returned")
return root.CIFSvcRs.CustListInqRs.CustBasic.CustId.CustPermId.text