Source code for q2_cores.Corelation.mappers.alert_details_mapper

from typing import List, Optional
from lxml import objectify
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores.Corelation import queries, models


class AlertDetailsMapper(BaseMapper):
    """Map the raw core response of an ``AlertDetailsQuery`` to alert models.

    Every ``sequence`` element of the response whose transaction did not
    fail is converted into a ``models.TransactionAlert`` (loan, card and
    share alerts) or a ``models.BaseAlert`` (all other alerts).
    """

    def __init__(
        self,
        list_of_queries: List[BaseQuery],
        hq_credentials: Optional[HqCredentials] = None,
        zone_context: Optional[CoreUser] = None,
        alert_type_details: Optional[dict] = None,
    ):
        """Store the alert type lookup and delegate the rest to ``BaseMapper``.

        :param list_of_queries: Queries forwarded to ``BaseMapper``
        :param hq_credentials: Forwarded to ``BaseMapper``
        :param zone_context: Forwarded to ``BaseMapper``
        :param alert_type_details: Lookup keyed by alert type serial; the
            ``category`` attribute of the matching entry becomes the alert
            category. Falls back to an empty dict when omitted or falsy.
        """
        super().__init__(
            list_of_queries,
            hq_credentials=hq_credentials,
            zone_context=zone_context,
        )
        self.alert_type_details: dict = alert_type_details or {}

    def parse_returned_queries(
        self, list_of_queries: List[BaseQuery]
    ) -> List[models.BaseAlert]:
        """Build alert models from the response of one ``AlertDetailsQuery``.

        :param list_of_queries: Exactly one ``queries.AlertDetailsQuery``
            whose ``raw_core_response`` holds the core XML
        :return: One alert for each ``sequence`` element whose transaction
            does not carry ``result="failed"``
        """
        assert len(list_of_queries) == 1
        query = list_of_queries[0]
        assert isinstance(query, queries.AlertDetailsQuery)

        root = objectify.fromstring(query.raw_core_response)

        # Columns whose new contents are stored as the alert's account
        # serial (value) and account (description).
        account_columns = (
            "ACCOUNT_SERIAL",
            "SHARE_SERIAL",
            "LOAN_SERIAL",
            "CARD_SERIAL",
            "LOGIN_SERIAL",
        )
        # Serial types that are reported as TransactionAlert instances.
        transaction_columns = ("LOAN_SERIAL", "CARD_SERIAL", "SHARE_SERIAL")

        parsed_alerts = []
        for sequence in root.sequence:
            transaction = sequence.transaction
            if transaction.get("result") == "failed":
                continue

            steps = transaction.step
            serial: int = int(steps[1].record.targetSerial.text)

            # Defaults for values that the response may not provide.
            account_serial: Optional[int] = None
            account: str = ""
            serial_type: str = ""
            days_before_event: Optional[int] = None
            alert_description: str = ""
            category_serial: Optional[int] = None
            minimum_amount: Optional[str] = None
            maximum_amount: Optional[str] = None
            person_contact_serial: Optional[int] = None
            contact_method: Optional[str] = None

            # Step 0 carries the contact row description, e.g. "Email a@b.c".
            row_description = steps[0].recordTree.resultTree.child.rowDescription
            contact_address = get_contact_address(row_description.text)

            # Step 1 carries the alert record itself.
            for field in steps[1].record.field:
                column = field.columnName.text
                if column == "TYPE_SERIAL":
                    alert_description = field.newContentsDescription.text
                    category_serial = int(field.newContents.text)
                    continue
                if not hasattr(field, "newContents"):
                    continue
                if column in account_columns:
                    serial_type = column
                    account_serial = int(field.newContents.text)
                    account = field.newContentsDescription.text
                elif column == "MINIMUM_AMOUNT":
                    minimum_amount = field.newContents.text
                elif column == "MAXIMUM_AMOUNT":
                    maximum_amount = field.newContents.text
                elif column == "DAYS_BEFORE_EVENT":
                    days_before_event = int(field.newContents.text)

            # Step 2 carries the contact the alert is delivered to.
            for field in steps[2].record.field:
                if not hasattr(field, "newContents"):
                    continue
                column = field.columnName.text
                if column == "PERSON_CONTACT_SERIAL":
                    person_contact_serial = int(field.newContents.text)
                elif column == "CONTACT_METHOD":
                    contact_method = field.newContents.text

            # Unknown alert types resolve to an empty category.
            type_details = self.alert_type_details.get(category_serial, {})
            category = getattr(type_details, "category", "")

            shared_kwargs = dict(
                account=account,
                account_serial=account_serial,
                serial=serial,
                serial_type=serial_type,
                category=category,
                category_serial=category_serial,
                description=alert_description,
                contact_method=contact_method,
                contact_address=contact_address,
                person_contact_serial=person_contact_serial,
                days_before_event=days_before_event,
            )
            if serial_type in transaction_columns:
                alert = models.TransactionAlert(
                    **shared_kwargs,
                    minimum_amount=minimum_amount,
                    maximum_amount=maximum_amount,
                )
            else:
                alert = models.BaseAlert(**shared_kwargs)
            parsed_alerts.append(alert)

        return parsed_alerts
def get_contact_address(element_value: Optional[str]) -> str:
    """Returns the contact address from a rowDescription element

    The address is the last whitespace-separated token of the value.

    Example input: "Email tester@q2.com"
    Example output: "tester@q2.com"

    :param element_value: rowDescription element value; may be ``None`` or
        empty when the element carries no text
    :return: Contact address, or an empty string when the value holds no
        non-whitespace text
    """
    if not element_value:
        return ""
    # Split on any run of whitespace so that trailing spaces, tabs or
    # newlines do not produce an empty final token.
    tokens = element_value.split()
    return tokens[-1] if tokens else ""