Source code for q2_cores.OSI.mappers.get_email_info

from typing import List
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery

from ..queries.get_emails_query import GetEmailsQuery
from ..models.email_model import EmailInfo


class GetEmailsMapper(BaseMapper):
    """Mapper that converts the raw core response of a ``GetEmailsQuery`` into ``EmailInfo`` objects."""

    @staticmethod
    def parse_returned_queries(list_of_queries: List[BaseQuery]) -> List[EmailInfo]:
        """Build the list of emails found in a single executed email query.

        The query's ``raw_core_response`` is broken into one row per ``ITC``
        marker. Each row is tab-separated; the third field is the row type and
        everything after it is the row data. Only rows of type ``"111"`` are
        turned into ``EmailInfo`` objects, every other row is ignored.

        :param list_of_queries: Exactly one executed ``GetEmailsQuery``.
        :return: One ``EmailInfo`` per ``"111"`` row, in response order
            (an empty list when the response contains no such row).
        :raises AssertionError: If the list does not hold exactly one query,
            or that query is not a ``GetEmailsQuery``.
        :raises IndexError: If a ``"111"`` row carries fewer than four
            tab-separated data fields.
        """
        # Raised explicitly (rather than via ``assert``) so the checks are not
        # stripped when Python runs with -O; the exception type is unchanged.
        if len(list_of_queries) != 1:
            raise AssertionError(
                "OSI only knows how to deal with a single email query"
            )
        query = list_of_queries[0]
        if not isinstance(query, GetEmailsQuery):
            raise AssertionError(
                "Query must be an instance of OSI.queries.GetEmailsQuery"
            )

        # Start a new line at every "ITC" marker so each row can be handled
        # on its own, whether or not the response already had line breaks.
        rows = query.raw_core_response.replace("ITC\t", "\nITC\t").splitlines()

        emails = []
        for row in rows:
            # Blank lines and the literal "ITC\t1\t0" row carry no email data.
            if not row or row == "ITC\t1\t0":
                continue

            fields = row.split("\t", 3)
            if len(fields) < 4:
                # No data column at all, so this cannot be an email row.
                # Skip it like any other non-"111" row instead of failing
                # with IndexError on the lookups below.
                continue

            row_type, data = fields[2], fields[3]
            if row_type != "111":
                continue

            # Data layout used here: index 1 = type, 2 = description,
            # 3 = address. Index 0 is not used by this mapper.
            email_data = data.split("\t")
            emails.append(
                EmailInfo(
                    email_address=email_data[3],
                    raw_email_type=email_data[1],
                    email_description=email_data[2],
                )
            )
        return emails