import json
from typing import List, Optional
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ...exceptions import CoreError
from ...models import Phone, RecordType
from ...models import ExtendedPhoneType
from ..queries.get_phone_numbers_query import GetPhoneNumbersQuery
# Translates the ``ResnCde`` value found on each phone entry of the core
# response into the SDK's ExtendedPhoneType. Codes that are not listed here
# are looked up with ``.get(..., "")`` by the mapper, so they yield "".
PHONE_TYPE_MAP = {
"HME": ExtendedPhoneType.PERSONAL,
"MBL": ExtendedPhoneType.CELL,
"WRK": ExtendedPhoneType.BUSINESS,
}
[docs]
class GetPhoneNumbersMapper(BaseDemographicInfoMapper):
    """Turns the core response of a ``GetPhoneNumbersQuery`` into ``Phone`` objects."""

    def __init__(
        self,
        list_of_queries: Optional[List[BaseQuery]] = None,
        hq_credentials: Optional[HqCredentials] = None,
    ):
        """
        Forwards both arguments unchanged to the base mapper.

        :param list_of_queries: queries this mapper will parse, or ``None``
        :param hq_credentials: optional HQ credentials passed to the base class
        """
        super().__init__(list_of_queries, hq_credentials)

    def parse_returned_queries(self, list_of_queries: List[BaseQuery]) -> List[Phone]:
        """
        Handles the phone number response from the core

        The raw response may already be a ``dict``; anything else is decoded
        with ``json.loads``. Each entry of ``Entity -> phone-numbersLst``
        becomes one ``Phone``:

        * ``AreaCde`` / ``LocalNbr`` are required and converted to ``str``.
        * ``ResnCde`` is required and mapped through ``PHONE_TYPE_MAP``
          (unknown codes give ``""``).
        * ``CtryCde`` defaults to ``"USA"`` when absent; any other non-empty
          value marks the record as ``RecordType.INTERNATIONAL``.
        * ``Extn`` is optional and defaults to ``""``.

        :param list_of_queries: exactly one ``GetPhoneNumbersQuery``
        :return: returns a list of phones
        :raises CoreError: when the response does not contain
            ``Entity -> phone-numbersLst`` (key missing, or a level of the
            response is not subscriptable)
        """
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], GetPhoneNumbersQuery), (
            "Query must be an instance of FISIBSOpenAPI.queries.GetPhoneNumbersQuery"
        )

        raw_core_resp = list_of_queries[0].raw_core_response
        response = (
            raw_core_resp
            if isinstance(raw_core_resp, dict)
            else json.loads(raw_core_resp)
        )

        try:
            customer_phones = response["Entity"]["phone-numbersLst"]
        except (KeyError, TypeError) as err:
            # KeyError: the response lacks "Entity" or "phone-numbersLst".
            # TypeError: the response (or its "Entity") is not subscriptable,
            # e.g. None. Both are reported uniformly as a CoreError, chained
            # to the original exception so the root cause stays visible.
            raise CoreError(
                f"There was a problem with the request to the core. {err}"
            ) from err

        phones = []
        for phone in customer_phones:
            area_code = str(phone["AreaCde"])
            local_number = str(phone["LocalNbr"])
            phone_type = PHONE_TYPE_MAP.get(phone["ResnCde"], "")
            country_code = phone.get("CtryCde", "USA")
            # An explicitly empty/None country code is treated as domestic.
            record_type = (
                RecordType.INTERNATIONAL
                if country_code and country_code != "USA"
                else RecordType.DOMESTIC
            )
            extension = str(phone.get("Extn", ""))
            phones.append(
                Phone(
                    area_code,
                    local_number,
                    phone_type,
                    record_type=record_type,
                    extension=extension,
                    country=country_code,
                    # Keep the untouched core entry available to callers.
                    provider_data=phone,
                )
            )
        return phones