Source code for q2_sdk.core.http_handlers.central_handler

from collections import Counter
from typing import List, Optional

from lxml import etree, objectify
from q2_sdk.hq.models.central_user import CentralUser
from q2_sdk.hq.models.hq_credentials import BackOfficeHqCredentials

from .hq_handler import Q2HqRequestHandler


[docs] class Q2CentralRequestHandler(Q2HqRequestHandler): def __init__(self, application, request, **kwargs): super().__init__(application, request, **kwargs) self.central_user: Optional[CentralUser] = None self.hq_credentials: BackOfficeHqCredentials @property def default_summary_log(self): summary_log = super().default_summary_log summary_log["HQ_ID"] = self.central_user.hq_session_id summary_log["login_name"] = self.central_user.login_name return summary_log async def _build_models_from_hq(self, element): try: self.central_user = CentralUser(element) except AttributeError: self.logger.debug("No central user could be found in request") await super()._build_models_from_hq(element) self._build_backoffice_creds() def _build_backoffice_creds(self): session = None if self.central_user: session = self.central_user.hq_session_id if not session: return backoffice_session_token = f"Q2_SessionId={session}; path=/" self.hq_credentials = BackOfficeHqCredentials( self.logger, self.hq_credentials, auto_logoff=False, session_token=backoffice_session_token, ) def _convert_form_fields(self, form_fields: dict) -> dict: """Converts central form fields dictionary to standard format""" parsed_form_fields = {} all_keys = Counter([x.get("key") for x in form_fields]) for field in form_fields: name = field.get("key") value = field.get("val") if field.get("val") else "" if all_keys[name] > 1: parsed_form_fields.setdefault(name, []) parsed_form_fields[name].append(value) else: parsed_form_fields[name] = value return parsed_form_fields
[docs] def parse_form_fields(self, form_fields: List[etree.Element]) -> dict: """Parses HQ form fields for central to a dict""" fields = {} for field in form_fields: if field.Name == "FormData" and field.Value: central_form_data = objectify.fromstring(field.Value.text) central_form_fields = central_form_data.findall( ".//PostDataCollection//PostData" ) fields = self._convert_form_fields(central_form_fields) return fields
async def route_request(self, routing_key: Optional[str] = None): if not routing_key: routing_key = self.form_fields.get("routing_key") if not routing_key: routing_key = self.find_routing_key_central_form() return await super().route_request(routing_key)
[docs] def get_central_post_data_value(self, xml_str: str, search_key: str): """ Expects a string that contains valid xml, xml has children under the first child of the root named PostData that have key, value attributes. Ex: <root><someChild><PostData key="report_id" val="1" /><someChild></root> Most likely to be used to pull post-back data from a Central form post-back request. """ routing_key = "" if isinstance(xml_str, str): try: form_data_obj = objectify.fromstring(xml_str) except etree.XMLSyntaxError: form_data_obj = None if form_data_obj is not None: match_ele = form_data_obj.xpath(f"./*/PostData[@key='{search_key}']") if match_ele: routing_key = match_ele[0].attrib["val"] return routing_key
[docs] def find_routing_key_central_form(self): """ Check for routing_key in post back data from Central """ routing_key = "" form_data = self.form_fields.get("FormData") if form_data: routing_key = self.get_central_post_data_value(form_data, "routing_key") return routing_key