import json
from typing import List, Optional
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ..queries.get_email_addresses_query import GetEmailAddressesQuery
[docs]
class GetEmailAddressesMapper(BaseDemographicInfoMapper):
def __init__(
self,
list_of_queries: List[BaseQuery] = None,
hq_credentials: Optional[HqCredentials] = None,
):
super().__init__(list_of_queries, hq_credentials)
[docs]
def parse_returned_queries(self, list_of_queries: List[BaseQuery]) -> List[str]:
"""
Handles the email address response from the core
:return: returns a list of emails
"""
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], GetEmailAddressesQuery), (
"Query must be an instance of FISIBSOpenAPI.queries.GetEmailAddressesQuery"
)
raw_core_resp = list_of_queries[0].raw_core_response
response = (
json.loads(raw_core_resp)
if not isinstance(raw_core_resp, dict)
else raw_core_resp
)
emails = []
customer_emails = response["Entity"]["email-addressesLst"]
for email in customer_emails:
customer_email = email["EmailAddr"]
if customer_email not in emails:
emails.append(customer_email)
return emails