import logging
from q2_sdk.models.cores.base_core import BaseCore
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_cores.SOA.mappers import initial_search_mapper, demographic_info_mapper
from q2_cores.SOA.queries import (
initial_search_query,
demographic_info_query,
address_query,
)
from ..SOA.models.soa_data import SOAData
[docs]
class Core(BaseCore):
CONFIG_FILE_NAME = "SOA_Core"
REQUIRED_CONFIGURATIONS = {
"GROUP_NAME": "Q2",
"INST_NUMBER": "1",
"IDENT_INFO_MAPPING": {},
"FLEX_FIELD_MAPPING": {},
}
OPTIONAL_CONFIGURATIONS = {"CIF_MAPPING": {"max_port_folio": False}}
def __init__(
self,
logger: logging.Logger,
core_user: CoreUser,
hq_credentials: str = None,
soa_data: SOAData = None,
**kwargs,
):
super().__init__(logger, core_user, hq_credentials=hq_credentials, **kwargs)
self._soa_data = soa_data
[docs]
async def gather_soa_data(self):
initial_mapper = await self.build_initial_search()
soa_data = await initial_mapper.execute()
self._soa_data = soa_data
[docs]
async def build_initial_search(self) -> initial_search_mapper.InitialSearchMapper:
initial_search = initial_search_query.InitialSearchQuery(
self.logger,
self.configured_user.ssn,
self.config.GROUP_NAME,
self.config.INST_NUMBER,
)
cif_mapping = getattr(self.config, "CIF_MAPPING", None)
return initial_search_mapper.InitialSearchMapper(
[initial_search],
cif_mapping,
hq_credentials=self.hq_credentials,
zone_context=self.core_user,
)
[docs]
async def build_demographic_info(
self,
) -> demographic_info_mapper.DemographicInfoMapper:
if not self._soa_data:
await self.gather_soa_data()
queries = []
for name in self._soa_data.name_lines:
queries.append(
demographic_info_query.DemographicInfoQuery(
self.logger, name, self.config.GROUP_NAME, self.config.INST_NUMBER
)
)
for port in self._soa_data.portfolios:
queries.append(
address_query.AddressQuery(
self.logger, port, self.config.GROUP_NAME, self.config.INST_NUMBER
)
)
return demographic_info_mapper.DemographicInfoMapper(
queries,
self.configured_user.ssn,
self.config.IDENT_INFO_MAPPING,
self.config.FLEX_FIELD_MAPPING,
self._soa_data.address_name_mapping,
primary_cif=self._soa_data.primary_cif,
hq_credentials=self.hq_credentials,
zone_context=self.core_user,
)