import re
from lxml import objectify
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores import exceptions
from ..queries.demographic_info_query import DemographicInfoQuery
[docs]
class InitialSearchMapper(BaseMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: list[BaseQuery]):
"""
Parses a list of queries to return the demographic data (name, SSN, member number) of a user.
:param list_of_queries: a :py:class:`List` of :py:class:`BaseQuery` objects to operate against
:return: a Dict of user demographic data
"""
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], DemographicInfoQuery), (
"Query must be an instance of Phoenix.queries.DemographicInfoQuery"
)
raw_response = re.sub(r"<\?xml+.+\?>", "", list_of_queries[0].raw_core_response)
response_obj = objectify.fromstring(raw_response)
if response_obj.HRESULT.text != "0" or (
hasattr(response_obj, "RESPONSE")
and response_obj.RESPONSE.UserReturnCode.text != "0"
):
raise exceptions.CoreException(
"There was a problem with the request to the core."
)
record = response_obj.find("./RESPONSE/RECORD")
first_name, last_name = record.FirstName.text, record.LastName.text
middle_name = record.findtext("MiddleInitial", "")
demo_data = {
"full_name": f"{first_name} {middle_name} {last_name}",
"first_name": first_name,
"last_name": last_name,
"middle_name": middle_name,
"ssn": "".join(x for x in record.Tin.text if x.isdigit()),
"member_number": record.RimNo.text,
}
return demo_data