Source code for q2_cores.FISIBSOpenAPI.mappers.get_phone_numbers_mapper

import json
from typing import List, Optional

from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery

from ...exceptions import CoreError
from ...models import Phone, RecordType
from ...models import ExtendedPhoneType
from ..queries.get_phone_numbers_query import GetPhoneNumbersQuery


PHONE_TYPE_MAP = {
    "HME": ExtendedPhoneType.PERSONAL,
    "MBL": ExtendedPhoneType.CELL,
    "WRK": ExtendedPhoneType.BUSINESS,
}


[docs] class GetPhoneNumbersMapper(BaseDemographicInfoMapper): def __init__( self, list_of_queries: List[BaseQuery] = None, hq_credentials: Optional[HqCredentials] = None, ): super().__init__(list_of_queries, hq_credentials)
[docs] def parse_returned_queries(self, list_of_queries: List[BaseQuery]) -> List[Phone]: """ Handles the phone number response from the core :return: returns a list of phones """ assert len(list_of_queries) == 1 assert isinstance(list_of_queries[0], GetPhoneNumbersQuery), ( "Query must be an instance of FISIBSOpenAPI.queries.GetPhoneNumbersQuery" ) raw_core_resp = list_of_queries[0].raw_core_response response = ( json.loads(raw_core_resp) if not isinstance(raw_core_resp, dict) else raw_core_resp ) phones = [] try: customer_phones = response["Entity"]["phone-numbersLst"] except TypeError as err: raise CoreError(f"There was a problem with the request to the core. {err}") for phone in customer_phones: area_code = str(phone["AreaCde"]) local_number = str(phone["LocalNbr"]) phone_type = PHONE_TYPE_MAP.get(phone["ResnCde"], "") country_code = phone.get("CtryCde", "USA") record_type = ( RecordType.INTERNATIONAL if country_code and country_code != "USA" else RecordType.DOMESTIC ) extension = str(phone.get("Extn", "")) phone_obj = Phone( area_code, local_number, phone_type, record_type=record_type, extension=extension, country=country_code, provider_data=phone, ) phones.append(phone_obj) return phones