Source code for q2_cores.CoreAPI.mappers.demographic_info

from typing import List, Optional
from lxml import objectify

from q2_sdk.models.cores.mappers.demographic_info import BaseDemographicInfoMapper
from q2_sdk.models.cores.models.core_user import CoreUser
from q2_sdk.models.demographic import Address, DemographicInfo, Phone, PhoneType
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_cores.exceptions import CoreException
from ..queries import CoreAPIBaseQuery


[docs] class DemographicInfoMapper(BaseDemographicInfoMapper): def __init__( self, list_of_queries: List[CoreAPIBaseQuery], get_cif_from_user_fields: Optional[dict] = None, hq_credentials: Optional[HqCredentials] = None, zone_context: Optional[CoreUser] = None, ): super().__init__( list_of_queries, hq_credentials=hq_credentials, zone_context=zone_context ) self.get_cif_from_user_fields = get_cif_from_user_fields
[docs] def parse_returned_queries(self, list_of_queries: List[CoreAPIBaseQuery]): assert len(list_of_queries) == 1 assert isinstance(list_of_queries[0], CoreAPIBaseQuery) root = objectify.fromstring(list_of_queries[0].raw_core_response) if hasattr(root.SubmitRequestResult.Output.UserAuthentication.Errors, "Error"): raise CoreException( f"Core returned Error: " f"{root.SubmitRequestResult.Output.UserAuthentication.Errors.Error.ErrorMessage.text}" ) response_base = root.SubmitRequestResult.Output.Responses.ResponseBase if hasattr(response_base.Errors, "Error"): raise CoreException( f"Core returned Error: {response_base.Errors.Error.ErrorMessage.text}" ) primary_cif = response_base.findtext("MemberNumber", "") first_name = response_base.Name.findtext("FirstName", "") last_name = response_base.Name.findtext("LastName", "") middle_name = response_base.Name.findtext("MiddleName", " ") title = response_base.Name.findtext("Suffix", " ") ssn = response_base.Demographics.findtext("TaxId", "") dob = response_base.Demographics.findtext("DateOfBirth").split("T")[0] for address in response_base.Addresses.PersOrgAddress: if address.AddressUseCode.text == "PRI": address_line1 = "" address_line2 = "" for address_line in address.AddressLines.PersOrgAddressLine: if address_line.AddressLineNumber.text == "1": address_line1 = address_line.AddressLineText.text if address_line.AddressLineNumber.text == "2": address_line2 = address_line.AddressLineText.text zipcode = address.findtext("ZipCode") if address.findtext("ZipSuffix"): zipcode = f"{zipcode}-{address.ZipSuffix.text}" addresses = Address( address_line1, address_line2, address.findtext("CityName", ""), address.findtext("StateCode", ""), zipcode, ) emails = [] email_details = {} if hasattr(response_base.ElectronicAddresses, "PersOrgElectronicAddress"): for email in response_base.ElectronicAddresses.PersOrgElectronicAddress: if email.ElectronicAddressLines.PersOrgElectronicAddressLine.findtext( "AddressLineText" ): email_addr = email.ElectronicAddressLines.PersOrgElectronicAddressLine.findtext( "AddressLineText" ) emails.append(email_addr) email_details[email_addr] = email.findtext("AddressUseCode", "") additional_details = {"email_details": email_details, "phone_sequence_data": {}} phone_list = [] if hasattr(response_base.Phones, "PersOrgPhone"): for phone in response_base.Phones.PersOrgPhone: try: phone_type = phone.PhoneUseCode.text if phone_type == "PER": home_phone = Phone( area_code=phone.AreaCode.text, phone_number=phone.Exchange.text + phone.PhoneNumber.text, phone_type=PhoneType.PERSONAL, ) phone_list.append(home_phone) additional_details["phone_sequence_data"][phone_type] = ( phone.PhoneSequenceNumber.text ) if phone_type == "BUS": work_phone = Phone( area_code=phone.AreaCode.text, phone_number=phone.Exchange.text + phone.PhoneNumber.text, phone_type=PhoneType.BUSINESS, extension=phone.PhoneExtension.text, ) phone_list.append(work_phone) additional_details["phone_sequence_data"][phone_type] = ( phone.PhoneSequenceNumber.text ) if phone_type == "CELL": mobile_phone = Phone( area_code=phone.AreaCode.text, phone_number=phone.Exchange.text + phone.PhoneNumber.text, phone_type=PhoneType.CELL, ) phone_list.append(mobile_phone) additional_details["phone_sequence_data"][phone_type] = ( phone.PhoneSequenceNumber.text ) except AttributeError: continue person_number = response_base.findtext("PersonNumber", "") if self.get_cif_from_user_fields: if hasattr(response_base, "UserFields"): if hasattr(response_base.UserFields, "PersOrgUserField"): for user_fields in response_base.UserFields.PersOrgUserField: for name, code in self.get_cif_from_user_fields.items(): if user_fields.findtext(".//{*}%s" % name) == code: primary_cif = user_fields.Value.text break additional_details["person_number"] = person_number return DemographicInfo( dob, emails, phone_list, [addresses], first_name, last_name, ssn, middle_name=middle_name, primary_cif=primary_cif, title=title, additional_details=additional_details, )