import logging
from lxml import etree
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores import data_helpers
from q2_cores.MiserCohesion.queries import mock_responses
from q2_cores.exceptions import ConfigException
class DemographicInfoQuery(BaseQuery):
    """
    Returns addresses, First/Last Name, Phones, etc. for a MiserCohesion customer.

    The customer is looked up by ``cif_nbr`` or, when no ``cif_nbr`` was
    supplied, by ``tax_id``.
    """

    def __init__(self, logger: logging.Logger, cif_nbr="", tax_id=""):
        """
        :param logger: Logger instance forwarded to ``BaseQuery``
        :param cif_nbr: CIF number of the customer; wins over ``tax_id``
            when both are supplied
        :param tax_id: Tax ID of the customer; only used when ``cif_nbr``
            is empty
        :raises AssertionError: If neither ``cif_nbr`` nor ``tax_id`` is truthy
        """
        # Raised explicitly instead of via an ``assert`` statement so the check
        # still runs under ``python -O``; the exception type and message are
        # kept so existing callers see no difference.
        if not (cif_nbr or tax_id):
            raise AssertionError("Either cif_nbr or tax_id must be provided")

        # Attributes are assigned before delegating to the base initializer.
        # NOTE(review): presumably BaseQuery.__init__ may invoke build(), which
        # reads these attributes -- keep this ordering.
        self.cif_nbr = cif_nbr
        self.tax_id = tax_id
        super().__init__(logger)

    def build(self):
        """
        Assemble the VBTP request XML for the CF/CUS transaction.

        Exactly one lookup parameter is emitted: ``CIFNbr`` when ``cif_nbr``
        is set, otherwise ``TaxID``.

        Call Shape:

        .. code::

            <VBTPRequest xmlns="http://schemas.fidelityinfoservices.com/webservices/miser/vbtp">
                <Transaction>
                    <ApplCode>CF</ApplCode>
                    <TranCode>CUS</TranCode>
                </Transaction>
                <Parameters>
                    <CIFNbr>{cif_nbr}</CIFNbr>  # Either this one
                    <TaxID>{tax_id}</TaxID>  # or this one
                </Parameters>
            </VBTPRequest>

        :return: The serialized request after it has been passed through
            ``data_helpers.normalize_xml_str``
        """
        root = etree.Element(
            "VBTPRequest",
            xmlns="http://schemas.fidelityinfoservices.com/webservices/miser/vbtp",
        )

        tran_node = etree.SubElement(root, "Transaction")
        etree.SubElement(tran_node, "ApplCode").text = "CF"
        etree.SubElement(tran_node, "TranCode").text = "CUS"

        param_node = etree.SubElement(root, "Parameters")
        # cif_nbr takes precedence; tax_id is only sent when no cif_nbr is set.
        # str() allows non-string identifiers (e.g. ints) to be serialized.
        if self.cif_nbr:
            etree.SubElement(param_node, "CIFNbr").text = str(self.cif_nbr)
        elif self.tax_id:
            etree.SubElement(param_node, "TaxID").text = str(self.tax_id)

        xml_str = etree.tostring(root).decode("utf8")
        return data_helpers.normalize_xml_str(xml_str)

    def mock_response(self):
        """
        Return the canned response matching the identifier this query uses.

        :return: The CIF-based mock when ``cif_nbr`` is set, otherwise the
            SSN-based mock when ``tax_id`` is set
        :raises ConfigException: If both identifiers are empty (only possible
            when the attributes were cleared after construction)
        """
        if self.cif_nbr:
            response = mock_responses.mock_demo_info_response_with_cif()
        elif self.tax_id:
            response = mock_responses.mock_demo_info_response_with_ssn()
        else:
            raise ConfigException("Must provide either a cif_nbr or tax_id")
        return response