from typing import List
from lxml import objectify
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from q2_sdk.models.cores.queries.base_query import BaseQuery
from ...Symitar.queries import AccountDemographicQuery, AccountDemographicUpdateQuery
from ...Symitar.models.extended_demographic import (
DemographicInfoExt,
AddressRec,
Other,
Account,
)
from ...Symitar.models import parser
[docs]
class AccountDemographicMapper(BaseMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> list:
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], AccountDemographicQuery)
response = list_of_queries[0].raw_core_response
symitar_response = parser.parse(response)
root = objectify.fromstring(symitar_response.payload)
try:
name_records = root.NAMERECORDS.NAME
account_record = Account(
memo_mode=root.findtext("MEMOMODE"),
account_number=root.findtext("ACCOUNTNUMBER"),
account_type=root.findtext("ACCOUNTTYPE"),
account_close_date=root.findtext("ACCOUNTCLOSEDATE"),
)
except AttributeError:
return []
name_profiles = []
for profile_name in name_records:
core_specific = {
"account_level_signer": profile_name.findtext("ACCOUNTLEVELSIGNER"),
"share_id": profile_name.findtext("SHAREID"),
"locator": profile_name.findtext("LOCATOR"),
"name_type": profile_name.findtext("NAMETYPE"),
"extended_name": profile_name.findtext("EXTENDEDNAME"),
"address_expiration_date": profile_name.findtext("EXPIRATIONDATE"),
"last_address_verify_date": profile_name.findtext("LASTADDRVERIFDATE"),
"preferred_contact_method": profile_name.findtext(
"PREFERREDCONTACTMETHOD"
),
}
demo_record = DemographicInfoExt(
profile_name.findtext("LONGNAME"),
profile_name.findtext("FIRST"),
profile_name.findtext("LAST"),
profile_name.findtext("MIDDLE"),
profile_name.findtext("MMN"),
profile_name.findtext("BIRTHDATE"),
profile_name.findtext("SSN"),
profile_name.findtext("NAMETYPE"),
root.findtext("ACCOUNTNUMBER"),
root.findtext("ACCOUNTTYPE"),
root.findtext("ACCOUNTCLOSEDATE"),
profile_name.findtext("LICENSENUMBER"),
core_specific,
AddressRec(
profile_name.findtext("STREET"),
profile_name.findtext("EXTRAADDRESS"),
profile_name.findtext("CITY"),
profile_name.findtext("STATE"),
profile_name.findtext("ZIPCODE"),
profile_name.findtext("ADDRESSTYPE"),
profile_name.findtext("COUNTRY"),
profile_name.findtext("COUNTRYCODE"),
),
Other(
profile_name.findtext("HOMEPHONE"),
profile_name.findtext("MOBILEPHONE"),
profile_name.findtext("WORKPHONE"),
profile_name.findtext("WORKPHONEEXTENSION"),
profile_name.findtext("PHONETYPE"),
profile_name.findtext("ALTEMAIL"),
profile_name.findtext("EMAIL"),
),
)
name_profiles.append(demo_record)
return [account_record, name_profiles]
[docs]
class AccountDemographicUpdateMapper(BaseMapper):
[docs]
@staticmethod
def parse_returned_queries(list_of_queries: List[BaseQuery]) -> dict:
assert len(list_of_queries) == 1
assert isinstance(list_of_queries[0], AccountDemographicUpdateQuery)
response = list_of_queries[0].raw_core_response
symitar_response = parser.parse(response)
root = objectify.fromstring(symitar_response.payload)
data = {
"ACCOUNTFMERROR": root.findtext("ACCOUNTFMERROR"),
"SHAREFMERROR": root.findtext("SHAREFMERROR"),
"FMERROR": root.findtext("FMERROR"),
"LOANFMERROR": root.findtext("LOANFMERROR"),
"CARDFMERROR": root.findtext("CARDFMERROR"),
"FMERRORWARNING": root.findtext("FMERRORWARNING"),
"FMERRORSETWARNING": root.findtext("FMERRORSETWARNING"),
"FMERRORSTATEMENTMAILCODE": root.findtext("FMERRORSTATEMENTMAILCODE"),
}
return data