Source code for q2_cores.Symitar.mappers.get_estatement_info

from typing import List
from lxml import objectify

from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ..models.message_status import MessageStatus
from ...Symitar.queries import SymitarBaseQuery
from ...Symitar.models import parser


class EstatementEnrollmentInfoMapper(BaseMapper):
    """Turn the core response of an e-statement enrollment lookup into a dict."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> dict:
        """Map the single returned query onto a plain result dictionary.

        :param list_of_queries: Must contain exactly one ``SymitarBaseQuery``
            whose ``raw_core_response`` holds the core's reply.
        :return: On a non-success message status, a dict with ``success`` set
            to ``False`` plus ``error`` and ``error_message`` taken from the
            parsed response. When the payload's ``MEMOMODE`` text is "true"
            (case-insensitive), a failure dict with the message "Memo Mode".
            Otherwise a dict with ``success`` set to ``True`` and one entry
            keyed by the payload's account number, holding ``account_type``,
            ``enabled`` and ``email`` as the raw text of the payload elements.
        :raises AssertionError: If the list does not hold exactly one query or
            that query is not a ``SymitarBaseQuery``.
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], SymitarBaseQuery), (
            "Query must be an instance of Symitar.queries.EstatementEnrollmentInfoQuery"
        )

        query = list_of_queries[0]
        parsed = parser.parse(query.raw_core_response)

        # Anything other than a successful message status is reported as-is.
        if parsed.status != MessageStatus.Success:
            failure = {"success": False}
            failure["error"] = parsed.error_number
            failure["error_message"] = parsed.error_message
            return failure

        payload = objectify.fromstring(parsed.payload)

        memo_flag = str(payload.MEMOMODE.text).lower()
        if memo_flag == "true":
            return {"success": False, "error": "", "error_message": "Memo Mode"}

        # Values are passed through as the element text; no type conversion.
        account_number = payload.ACCOUNTNUMBER.text
        enrollment = {
            "account_type": payload.ACCOUNTTYPE.text,
            "enabled": payload.ESTATEMENTS.text,
            "email": payload.EMAIL.text,
        }
        return {"success": True, account_number: enrollment}
class EstatementUpdateMapper(BaseMapper):
    """Turn the core response of an e-statement update into a dict."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> dict:
        """Map the single returned query onto a success/failure dictionary.

        :param list_of_queries: Must contain exactly one ``SymitarBaseQuery``
            whose ``raw_core_response`` holds the core's reply.
        :return: On a non-success message status, a dict with ``success`` set
            to ``False`` plus ``error`` and ``error_message`` taken from the
            parsed response. Otherwise ``{"success": True}`` when the payload
            root has no text, or a failure dict carrying that text as
            ``error_message`` when it does.
        :raises AssertionError: If the list does not hold exactly one query or
            that query is not a ``SymitarBaseQuery``.
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], SymitarBaseQuery), (
            "Query must be an instance of Symitar.queries.EstatementUpdateQuery"
        )

        query = list_of_queries[0]
        parsed = parser.parse(query.raw_core_response)

        # Anything other than a successful message status is reported as-is.
        if parsed.status != MessageStatus.Success:
            failure = {"success": False}
            failure["error"] = parsed.error_number
            failure["error_message"] = parsed.error_message
            return failure

        payload = objectify.fromstring(parsed.payload)

        # Any text on the payload root is treated as an error message.
        if payload.text:
            return {"success": False, "error_message": payload.text}
        return {"success": True}