Source code for q2_sdk.hq.http

"""Most of the communication with HQ will be handled in the generated hq_api
module.

However, a few specialized pieces such as parsing apart responses and
serializing them into objects will be handled here
"""

import base64
import copy
import html
import json
import re
import uuid
from typing import Optional, Union
from urllib.parse import urlparse

from lxml import etree
from requests.exceptions import RequestException

from q2_sdk.core import q2_requests
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import VaultError
from q2_sdk.core.non_http_handlers.udp import writer as udp_writer
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.models.recursive_encoder import RecursiveEncoder
from q2_sdk.tools.sentinel import DEFAULT

from .exceptions import HQConnectionError, MissingStoredProcError
from .models.hq_commands import HqCommands
from .models.hq_response import HqResponse

XML_PARSER = etree.XMLParser(remove_blank_text=True, huge_tree=True)


def normalize_xml_str(xml_str):
    parsed_xml = etree.XML(xml_str, parser=XML_PARSER)
    return etree.tostring(parsed_xml).decode()


[docs] def get_unhandled_token_response(token: uuid.UUID) -> str: """ Generates a UUID to handle preflight HQ GetUnsignedTokenRequest :param token: UUID returned to HQ. """ response = f""" <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"> <soap:Body> <GetUnsignedTokenResponse xmlns="http://tempuri.org/"> <GetUnsignedTokenResult>{token}</GetUnsignedTokenResult> </GetUnsignedTokenResponse> </soap:Body> </soap:Envelope> """ return normalize_xml_str(response)
[docs] def get_backoffice_report_response(root_node: str, form_data: bytes) -> str: """ Builds a soap envelope to respond to an "ExecuteRequestAsXML" request from HQ :param root_node: Request nodename provided by HQ :param form_data: Response data """ if isinstance(form_data, str): form_data = form_data.encode() response = f"""<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <soap:Body> <ExecuteRequestAsXmlResponse xmlns="http://tempuri.org/"> <ExecuteRequestAsXmlResult> <{root_node} xmlns:q1="http://Q2software.com" xmlns=""> <Result> <ErrorCode ErrorType="Success">0</ErrorCode> <ErrorDescription/> </Result> <Data>{base64.b64encode(form_data).decode()}</Data> </{root_node}> </ExecuteRequestAsXmlResult> </ExecuteRequestAsXmlResponse> </soap:Body> </soap:Envelope> """ response = etree.XML(response, parser=XML_PARSER) return etree.tostring(response).decode()
[docs] def get_execute_request_as_xml_response( root_node: str, form_data: bytes, attrs: dict, hq_commands: Optional[HqCommands] = None, ) -> str: """ Builds a soap envelope to respond to an "ExecuteRequestAsXML" request from HQ :param root_node: Request nodename provided by HQ :param form_data: Response data :param attrs: Attributes to add to the interior Data node, passed through to HQ :param hq_commands: (Optional) Instance of `.HqCommands` to send additional commands with the response """ if isinstance(form_data, bytes): form_data = form_data.decode() try: # If form_data is xml, gracefully embed it form_data = html.escape(form_data) except AttributeError: pass hq_commands_xml = "" if hq_commands: commands_root_node = etree.Element("CommandsXml") commands_node = etree.Element("HqCommands") etree.SubElement(commands_node, "ReloadAccountListFromDb").text = str( hq_commands.reload_account_list_from_db ).lower() etree.SubElement(commands_node, "ReloadAccountListFromHost").text = str( hq_commands.reload_account_list_from_host ).lower() etree.SubElement(commands_node, "ReloadAllAccountInfo").text = str( hq_commands.reload_all_account_info ).lower() etree.SubElement(commands_node, "ForceSuccessResponse").text = str( hq_commands.force_success_response ).lower() for account_id in hq_commands.account_ids_to_reload: etree.SubElement(commands_node, "ReloadHostAccountID").text = str( account_id ) if hq_commands.disclaimer_id_to_skip: etree.SubElement(commands_node, "SkipFormDisclaimerID").text = str( hq_commands.disclaimer_id_to_skip ) commands_node = etree.tostring(commands_node).decode() commands_root_node.text = commands_node hq_commands_xml = etree.tostring(commands_root_node).decode() data = f""" <FormRsp xmlns="http://tempuri.org/FormRsp.xsd"> <FormResp> <TargetUrl/> <PrependVirtualDirToUrl>false</PrependVirtualDirToUrl> <MessageToEndUser/> <NewFormData>{form_data}</NewFormData> <NewFormShortName/> {hq_commands_xml} </FormResp> </FormRsp> """ data = normalize_xml_str(data) response = f""" <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <soap:Body> <ExecuteRequestAsXmlResponse xmlns="http://tempuri.org/"> <ExecuteRequestAsXmlResult> <{root_node}> <Result> <ErrorCode ErrorType="Success">0</ErrorCode> <ErrorDescription/> </Result> <Data>{html.escape(data)}</Data> </{root_node}> </ExecuteRequestAsXmlResult> </ExecuteRequestAsXmlResponse> </soap:Body> </soap:Envelope> """ response = etree.XML(response, parser=XML_PARSER) data_node = response.find(".//{*}Data") for key, value in sorted(attrs.items()): data_node.attrib[key] = str(value) return etree.tostring(response).decode()
def get_login_to_adapter_response(root_node: str, form_data: bytes) -> str: if isinstance(form_data, bytes): form_data = form_data.decode() response = f""" <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <soap:Body> <LoginToAdapterResponse xmlns="http://Q2Software.com/webservices/3rdPartyAdaptors"> <LoginToAdapterResult> <{root_node}> <Result> <ErrorCode ErrorType="Success">0</ErrorCode> <ErrorDescription/> </Result> <Data> <AdaptorReturn> <Success>true</Success> <ReturnType>RawHTML</ReturnType> <Data>{html.escape(form_data)}</Data> </AdaptorReturn> </Data> </{root_node}> </LoginToAdapterResult> </LoginToAdapterResponse> </soap:Body> </soap:Envelope> """ response = etree.XML(response, parser=XML_PARSER) return etree.tostring(response).decode()
[docs] def get_execute_request_as_string_response( root_node: str, response: str, attrs: dict, hq_commands: Optional[HqCommands] = None ) -> str: """ Builds a soap envelope to respond to an "ExecuteRequestAsString" request from HQ :param root_node: Request nodename provided by HQ :param response: Response data :param attrs: Attributes to add to the interior Data node, passed through to HQ """ if isinstance(response, bytes): response = response.decode() string_result = f""" <{root_node}> <Result> <ErrorCode ErrorType="Success">0</ErrorCode> <ErrorDescription/> </Result> <Data>{response}</Data> </{root_node}> """ string_result = etree.XML(string_result, parser=XML_PARSER) data_node = string_result.find(".//{*}Data") for key, value in sorted(attrs.items()): data_node.attrib[key] = str(value) string_result = etree.tostring(string_result).decode() response = f""" <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <soap:Body> <ExecuteRequestAsStringResponse xmlns="http://tempuri.org/"> <ExecuteRequestAsStringResult>{html.escape(string_result)}</ExecuteRequestAsStringResult> </ExecuteRequestAsStringResponse> </soap:Body> </soap:Envelope> """ response = etree.XML(response, parser=XML_PARSER) return etree.tostring(response).decode()
class HqCallObj: def __init__( self, logger: Q2LoggerType, url: str, raw_body: Union[str, dict], headers: dict, bt_handle: Optional[str] = None, ): self.logger = logger self.url = url self.raw_body = raw_body self.headers = headers self.bt_handle = bt_handle def __repr__(self): return f"Url: {self.url}\nBody: {self.raw_body}\nheaders: {self.headers}" @property def body(self): body = self.raw_body if self._get_body_type() == "dict": body = json.dumps(self.raw_body, cls=RecursiveEncoder) return body.encode() async def post_to_hq( self, refresh_creds=False, response_class=HqResponse, **kwargs ) -> HqResponse: if refresh_creds: self._add_creds_to_body() response = await self._call_hq(**kwargs) try: response_obj = response_class(response, logger=self.logger) except Exception: response_obj = response_class( { "ExceptionMessage": f"Unknown error from HQ_URL: {self.url}", "RawResponse": response.text, }, logger=self.logger, ) return response_obj def get_response_body(self, response: q2_requests.requests.Response): if self._get_body_type() == "dict": response_body = response.json() else: response_body = response.text.encode() return response_body def _get_body_type(self): body_type = "xml" if isinstance(self.raw_body, dict): body_type = "dict" return body_type def _add_creds_to_body(self): body = self.raw_body body_type = self._get_body_type() if body_type == "xml": body_as_xml = etree.fromstring(body) csr_logon_name_node = body_as_xml.find(".//{*}CsrLogonName") csr_password_node = body_as_xml.find(".//{*}CsrPassword") aba_node = body_as_xml.find(".//{*}ABA") if csr_logon_name_node is None: csr_logon_name_node = etree.SubElement(body_as_xml, "CsrLogonName") if csr_password_node is None: csr_password_node = etree.SubElement(body_as_xml, "CsrPassword") if aba_node is None: aba_node = etree.SubElement(body_as_xml, "ABA") csr_logon_name_node.text = settings.HQ_CREDENTIALS.csr_user csr_password_node.text = settings.HQ_CREDENTIALS.csr_pwd aba_node.text = settings.HQ_CREDENTIALS.aba body = etree.tostring(body_as_xml).decode() elif body_type == "dict": body.update({ "CsrLogonName": settings.HQ_CREDENTIALS.csr_user, "CsrPassword": settings.HQ_CREDENTIALS.csr_pwd, "ABA": settings.HQ_CREDENTIALS.aba, }) self.raw_body = body async def _call_hq(self, **kwargs): try: response = await q2_requests.post( self.logger, self.url, data=self.body, headers=self.headers, verify=settings.VERIFY_HQ_CERT, bt_handle=self.bt_handle, verify_whitelist=False, **kwargs, ) except RequestException as err: raise HQConnectionError from err return response def can_rotate_hq_url(self): if len(settings.HQ_CREDENTIALS.hq_url_list) <= 1: return False if not self.url: return False current_hq_url = re.sub(r"\w+\.as[hm]x", "", self.url).rstrip("/") if current_hq_url not in settings.HQ_CREDENTIALS.hq_url_list: return False return True
[docs] async def call_hq( call: Union[str, dict], url: str, logger: Q2LoggerType, hq_token: Optional[str] = None, bt_handle: Optional[str] = None, response_class: Optional[classmethod] = HqResponse, backoffice_cookie: Optional[str] = None, **kwargs, ) -> HqResponse: """ Asynchronously calls HQ and returns the response :param call: Body to send to HQ endpoint :param url: Hq URL to POST to :param logger: Instantiated Q2Logger object :param hq_token: Optional token for using existing HQ session (Necessary for wedge_online_banking HQ endpoints) :param bt_handle: If defined, will pass through tracking info to AppDynamics :param response_class: Optional class of object to return :param backoffice_cookie: Optional string used for making backoffice calls. Sets the session information. :param kwargs: Optional keyword args as defined for q2_requests.post() e.g. "timeout" """ headers = generate_call_headers(call, hq_token, backoffice_cookie) if url[0] == "/": # No URL set if it starts with something like /Q2API.ashx... logger.error( "No HQ_URL set. Did you remember to set HQ_URL in your settings file?" ) raise HQConnectionError call_obj = HqCallObj(logger, url, call, headers, bt_handle) try: response_obj = await call_obj.post_to_hq( response_class=response_class, **kwargs ) except HQConnectionError as exc: if call_obj.can_rotate_hq_url(): hq_count = len(settings.HQ_CREDENTIALS.hq_url_list) - 1 response_obj = None for i in range(hq_count): try: old_hq_url = urlparse(call_obj.url) old_hq_url = f"{old_hq_url.scheme}://{old_hq_url.netloc}" settings.HQ_CREDENTIALS.rotate_hq_url() call_obj.url = call_obj.url.replace( old_hq_url, settings.HQ_CREDENTIALS.hq_url ) parsed_url = urlparse(call_obj.url) url_to_log = f"{parsed_url.scheme}://{parsed_url.netloc}" logger.warning(f"Trying next url in HQ_URL_LIST: {url_to_log}") response_obj = await call_obj.post_to_hq( refresh_creds=True, response_class=response_class, **kwargs ) except HQConnectionError: continue else: break if response_obj is None: raise HQConnectionError else: raise exc else: if not response_obj.success: missing_sproc_match = re.search( r"Query for (\w+) returned 0 records", response_obj.error_message ) if missing_sproc_match: raise MissingStoredProcError( "Stored Procedure {} not installed in the database".format( missing_sproc_match.group(1) ) ) if settings.USE_VAULT_FOR_CREDS: # Try to refresh the HQ creds from vault if it was a logon failure if "logon failure" in response_obj.error_message.lower(): old_creds = copy.deepcopy(settings.HQ_CREDENTIALS) try: refresh_hq_creds() except VaultError: return response_obj new_creds = settings.HQ_CREDENTIALS if old_creds != new_creds: logger.info("Attempting to refresh creds from vault") response_obj = await call_obj.post_to_hq( refresh_creds=True, response_class=response_class ) if response_obj.success: return response_obj return response_obj
def generate_call_headers(call, hq_token=None, backoffice_cookie=None): headers = {"Accept": "application/xml"} if isinstance(call, str): headers["Content-Type"] = "application/soap+xml" elif isinstance(call, dict): headers["Content-Type"] = "application/json" if hq_token: headers["HQ-AUTH-TOKEN"] = hq_token if backoffice_cookie: headers["Cookie"] = backoffice_cookie return headers def refresh_hq_creds(is_parent=False, logger=None, timeout=DEFAULT): from q2_sdk.core import vault client = vault.get_client(logger=logger, timeout=timeout) if client: hq_creds = client.get_hq_creds(settings.VAULT_KEY) if hq_creds != settings.HQ_CREDENTIALS: if settings.FORK_REQUESTS and not is_parent: udp_writer.send_msg(udp_writer.MsgType.HqCredentials, {}) return settings.HQ_CREDENTIALS = hq_creds