from collections import Counter
from typing import List, Optional
from lxml import etree, objectify
from q2_sdk.hq.models.central_user import CentralUser
from q2_sdk.hq.models.hq_credentials import BackOfficeHqCredentials
from .hq_handler import Q2HqRequestHandler
# [docs]
class Q2CentralRequestHandler(Q2HqRequestHandler):
    """
    Request handler that layers Central-specific state on ``Q2HqRequestHandler``.

    While building models from the incoming HQ element it tries to construct a
    ``CentralUser``; when that user carries an HQ session id, ``hq_credentials``
    is rebuilt as ``BackOfficeHqCredentials`` bound to that session.
    """

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        # Filled in by _build_models_from_hq; stays None when the request does
        # not contain a central user.
        self.central_user: Optional[CentralUser] = None
        # Annotation only, no value is assigned here. The initial value is
        # presumably provided by the parent class (TODO confirm) and is
        # replaced in _build_backoffice_creds.
        self.hq_credentials: BackOfficeHqCredentials

    @property
    def default_summary_log(self):
        """
        Return the parent's summary log extended with central user details.

        ``HQ_ID`` and ``login_name`` are only written when a central user was
        found; otherwise the parent's summary log is returned untouched instead
        of raising ``AttributeError`` on ``None``.
        """
        summary_log = super().default_summary_log
        central_user = self.central_user
        if central_user is not None:
            summary_log["HQ_ID"] = central_user.hq_session_id
            summary_log["login_name"] = central_user.login_name
        return summary_log

    async def _build_models_from_hq(self, element):
        """Build the central user (if any), the parent's models, then the credentials."""
        try:
            self.central_user = CentralUser(element)
        except AttributeError:
            # CentralUser could not be built from this element; central_user
            # keeps its previous value (None unless set earlier).
            self.logger.debug("No central user could be found in request")
        await super()._build_models_from_hq(element)
        self._build_backoffice_creds()

    def _build_backoffice_creds(self):
        """
        Replace ``hq_credentials`` with session-bound back office credentials.

        Does nothing when there is no central user or it has no HQ session id.
        """
        session = self.central_user.hq_session_id if self.central_user else None
        if not session:
            return

        backoffice_session_token = f"Q2_SessionId={session}; path=/"
        self.hq_credentials = BackOfficeHqCredentials(
            self.logger,
            self.hq_credentials,
            auto_logoff=False,
            session_token=backoffice_session_token,
        )

    def _convert_form_fields(self, form_fields: List[dict]) -> dict:
        """
        Converts central form fields to standard format.

        :param form_fields: Iterable of mappings, each read through its
            ``key`` and ``val`` entries
        :return: Dictionary mapping each key to its value. A key that occurs
            more than once maps to a list of its values in input order.
            Missing or falsy values become ``""``.
        """
        parsed_form_fields = {}
        # Count occurrences up front so repeated keys are collected into lists.
        key_counts = Counter(field.get("key") for field in form_fields)
        for field in form_fields:
            name = field.get("key")
            value = field.get("val") or ""
            if key_counts[name] > 1:
                parsed_form_fields.setdefault(name, []).append(value)
            else:
                parsed_form_fields[name] = value
        return parsed_form_fields

    async def route_request(self, routing_key: Optional[str] = None):
        """
        Route the request, resolving the routing key when none is given.

        Lookup order: the ``routing_key`` argument, then the ``routing_key``
        form field, then ``find_routing_key_central_form()``.
        """
        if not routing_key:
            routing_key = self.form_fields.get("routing_key")
        if not routing_key:
            routing_key = self.find_routing_key_central_form()
        return await super().route_request(routing_key)

    def get_central_post_data_value(self, xml_str: str, search_key: str):
        """
        Expects a string that contains valid xml, xml has children under the first level below the root
        named PostData that have key, val attributes.

        Ex: <root><someChild><PostData key="report_id" val="1" /></someChild></root>

        Most likely to be used to pull post-back data from a Central form post-back request.

        :param xml_str: XML document to search
        :param search_key: Value of the ``key`` attribute to look for
        :return: The ``val`` attribute of the first matching PostData element,
            or ``""`` when ``xml_str`` is not a string, is not well-formed XML,
            has no matching element, or the match has no ``val`` attribute
        """
        if not isinstance(xml_str, str):
            return ""

        try:
            form_data_obj = objectify.fromstring(xml_str)
        except etree.XMLSyntaxError:
            return ""

        # Pass the key as an XPath variable rather than formatting it into the
        # expression, so quotes in search_key cannot break or alter the query.
        matches = form_data_obj.xpath("./*/PostData[@key=$key]", key=search_key)
        if not matches:
            return ""
        return matches[0].attrib.get("val", "")