from typing import List, Optional
from lxml import objectify
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores.exceptions import CoreException
from q2_cores.SOA.queries.initial_search_query import InitialSearchQuery
from q2_cores.SOA.models.soa_data import SOAData
[docs]
class InitialSearchMapper(BaseMapper):
def __init__(
self,
list_of_queries: List[BaseQuery],
cif_mapping: dict = None,
hq_credentials: Optional[HqCredentials] = None,
zone_context: Optional[CoreUser] = None,
):
self.cleaned_queries = None
self.cif_mapping = cif_mapping or {}
super().__init__(
list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context
)
[docs]
def parse_returned_queries(self, list_of_queries: List[BaseQuery]) -> SOAData:
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], InitialSearchQuery), (
"Query must be an instance of SOA.queries.InitialSearchQuery"
)
response = list_of_queries[0].raw_core_response
root = objectify.fromstring(response)
status = root.Status.StatusCode.text # Should always be there, error if not
if status != "0":
raise CoreException(
f"Core returned unsuccessful status: {root.Status.StatusDesc.text}"
)
found_portfolios = []
found_name_lines = []
address_name_mapping = {}
port_numbers = {}
for node in root.CIFC:
if node.get("Response") == "PortfolioNameRelationships":
found_portfolios.append(node.RelatedToId.text)
found_name_lines.append(node.RelationshipId.text)
port_numbers[int(node.RelatedToId.text)] = node.PortSequenceNumber.text
elif node.get("Response") == "NameAddressRelationships":
name_line = node.RelationshipId.text
address_id = node.RelatedToId.text
address_name_mapping[name_line] = address_id
if self.cif_mapping.get("max_port_folio", False):
port = max(key for key, value in port_numbers.items())
else:
port = min(key for key, value in port_numbers.items())
primary_cif = f"{port}{port_numbers[port].zfill(2)}"
return SOAData(
primary_cif, found_portfolios, found_name_lines, address_name_mapping
)