from typing import List
from lxml import objectify
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ..models.message_status import MessageStatus
from ...Symitar.queries import SymitarBaseQuery
from ...Symitar.models import parser
[docs]
class EstatementEnrollmentInfoMapper(BaseMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> dict:
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], SymitarBaseQuery), (
"Query must be an instance of Symitar.queries.EstatementEnrollmentInfoQuery"
)
response = list_of_queries[0].raw_core_response
symitar_response = parser.parse(response)
if symitar_response.status != MessageStatus.Success:
return {
"success": False,
"error": symitar_response.error_number,
"error_message": symitar_response.error_message,
}
root = objectify.fromstring(symitar_response.payload)
if str(root.MEMOMODE.text).lower() == "true":
return {"success": False, "error": "", "error_message": "Memo Mode"}
data_dict = {
"success": True,
root.ACCOUNTNUMBER.text: {
"account_type": root.ACCOUNTTYPE.text,
"enabled": root.ESTATEMENTS.text,
"email": root.EMAIL.text,
},
}
return data_dict
[docs]
class EstatementUpdateMapper(BaseMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> dict:
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], SymitarBaseQuery), (
"Query must be an instance of Symitar.queries.EstatementUpdateQuery"
)
response = list_of_queries[0].raw_core_response
symitar_response = parser.parse(response)
if symitar_response.status != MessageStatus.Success:
return {
"success": False,
"error": symitar_response.error_number,
"error_message": symitar_response.error_message,
}
root = objectify.fromstring(symitar_response.payload)
if not root.text:
return {"success": True}
else:
return {"success": False, "error_message": root.text}