Source code for q2_cores.SymXchange.mappers.account_demographic

from typing import List

from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_cores.Symitar.models.extended_demographic import (
    DemographicInfoExt,
    AddressRec,
    Other,
    Account,
)
from q2_cores.Symitar.models import parser
from ..queries.base_query import SymXchangeBaseQuery
from ..queries.account_query import AccountQuery


class AccountDemographicMapper(BaseMapper):
    """Map the response of a single ``AccountQuery`` to demographic models.

    The mapper produces one ``Account`` record for the account itself and
    one ``DemographicInfoExt`` (with nested ``AddressRec`` and ``Other``)
    per name record found on the account. Every name record also carries
    the list of share ids found on the account.
    """

    @staticmethod
    def parse_returned_queries(list_of_queries: List[SymXchangeBaseQuery]) -> list:
        """Parse the raw core response of an ``AccountQuery``.

        :param list_of_queries: exactly one executed ``AccountQuery`` whose
            ``raw_core_response`` holds the ``getAccountSelectFieldsResponse``
            payload.
        :return: ``[account_record, name_profiles]`` where ``account_record``
            is an ``Account`` and ``name_profiles`` is a list of
            ``DemographicInfoExt``; an empty list when the response lacks the
            account, its share list, its name list, or another required
            attribute (any ``AttributeError`` while reading them).
        :raises AssertionError: if ``list_of_queries`` does not contain
            exactly one ``AccountQuery``.
        """
        assert len(list_of_queries) == 1, "expected exactly one query"
        assert isinstance(
            list_of_queries[0], AccountQuery
        ), "expected an AccountQuery"

        # Placeholder reported when the response carries no close date.
        no_close_date = "--/--/----"

        raw_response = list_of_queries[0].raw_core_response
        root = parser.parse(raw_response).element
        selected_fields = root.Body[
            "{http://www.symxchange.generated.symitar.com/account}getAccountSelectFieldsResponse"
        ]
        # The payload of interest is the first child of the response wrapper.
        response = selected_fields.getchildren()[0]

        try:
            account = response.Account
            share_ids = [share.Id for share in account.ShareList.Share]
            name_records = account.NameList.Name
            account_number = account.Number
            account_type = account.Type

            # Every other account field is read from ``Account``; a bare tag
            # passed to ``findtext`` only matches direct children (assuming
            # ElementTree-style semantics), so look on ``Account`` first and
            # keep the original response-level lookup as a fallback.
            close_date = account.findtext("CloseDate")
            if close_date is None:
                close_date = response.findtext("CloseDate", no_close_date)

            account_record = Account(
                memo_mode="0",
                # an additional call to the GeneralEpisysInformationRequest
                # is required to determine the status of InMemoMode
                account_number=account_number,
                account_type=account_type,
                account_close_date=close_date,
            )
        except AttributeError:
            # A required element is missing from the response.
            return []

        # Account-level flag, identical for every name record: "TRUE" when
        # the account type is "9", otherwise "FALSE".
        account_level_signer = str(account.findtext("Type") == "9").upper()

        name_profiles = []
        for record in name_records:
            text = record.findtext
            core_specific = {
                "account_level_signer": account_level_signer,
                "share_id": share_ids,
                "locator": text("Locator"),
                "name_type": text("Type"),
                "extended_name": text("LongName"),
                "address_expiration_date": text("AddrRecordExprDate"),
                "last_address_verify_date": text("AddrRecordChangeDate"),
                "preferred_contact_method": text("PreferredContact"),
            }
            demo_record = DemographicInfoExt(
                text("LongName"),
                text("First"),
                text("Last"),
                text("Middle"),
                text("MothersMaidenName"),
                text("BirthDate"),
                text("Ssn"),
                text("Type"),
                account_number,
                account_type,
                close_date,
                text("License"),
                core_specific,
                AddressRec(
                    text("Street"),
                    text("ExtraAddress"),
                    text("City"),
                    text("State"),
                    text("ZipCode"),
                    text("AddressType"),
                    text("Country"),
                    text("CountryCode"),
                ),
                Other(
                    text("HomePhone"),
                    text("MobilePhone"),
                    text("WorkPhone"),
                    text("WorkPhoneExtension"),
                    text("PhoneType"),
                    text("AltEmail"),
                    text("Email"),
                ),
            )
            name_profiles.append(demo_record)

        return [account_record, name_profiles]