Source code for q2_cores.Phoenix.queries.demographic_info_query

import logging
import time
from datetime import datetime
from lxml import etree
from dateutil.tz import tzlocal
from q2_sdk.models.cores.queries.base_query import BaseQuery
from q2_cores.Phoenix.queries import mock_responses
from q2_cores import data_helpers


class DemographicInfoQuery(BaseQuery):
    """Build the Phoenix XAPI demographic inquiry (transaction code 13005).

    The inquiry is keyed by account number and account type when both are
    supplied; otherwise it is keyed by SSN/TIN.
    """

    # Values unique to this type of inquiry.
    _TRAN_CODE = "13005"
    _OUTPUT_TYPE_ID = "2"

    def __init__(
        self,
        logger: logging.Logger,
        service_id: int,
        emp_id: int,
        ssn: int = None,
        acct_no: str = None,
        acct_type: str = None,
    ):
        """
        :param logger: Logger used for debug output while building
        :param service_id: Written to ``<SERVICE_INFO><Id>``
        :param emp_id: Written to ``<USER_INFO><EmplId>``
        :param ssn: Written to ``<Tin>`` when no account/type pair is given.
            Converted with ``str()`` when building, so ints and strings are
            both accepted.
        :param acct_no: Written to ``<TRAN_ACCT><AcctNo>``
        :param acct_type: Written to ``<TRAN_ACCT><AcctType>``
        """
        self.ssn = ssn
        self.acct_no = acct_no
        self.acct_type = acct_type
        self.service_id = service_id
        self.emp_id = emp_id
        # Kept last, as in the original: presumably the base initializer may
        # call build(), which reads the attributes above -- TODO confirm.
        super().__init__(logger)

    def build(self):
        """Return the request XML, preferring account + type over SSN."""
        if self.acct_no and self.acct_type:
            self.logger.debug(
                "Building with account and type: %s %s",
                self.acct_no,
                self.acct_type,
            )
            return self.build_with_account_and_type()

        # Never write a full SSN/TIN to the log; keep only the tail visible.
        self.logger.debug("Building with ssn: %s", self._mask(self.ssn))
        return self.build_with_ssn()

    def build_with_ssn(self):
        """
        shape::

            <INQUIRY>
                <TRAN_INFO>
                    <ReferenceNo>YYYYMMDDHHMMSS[TZ]</ReferenceNo>
                    <XapiTranCode>13005</XapiTranCode>
                    <OutputTypeId>2</OutputTypeId>
                </TRAN_INFO>
                <Tin>123456789%</Tin>
            </INQUIRY>
        """
        inquiry = self._new_inquiry()
        tin = etree.SubElement(inquiry, "Tin")
        if self.ssn is not None:
            # lxml only accepts str/bytes as element text, while the
            # constructor advertises ``ssn`` as an int.
            # NOTE(review): an int cannot carry leading zeros; callers that
            # need them must pass the SSN as a string.
            tin.text = str(self.ssn)
        return self._serialize(inquiry)

    def build_with_account_and_type(self):
        """
        shape::

            <INQUIRY>
                <TRAN_INFO>
                    <ReferenceNo>YYYYMMDDHHMMSS[TZ]</ReferenceNo>
                    <XapiTranCode>13005</XapiTranCode>
                    <OutputTypeId>2</OutputTypeId>
                </TRAN_INFO>
                <TRAN_ACCT>
                    <AcctNo>5401000012</AcctNo>
                    <AcctType>SAV</AcctType>
                </TRAN_ACCT>
            </INQUIRY>
        """
        inquiry = self._new_inquiry()
        tran_acct = etree.SubElement(inquiry, "TRAN_ACCT")
        etree.SubElement(tran_acct, "AcctNo").text = self.acct_no
        etree.SubElement(tran_acct, "AcctType").text = self.acct_type
        return self._serialize(inquiry)

    def mock_response(self):
        """Return the canned demographic response from ``mock_responses``."""
        return mock_responses.mock_demographic_response()

    def make_message(self, message_nodes: list[etree.Element]):
        """
        Wrap ``message_nodes`` in the XAPI envelope and return the root element.

        shape::

            <XAPI xmlns="x-schema:phoenix.xapi">
                <SERVICE_INFO>
                    <Id>self.service_id</Id>
                </SERVICE_INFO>
                <USER_INFO>
                    <EmplId>self.emp_id</EmplId>
                </USER_INFO>
                <!-- message_nodes[0] -->
                <!-- message_nodes[1] -->
                <!-- ... -->
            </XAPI>
        """
        root = etree.Element("XAPI", attrib={"xmlns": "x-schema:phoenix.xapi"})

        service_info = etree.SubElement(root, "SERVICE_INFO")
        etree.SubElement(service_info, "Id").text = str(self.service_id)

        user_info = etree.SubElement(root, "USER_INFO")
        etree.SubElement(user_info, "EmplId").text = str(self.emp_id)

        root.extend(message_nodes)
        return root

    @staticmethod
    def make_reference_code():
        """Return the current local time as ``YYYYMMDDHHMMSS[TZ]``.

        TZ is the acronym of the time zone (e.g., "CST" for "Central Standard
        Time").
        """
        # One aware timestamp supplies both the digits and the zone name, so
        # the two can never disagree, and the zone name is never interpreted
        # as part of a strftime format string.
        now = datetime.now(tzlocal())
        return f"{now:%Y%m%d%H%M%S}[{now.tzname()}]"

    def _new_inquiry(self):
        """Create an ``<INQUIRY>`` element pre-populated with ``<TRAN_INFO>``."""
        inquiry = etree.Element("INQUIRY")
        tran_info = etree.SubElement(inquiry, "TRAN_INFO")
        etree.SubElement(tran_info, "ReferenceNo").text = self.make_reference_code()
        etree.SubElement(tran_info, "XapiTranCode").text = self._TRAN_CODE
        etree.SubElement(tran_info, "OutputTypeId").text = self._OUTPUT_TYPE_ID
        return inquiry

    def _serialize(self, inquiry):
        """Wrap ``inquiry`` in the XAPI envelope and return normalized XML text."""
        xml_str = etree.tostring(self.make_message([inquiry])).decode("utf8")
        return data_helpers.normalize_xml_str(xml_str)

    @staticmethod
    def _mask(value, visible: int = 4) -> str:
        """Return ``value`` as text with all but the last ``visible`` chars starred."""
        if value is None:
            return "None"
        text = str(value)
        if len(text) <= visible:
            return "*" * len(text)
        return "*" * (len(text) - visible) + text[-visible:]