from typing import List
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ..queries.get_emails_query import GetEmailsQuery
from ..models.email_model import EmailInfo
# [docs] -- documentation-link marker left over from HTML extraction; not code.
class GetEmailsMapper(BaseMapper):
    """Builds ``EmailInfo`` objects from the raw core response of a ``GetEmailsQuery``."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> List[EmailInfo]:
        """Parse the raw response of a single email query into ``EmailInfo`` objects.

        The response text is cut into records at every ``"ITC\\t"`` marker.
        Each record is tab-delimited; its third field is the row type and
        everything after the third tab is the payload. Only rows of type
        ``"111"`` produce an ``EmailInfo``; every other row is ignored.

        :param list_of_queries: Must contain exactly one ``GetEmailsQuery``
            whose ``raw_core_response`` holds the response text.
        :return: One ``EmailInfo`` per ``"111"`` row, in response order
            (empty list when there are none).
        :raises AssertionError: If the list does not hold exactly one query,
            or that query is not a ``GetEmailsQuery``.
        :raises IndexError: If a ``"111"`` row has no payload or its payload
            has fewer than four tab-separated fields.
        """
        assert len(list_of_queries) == 1, (
            "OSI only knows how to deal with a single email query"
        )
        assert isinstance(list_of_queries[0], GetEmailsQuery), (
            "Query must be an instance of OSI.queries.GetEmailsQuery"
        )

        raw_response = list_of_queries[0].raw_core_response
        # Put every "ITC\t" marker at the start of its own line so that
        # splitlines() yields one record per marker.
        records = raw_response.replace("ITC\t", "\nITC\t").splitlines()

        emails = []
        for record in records:
            # Blank lines and the literal "ITC\t1\t0" line carry no row data.
            if not record or record == "ITC\t1\t0":
                continue

            fields = record.split("\t", 3)
            # Check the row type BEFORE touching the payload: rows of other
            # types may be shorter than four fields and must simply be
            # skipped instead of raising IndexError.
            if len(fields) < 3 or fields[2] != "111":
                continue

            # Payload layout as consumed here: index 1 = email type,
            # index 2 = description, index 3 = address (index 0 is unused).
            email_data = fields[3].split("\t")
            emails.append(
                EmailInfo(
                    email_address=email_data[3],
                    raw_email_type=email_data[1],
                    email_description=email_data[2],
                )
            )
        return emails