Source code for q2_cores.FISIBSOpenAPI.mappers.get_email_addresses_mapper

import json
from typing import List, Optional

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery

from ..queries.get_email_addresses_query import GetEmailAddressesQuery


[docs] class GetEmailAddressesMapper(BaseDemographicInfoMapper): def __init__( self, list_of_queries: List[BaseQuery] = None, hq_credentials: Optional[HqCredentials] = None, ): super().__init__(list_of_queries, hq_credentials)
[docs] def parse_returned_queries(self, list_of_queries: List[BaseQuery]) -> List[str]: """ Handles the email address response from the core :return: returns a list of emails """ assert len(list_of_queries) == 1 assert isinstance(list_of_queries[0], GetEmailAddressesQuery), ( "Query must be an instance of FISIBSOpenAPI.queries.GetEmailAddressesQuery" ) raw_core_resp = list_of_queries[0].raw_core_response response = ( json.loads(raw_core_resp) if not isinstance(raw_core_resp, dict) else raw_core_resp ) emails = [] customer_emails = response["Entity"]["email-addressesLst"] for email in customer_emails: customer_email = email["EmailAddr"] if customer_email not in emails: emails.append(customer_email) return emails