from typing import List
from lxml import objectify
from q2_sdk.models.cores.mappers.update_demographic_info import (
BaseUpdateDemographicInfoMapper,
)
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores.exceptions import CoreException
from q2_cores.JXchange.queries import UpdateDemographicInfoQuery
[docs]
class UpdateDemographicInfoMapper(BaseUpdateDemographicInfoMapper):
    """Interpret the core response of a JXchange ``UpdateDemographicInfoQuery``."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> bool:
        """Report whether the update was acknowledged as successful.

        The raw response of the single supplied query is parsed as XML and the
        top-level ``RsStat`` element is compared against ``"Success"``.

        Args:
            list_of_queries: Must contain exactly one
                ``UpdateDemographicInfoQuery``; its ``raw_core_response`` is
                the XML document that gets parsed.

        Returns:
            ``True`` if the root element has an ``RsStat`` child equal to
            ``"Success"``, ``False`` for any other ``RsStat`` value.

        Raises:
            AssertionError: If ``list_of_queries`` does not hold exactly one
                query, or that query is not an ``UpdateDemographicInfoQuery``.
            CoreException: If reading or comparing ``RsStat`` fails (for
                example, the response has no ``RsStat`` child). The raw
                response is passed to the exception.

        Note:
            Errors raised while parsing the XML itself are not wrapped; they
            propagate from ``objectify.fromstring`` unchanged.
        """
        # Raised explicitly instead of via ``assert`` so the validation still
        # runs under ``python -O``; the exception type is kept as
        # AssertionError so existing callers see no difference.
        if len(list_of_queries) != 1:
            raise AssertionError("Expected exactly one query")
        query = list_of_queries[0]
        if not isinstance(query, UpdateDemographicInfoQuery):
            raise AssertionError(
                "Query must be an instance of JXchange.queries.UpdateDemographicInfoQuery"
            )

        response = query.raw_core_response
        root = objectify.fromstring(response)

        # getchildren() is used deliberately: iterating an objectify element
        # directly walks same-tag siblings, not its children. When a tag
        # repeats, the last occurrence wins.
        children_by_tag = {node.tag: node for node in root.getchildren()}

        try:
            return bool(children_by_tag["RsStat"] == "Success")
        except Exception as err:
            raise CoreException(response) from err