Source code for q2_sdk.core.http_handlers.ardent_handler

import json
from typing import Optional

from q2_sdk.core.http_handlers.hq_handler import Q2HqRequestHandler
from q2_sdk.core.configuration import settings
from q2_sdk.hq.db.zone_wedge_address_config import ZoneWedgeAddressConfig


[docs] class Q2ArdentRequestHandler(Q2HqRequestHandler): """ RequestHandler meant to be used for requests incoming from Ardent. """ DESCRIPTION = "" def __init__(self, application, request, **kwargs): super().__init__(application, request, **kwargs) self.allow_non_q2_traffic = True self._form_fields = None @property def one_hour_cache(self): return 60 * 60 * 1
[docs] async def prepare(self): await super().prepare() customer_key = self.request.headers.get("customerKey", "") customer_key = customer_key.replace("CHANGEME", "") zone_id = self.request.headers.get("Zoneid") wedge_json_data = json.loads(self.request.headers.get("Wedgejsondata", "{}")) use_only_wedge_json_data = ( True if "migrated" in wedge_json_data.keys() else False ) if not customer_key: customer_key = wedge_json_data.get("customerKey", "") if customer_key and customer_key != settings.VAULT_KEY: supplied_hq = self._get_hq_from_key(customer_key) if supplied_hq: self.hq_credentials = supplied_hq if self.hq_credentials: self._db_config = await self._resolve_config( use_only_wedge_json_data, wedge_json_data, zone_id=zone_id )
async def _resolve_config( self, use_only_wedge_json_data, wedge_json_data, zone_id: Optional[str] = None ): if zone_id: config = await self._get_zone_configs(zone_id) if config: return config if use_only_wedge_json_data: return wedge_json_data await self.get_wedge_address_configs() return self._db_config if not use_only_wedge_json_data: await self.get_wedge_address_configs() return self._db_config return wedge_json_data async def _get_zone_configs(self, zone_id) -> dict | None: wedge_name = self.request.headers.get("Wedgeaddresstypename") zone_cache_key = f"{self.hq_credentials.customer_key}-{wedge_name}-{zone_id}" cached_zone_configs = await self.cache.get_async(zone_cache_key) if not cached_zone_configs: zone_wedge_configs = ZoneWedgeAddressConfig( self.logger, hq_credentials=self.hq_credentials ) zone_configs = await zone_wedge_configs.get_all_by_wedge_address( wedge_address_name=wedge_name ) this_zone_config = None for config in zone_configs: if config.ZoneID.text == str(zone_id): this_zone_config = json.loads(config.Config.text) break if this_zone_config: await self.cache.set_async( zone_cache_key, this_zone_config, expire=self.one_hour_cache ) else: this_zone_config = cached_zone_configs return this_zone_config async def route_request(self, routing_key: Optional[str] = None): if not routing_key: routing_key = self.form_fields.get("routing_key") if not routing_key: routing_key = self.DEFAULT_ROUTE if isinstance(routing_key, list): if len(routing_key) > 1: self.logger.error("Multiple routing keys present: '%s'.", routing_key) routing_key = self.DEFAULT_ROUTE else: routing_key = routing_key[0].decode("utf-8") if routing_key not in self.router: self.logger.error( "Routing key '%s' not present in router dictionary.", routing_key ) routing_key = self.DEFAULT_ROUTE route = self.router[routing_key] self.logger.info("Transition: routing to '%s'", routing_key) self.active_route = routing_key return await self.call_route(route) def parse_query_parameters(self) -> dict: if not self.request.arguments: return {} return { key: str(value[0], "utf-8") for key, value in self.request.arguments.items() } @property def form_fields(self): if not self._form_fields: fields = self.parse_query_parameters() if self.request.body: content_type = self.request.headers.get("Content-Type") overlay_fields = {} try: if content_type is None or content_type.lower().startswith( "application/json" ): overlay_fields = json.loads(self.request.body) except json.decoder.JSONDecodeError: self.logger.error( f"Unable to decode non-JSON body but content-Type specified json: {self.request.body=}" ) fields.update(overlay_fields) self._form_fields = fields return self._form_fields