Source code for q2_cores.SOA.mappers.initial_search_mapper

from typing import List, Optional
from lxml import objectify

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery

from q2_cores.exceptions import CoreException
from q2_cores.SOA.queries.initial_search_query import InitialSearchQuery
from q2_cores.SOA.models.soa_data import SOAData


[docs] class InitialSearchMapper(BaseMapper): def __init__( self, list_of_queries: List[BaseQuery], cif_mapping: dict = None, hq_credentials: Optional[HqCredentials] = None, zone_context: Optional[CoreUser] = None, ): self.cleaned_queries = None self.cif_mapping = cif_mapping or {} super().__init__( list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context )
[docs] def parse_returned_queries(self, list_of_queries: List[BaseQuery]) -> SOAData: assert len(list_of_queries) == 1 assert isinstance(list_of_queries[0], InitialSearchQuery), ( "Query must be an instance of SOA.queries.InitialSearchQuery" ) response = list_of_queries[0].raw_core_response root = objectify.fromstring(response) status = root.Status.StatusCode.text # Should always be there, error if not if status != "0": raise CoreException( f"Core returned unsuccessful status: {root.Status.StatusDesc.text}" ) found_portfolios = [] found_name_lines = [] address_name_mapping = {} port_numbers = {} for node in root.CIFC: if node.get("Response") == "PortfolioNameRelationships": found_portfolios.append(node.RelatedToId.text) found_name_lines.append(node.RelationshipId.text) port_numbers[int(node.RelatedToId.text)] = node.PortSequenceNumber.text elif node.get("Response") == "NameAddressRelationships": name_line = node.RelationshipId.text address_id = node.RelatedToId.text address_name_mapping[name_line] = address_id if self.cif_mapping.get("max_port_folio", False): port = max(key for key, value in port_numbers.items()) else: port = min(key for key, value in port_numbers.items()) primary_cif = f"{port}{port_numbers[port].zfill(2)}" return SOAData( primary_cif, found_portfolios, found_name_lines, address_name_mapping )