Source code for q2_cores.Phoenix.mappers.initial_search_demographic_mapper

import re
from lxml import objectify
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores import exceptions
from ..queries.demographic_info_query import DemographicInfoQuery


class InitialSearchMapper(BaseMapper):
    """Maps a Phoenix demographic-info query response to a dict of user demographic data."""

    @staticmethod
    def parse_returned_queries(list_of_queries: list[BaseQuery]) -> dict:
        """
        Parses a list of queries to return the demographic data (name, SSN, member number) of a user.

        :param list_of_queries: a :py:class:`List` of :py:class:`BaseQuery` objects to operate
            against; it must hold exactly one ``DemographicInfoQuery``
        :return: a Dict of user demographic data with the keys ``full_name``, ``first_name``,
            ``last_name``, ``middle_name``, ``ssn`` and ``member_number``
        :raises exceptions.CoreException: if ``HRESULT`` or ``RESPONSE/UserReturnCode`` is not
            ``"0"``, or if the response has no ``RESPONSE/RECORD`` element
        """
        assert len(list_of_queries) == 1
        query = list_of_queries[0]
        assert isinstance(query, DemographicInfoQuery), (
            "Query must be an instance of Phoenix.queries.DemographicInfoQuery"
        )

        # Strip the XML declaration before parsing (presumably because lxml refuses
        # str input that carries an encoding declaration -- TODO confirm).
        # The match is non-greedy so it stops at the declaration's own "?>" and
        # cannot swallow payload up to a later "?>" on the same line.
        raw_response = re.sub(r"<\?xml.+?\?>", "", query.raw_core_response)
        response_obj = objectify.fromstring(raw_response)

        # RESPONSE is optional in the envelope, so its return code is only
        # inspected when the element is actually present.
        if response_obj.HRESULT.text != "0" or (
            hasattr(response_obj, "RESPONSE")
            and response_obj.RESPONSE.UserReturnCode.text != "0"
        ):
            raise exceptions.CoreException(
                "There was a problem with the request to the core."
            )

        record = response_obj.find("./RESPONSE/RECORD")
        if record is None:
            # Surface a missing record as a core error rather than letting the
            # attribute access below fail on None.
            raise exceptions.CoreException(
                "The core response did not contain a demographic record."
            )

        first_name, last_name = record.FirstName.text, record.LastName.text
        middle_name = record.findtext("MiddleInitial", "")

        # Join only the parts that have content so a missing middle initial does
        # not leave a double space in the full name.
        full_name = " ".join(
            part for part in (first_name, middle_name, last_name) if part
        )

        # An empty Tin element has text None; treat it as no digits.
        ssn = "".join(char for char in (record.Tin.text or "") if char.isdigit())

        return {
            "full_name": full_name,
            "first_name": first_name,
            "last_name": last_name,
            "middle_name": middle_name,
            "ssn": ssn,
            "member_number": record.RimNo.text,
        }