"""Most of the communication with HQ will be handled in the generated hq_api
module.
However, a few specialized pieces such as parsing apart responses and
serializing them into objects will be handled here
"""
import base64
import copy
import html
import json
import re
import uuid
from typing import Optional, Union
from urllib.parse import urlparse
from lxml import etree
from requests.exceptions import RequestException
from q2_sdk.core import q2_requests
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import VaultError
from q2_sdk.core.non_http_handlers.udp import writer as udp_writer
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.models.recursive_encoder import RecursiveEncoder
from q2_sdk.tools.sentinel import DEFAULT
from .exceptions import HQConnectionError, MissingStoredProcError
from .models.hq_commands import HqCommands
from .models.hq_response import HqResponse
XML_PARSER = etree.XMLParser(remove_blank_text=True, huge_tree=True)
def normalize_xml_str(xml_str):
parsed_xml = etree.XML(xml_str, parser=XML_PARSER)
return etree.tostring(parsed_xml).decode()
[docs]
def get_unhandled_token_response(token: uuid.UUID) -> str:
"""
Generates a UUID to handle preflight HQ GetUnsignedTokenRequest
:param token: UUID returned to HQ.
"""
response = f"""
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<soap:Body>
<GetUnsignedTokenResponse xmlns="http://tempuri.org/">
<GetUnsignedTokenResult>{token}</GetUnsignedTokenResult>
</GetUnsignedTokenResponse>
</soap:Body>
</soap:Envelope>
"""
return normalize_xml_str(response)
[docs]
def get_backoffice_report_response(root_node: str, form_data: bytes) -> str:
"""
Builds a soap envelope to respond to an "ExecuteRequestAsXML" request from HQ
:param root_node: Request nodename provided by HQ
:param form_data: Response data
"""
if isinstance(form_data, str):
form_data = form_data.encode()
response = f"""<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Body>
<ExecuteRequestAsXmlResponse xmlns="http://tempuri.org/">
<ExecuteRequestAsXmlResult>
<{root_node} xmlns:q1="http://Q2software.com" xmlns="">
<Result>
<ErrorCode ErrorType="Success">0</ErrorCode>
<ErrorDescription/>
</Result>
<Data>{base64.b64encode(form_data).decode()}</Data>
</{root_node}>
</ExecuteRequestAsXmlResult>
</ExecuteRequestAsXmlResponse>
</soap:Body>
</soap:Envelope>
"""
response = etree.XML(response, parser=XML_PARSER)
return etree.tostring(response).decode()
[docs]
def get_execute_request_as_xml_response(
root_node: str,
form_data: bytes,
attrs: dict,
hq_commands: Optional[HqCommands] = None,
) -> str:
"""
Builds a soap envelope to respond to an "ExecuteRequestAsXML" request from HQ
:param root_node: Request nodename provided by HQ
:param form_data: Response data
:param attrs: Attributes to add to the interior Data node, passed through to HQ
:param hq_commands: (Optional) Instance of `.HqCommands` to send additional commands with the response
"""
if isinstance(form_data, bytes):
form_data = form_data.decode()
try: # If form_data is xml, gracefully embed it
form_data = html.escape(form_data)
except AttributeError:
pass
hq_commands_xml = ""
if hq_commands:
commands_root_node = etree.Element("CommandsXml")
commands_node = etree.Element("HqCommands")
etree.SubElement(commands_node, "ReloadAccountListFromDb").text = str(
hq_commands.reload_account_list_from_db
).lower()
etree.SubElement(commands_node, "ReloadAccountListFromHost").text = str(
hq_commands.reload_account_list_from_host
).lower()
etree.SubElement(commands_node, "ReloadAllAccountInfo").text = str(
hq_commands.reload_all_account_info
).lower()
etree.SubElement(commands_node, "ForceSuccessResponse").text = str(
hq_commands.force_success_response
).lower()
for account_id in hq_commands.account_ids_to_reload:
etree.SubElement(commands_node, "ReloadHostAccountID").text = str(
account_id
)
if hq_commands.disclaimer_id_to_skip:
etree.SubElement(commands_node, "SkipFormDisclaimerID").text = str(
hq_commands.disclaimer_id_to_skip
)
commands_node = etree.tostring(commands_node).decode()
commands_root_node.text = commands_node
hq_commands_xml = etree.tostring(commands_root_node).decode()
data = f"""
<FormRsp xmlns="http://tempuri.org/FormRsp.xsd">
<FormResp>
<TargetUrl/>
<PrependVirtualDirToUrl>false</PrependVirtualDirToUrl>
<MessageToEndUser/>
<NewFormData>{form_data}</NewFormData>
<NewFormShortName/>
{hq_commands_xml}
</FormResp>
</FormRsp>
"""
data = normalize_xml_str(data)
response = f"""
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Body>
<ExecuteRequestAsXmlResponse xmlns="http://tempuri.org/">
<ExecuteRequestAsXmlResult>
<{root_node}>
<Result>
<ErrorCode ErrorType="Success">0</ErrorCode>
<ErrorDescription/>
</Result>
<Data>{html.escape(data)}</Data>
</{root_node}>
</ExecuteRequestAsXmlResult>
</ExecuteRequestAsXmlResponse>
</soap:Body>
</soap:Envelope>
"""
response = etree.XML(response, parser=XML_PARSER)
data_node = response.find(".//{*}Data")
for key, value in sorted(attrs.items()):
data_node.attrib[key] = str(value)
return etree.tostring(response).decode()
def get_login_to_adapter_response(root_node: str, form_data: bytes) -> str:
if isinstance(form_data, bytes):
form_data = form_data.decode()
response = f"""
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Body>
<LoginToAdapterResponse xmlns="http://Q2Software.com/webservices/3rdPartyAdaptors">
<LoginToAdapterResult>
<{root_node}>
<Result>
<ErrorCode ErrorType="Success">0</ErrorCode>
<ErrorDescription/>
</Result>
<Data>
<AdaptorReturn>
<Success>true</Success>
<ReturnType>RawHTML</ReturnType>
<Data>{html.escape(form_data)}</Data>
</AdaptorReturn>
</Data>
</{root_node}>
</LoginToAdapterResult>
</LoginToAdapterResponse>
</soap:Body>
</soap:Envelope>
"""
response = etree.XML(response, parser=XML_PARSER)
return etree.tostring(response).decode()
[docs]
def get_execute_request_as_string_response(
root_node: str, response: str, attrs: dict, hq_commands: Optional[HqCommands] = None
) -> str:
"""
Builds a soap envelope to respond to an "ExecuteRequestAsString" request from HQ
:param root_node: Request nodename provided by HQ
:param response: Response data
:param attrs: Attributes to add to the interior Data node, passed through to HQ
"""
if isinstance(response, bytes):
response = response.decode()
string_result = f"""
<{root_node}>
<Result>
<ErrorCode ErrorType="Success">0</ErrorCode>
<ErrorDescription/>
</Result>
<Data>{response}</Data>
</{root_node}>
"""
string_result = etree.XML(string_result, parser=XML_PARSER)
data_node = string_result.find(".//{*}Data")
for key, value in sorted(attrs.items()):
data_node.attrib[key] = str(value)
string_result = etree.tostring(string_result).decode()
response = f"""
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Body>
<ExecuteRequestAsStringResponse xmlns="http://tempuri.org/">
<ExecuteRequestAsStringResult>{html.escape(string_result)}</ExecuteRequestAsStringResult>
</ExecuteRequestAsStringResponse>
</soap:Body>
</soap:Envelope>
"""
response = etree.XML(response, parser=XML_PARSER)
return etree.tostring(response).decode()
class HqCallObj:
def __init__(
self,
logger: Q2LoggerType,
url: str,
raw_body: Union[str, dict],
headers: dict,
bt_handle: Optional[str] = None,
):
self.logger = logger
self.url = url
self.raw_body = raw_body
self.headers = headers
self.bt_handle = bt_handle
def __repr__(self):
return f"Url: {self.url}\nBody: {self.raw_body}\nheaders: {self.headers}"
@property
def body(self):
body = self.raw_body
if self._get_body_type() == "dict":
body = json.dumps(self.raw_body, cls=RecursiveEncoder)
return body.encode()
async def post_to_hq(
self, refresh_creds=False, response_class=HqResponse, **kwargs
) -> HqResponse:
if refresh_creds:
self._add_creds_to_body()
response = await self._call_hq(**kwargs)
try:
response_obj = response_class(response, logger=self.logger)
except Exception:
response_obj = response_class(
{
"ExceptionMessage": f"Unknown error from HQ_URL: {self.url}",
"RawResponse": response.text,
},
logger=self.logger,
)
return response_obj
def get_response_body(self, response: q2_requests.requests.Response):
if self._get_body_type() == "dict":
response_body = response.json()
else:
response_body = response.text.encode()
return response_body
def _get_body_type(self):
body_type = "xml"
if isinstance(self.raw_body, dict):
body_type = "dict"
return body_type
def _add_creds_to_body(self):
body = self.raw_body
body_type = self._get_body_type()
if body_type == "xml":
body_as_xml = etree.fromstring(body)
csr_logon_name_node = body_as_xml.find(".//{*}CsrLogonName")
csr_password_node = body_as_xml.find(".//{*}CsrPassword")
aba_node = body_as_xml.find(".//{*}ABA")
if csr_logon_name_node is None:
csr_logon_name_node = etree.SubElement(body_as_xml, "CsrLogonName")
if csr_password_node is None:
csr_password_node = etree.SubElement(body_as_xml, "CsrPassword")
if aba_node is None:
aba_node = etree.SubElement(body_as_xml, "ABA")
csr_logon_name_node.text = settings.HQ_CREDENTIALS.csr_user
csr_password_node.text = settings.HQ_CREDENTIALS.csr_pwd
aba_node.text = settings.HQ_CREDENTIALS.aba
body = etree.tostring(body_as_xml).decode()
elif body_type == "dict":
body.update({
"CsrLogonName": settings.HQ_CREDENTIALS.csr_user,
"CsrPassword": settings.HQ_CREDENTIALS.csr_pwd,
"ABA": settings.HQ_CREDENTIALS.aba,
})
self.raw_body = body
async def _call_hq(self, **kwargs):
try:
response = await q2_requests.post(
self.logger,
self.url,
data=self.body,
headers=self.headers,
verify=settings.VERIFY_HQ_CERT,
bt_handle=self.bt_handle,
verify_whitelist=False,
**kwargs,
)
except RequestException as err:
raise HQConnectionError from err
return response
def can_rotate_hq_url(self):
if len(settings.HQ_CREDENTIALS.hq_url_list) <= 1:
return False
if not self.url:
return False
current_hq_url = re.sub(r"\w+\.as[hm]x", "", self.url).rstrip("/")
if current_hq_url not in settings.HQ_CREDENTIALS.hq_url_list:
return False
return True
[docs]
async def call_hq(
call: Union[str, dict],
url: str,
logger: Q2LoggerType,
hq_token: Optional[str] = None,
bt_handle: Optional[str] = None,
response_class: Optional[classmethod] = HqResponse,
backoffice_cookie: Optional[str] = None,
**kwargs,
) -> HqResponse:
"""
Asynchronously calls HQ and returns the response
:param call: Body to send to HQ endpoint
:param url: Hq URL to POST to
:param logger: Instantiated Q2Logger object
:param hq_token: Optional token for using existing HQ session (Necessary for wedge_online_banking HQ endpoints)
:param bt_handle: If defined, will pass through tracking info to AppDynamics
:param response_class: Optional class of object to return
:param backoffice_cookie: Optional string used for making backoffice calls. Sets the session information.
:param kwargs: Optional keyword args as defined for q2_requests.post() e.g. "timeout"
"""
headers = generate_call_headers(call, hq_token, backoffice_cookie)
if url[0] == "/":
# No URL set if it starts with something like /Q2API.ashx...
logger.error(
"No HQ_URL set. Did you remember to set HQ_URL in your settings file?"
)
raise HQConnectionError
call_obj = HqCallObj(logger, url, call, headers, bt_handle)
try:
response_obj = await call_obj.post_to_hq(
response_class=response_class, **kwargs
)
except HQConnectionError as exc:
if call_obj.can_rotate_hq_url():
hq_count = len(settings.HQ_CREDENTIALS.hq_url_list) - 1
response_obj = None
for i in range(hq_count):
try:
old_hq_url = urlparse(call_obj.url)
old_hq_url = f"{old_hq_url.scheme}://{old_hq_url.netloc}"
settings.HQ_CREDENTIALS.rotate_hq_url()
call_obj.url = call_obj.url.replace(
old_hq_url, settings.HQ_CREDENTIALS.hq_url
)
parsed_url = urlparse(call_obj.url)
url_to_log = f"{parsed_url.scheme}://{parsed_url.netloc}"
logger.warning(f"Trying next url in HQ_URL_LIST: {url_to_log}")
response_obj = await call_obj.post_to_hq(
refresh_creds=True, response_class=response_class, **kwargs
)
except HQConnectionError:
continue
else:
break
if response_obj is None:
raise HQConnectionError
else:
raise exc
else:
if not response_obj.success:
missing_sproc_match = re.search(
r"Query for (\w+) returned 0 records", response_obj.error_message
)
if missing_sproc_match:
raise MissingStoredProcError(
"Stored Procedure {} not installed in the database".format(
missing_sproc_match.group(1)
)
)
if settings.USE_VAULT_FOR_CREDS:
# Try to refresh the HQ creds from vault if it was a logon failure
if "logon failure" in response_obj.error_message.lower():
old_creds = copy.deepcopy(settings.HQ_CREDENTIALS)
try:
refresh_hq_creds()
except VaultError:
return response_obj
new_creds = settings.HQ_CREDENTIALS
if old_creds != new_creds:
logger.info("Attempting to refresh creds from vault")
response_obj = await call_obj.post_to_hq(
refresh_creds=True, response_class=response_class
)
if response_obj.success:
return response_obj
return response_obj
def generate_call_headers(call, hq_token=None, backoffice_cookie=None):
headers = {"Accept": "application/xml"}
if isinstance(call, str):
headers["Content-Type"] = "application/soap+xml"
elif isinstance(call, dict):
headers["Content-Type"] = "application/json"
if hq_token:
headers["HQ-AUTH-TOKEN"] = hq_token
if backoffice_cookie:
headers["Cookie"] = backoffice_cookie
return headers
def refresh_hq_creds(is_parent=False, logger=None, timeout=DEFAULT):
from q2_sdk.core import vault
client = vault.get_client(logger=logger, timeout=timeout)
if client:
hq_creds = client.get_hq_creds(settings.VAULT_KEY)
if hq_creds != settings.HQ_CREDENTIALS:
if settings.FORK_REQUESTS and not is_parent:
udp_writer.send_msg(udp_writer.MsgType.HqCredentials, {})
return
settings.HQ_CREDENTIALS = hq_creds