import json
from typing import Optional
from q2_sdk.core.http_handlers.hq_handler import Q2HqRequestHandler
from q2_sdk.core.configuration import settings
from q2_sdk.hq.db.zone_wedge_address_config import ZoneWedgeAddressConfig
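# NOTE(review): a stray "[docs]" marker was removed here; it looks like documentation-extraction residue, not code.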
class Q2ArdentRequestHandler(Q2HqRequestHandler):
    """
    RequestHandler meant to be used for requests incoming from Ardent.

    In addition to what the parent handler does, this class reads the
    ``customerKey``, ``Zoneid``, ``Wedgejsondata`` and ``Wedgeaddresstypename``
    request headers to choose HQ credentials and configuration, and it builds
    ``form_fields`` from the query string overlaid with a JSON request body.
    """

    DESCRIPTION = ""

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        # Flag consumed outside this class; its name suggests that traffic not
        # originating from Q2 is accepted -- confirm against the base handler.
        self.allow_non_q2_traffic = True
        # Filled lazily by the ``form_fields`` property; None means "not parsed yet".
        self._form_fields = None

    @property
    def one_hour_cache(self) -> int:
        """One hour expressed in seconds (3600); passed as the cache ``expire`` value."""
        return 60 * 60 * 1

    async def prepare(self):
        """
        Resolve HQ credentials and configuration from the request headers.

        Runs the parent ``prepare`` first, then:

        * takes the customer key from the ``customerKey`` header (with any
          ``CHANGEME`` text removed), falling back to the ``customerKey`` entry
          of the JSON carried in the ``Wedgejsondata`` header;
        * when that key is non-empty and differs from ``settings.VAULT_KEY``,
          looks up HQ credentials for it and, if any are returned, uses them;
        * when HQ credentials are set, stores the resolved configuration in
          ``self._db_config``.

        :raises json.JSONDecodeError: if the ``Wedgejsondata`` header is not valid JSON
        """
        await super().prepare()

        headers = self.request.headers
        customer_key = headers.get("customerKey", "").replace("CHANGEME", "")
        zone_id = headers.get("Zoneid")
        wedge_json_data = json.loads(headers.get("Wedgejsondata", "{}"))
        # The mere presence of a "migrated" key switches to header-only config.
        use_only_wedge_json_data = "migrated" in wedge_json_data

        if not customer_key:
            customer_key = wedge_json_data.get("customerKey", "")

        if customer_key and customer_key != settings.VAULT_KEY:
            supplied_hq = self._get_hq_from_key(customer_key)
            if supplied_hq:
                self.hq_credentials = supplied_hq

        # NOTE(review): the original nesting of this check was lost when the
        # source was flattened; method level is the only placement where the
        # test is not redundant -- confirm against the upstream file.
        if self.hq_credentials:
            self._db_config = await self._resolve_config(
                use_only_wedge_json_data, wedge_json_data, zone_id=zone_id
            )

    async def _resolve_config(
        self, use_only_wedge_json_data, wedge_json_data, zone_id: Optional[str] = None
    ):
        """
        Pick the configuration to use for this request.

        Order of precedence:

        1. the zone-specific config for ``zone_id``, when a zone id is given and
           a truthy config is found for it;
        2. ``wedge_json_data`` itself, when ``use_only_wedge_json_data`` is true;
        3. otherwise ``self._db_config`` after awaiting
           ``get_wedge_address_configs()`` (defined outside this class).

        :param use_only_wedge_json_data: skip the wedge-address lookup and use the header data
        :param wedge_json_data: decoded content of the ``Wedgejsondata`` header
        :param zone_id: value of the ``Zoneid`` header, if any
        :return: the selected configuration
        """
        if zone_id:
            zone_config = await self._get_zone_configs(zone_id)
            if zone_config:
                return zone_config

        if use_only_wedge_json_data:
            return wedge_json_data

        await self.get_wedge_address_configs()
        return self._db_config

    async def _get_zone_configs(self, zone_id) -> dict | None:
        """
        Return the config stored for ``zone_id`` under the current wedge address.

        The wedge address name comes from the ``Wedgeaddresstypename`` header.
        A truthy cached value is returned as-is. Otherwise the zone configs for
        that wedge address are fetched through ``ZoneWedgeAddressConfig``, the
        first row whose ``ZoneID`` text equals ``str(zone_id)`` has its
        ``Config`` text decoded as JSON, and a truthy result is cached with an
        expiry of ``one_hour_cache``.

        :param zone_id: zone identifier to look for
        :return: the decoded zone config, or ``None`` when no row matches
        """
        wedge_name = self.request.headers.get("Wedgeaddresstypename")
        zone_cache_key = f"{self.hq_credentials.customer_key}-{wedge_name}-{zone_id}"

        cached_zone_configs = await self.cache.get_async(zone_cache_key)
        if cached_zone_configs:
            return cached_zone_configs

        zone_wedge_configs = ZoneWedgeAddressConfig(
            self.logger, hq_credentials=self.hq_credentials
        )
        zone_configs = await zone_wedge_configs.get_all_by_wedge_address(
            wedge_address_name=wedge_name
        )

        wanted_zone = str(zone_id)
        this_zone_config = None
        for config in zone_configs:
            if config.ZoneID.text == wanted_zone:
                this_zone_config = json.loads(config.Config.text)
                break

        # Only truthy configs are cached, so a miss is looked up again next time.
        if this_zone_config:
            await self.cache.set_async(
                zone_cache_key, this_zone_config, expire=self.one_hour_cache
            )
        return this_zone_config

    async def route_request(self, routing_key: Optional[str] = None):
        """
        Dispatch the request to the route selected by ``routing_key``.

        When no key is passed, the ``routing_key`` form field is used; when
        that is missing as well, ``DEFAULT_ROUTE`` is used. A list holding more
        than one key, or a key that is not in ``self.router``, is logged as an
        error and replaced by ``DEFAULT_ROUTE``. A single-element list is
        unwrapped, decoding the element from UTF-8 when it is ``bytes``.

        :param routing_key: explicit route name, overriding the form field
        :return: whatever ``call_route`` returns for the selected route
        """
        if not routing_key:
            routing_key = self.form_fields.get("routing_key")
        if not routing_key:
            routing_key = self.DEFAULT_ROUTE

        if isinstance(routing_key, list):
            if len(routing_key) > 1:
                self.logger.error("Multiple routing keys present: '%s'.", routing_key)
                routing_key = self.DEFAULT_ROUTE
            else:
                # Elements are bytes when they come straight from the request
                # arguments but str when they come from a JSON body; calling
                # .decode() unconditionally raised AttributeError on str.
                only_key = routing_key[0]
                if isinstance(only_key, bytes):
                    only_key = only_key.decode("utf-8")
                routing_key = only_key

        if routing_key not in self.router:
            self.logger.error(
                "Routing key '%s' not present in router dictionary.", routing_key
            )
            routing_key = self.DEFAULT_ROUTE

        route = self.router[routing_key]
        self.logger.info("Transition: routing to '%s'", routing_key)
        self.active_route = routing_key
        return await self.call_route(route)

    def parse_query_parameters(self) -> dict:
        """
        Flatten ``self.request.arguments`` into a ``{name: str}`` dict.

        Only the first value of each argument is kept, decoded as UTF-8.

        :return: the decoded arguments, or an empty dict when there are none
        """
        arguments = self.request.arguments
        if not arguments:
            return {}
        return {name: str(values[0], "utf-8") for name, values in arguments.items()}

    @property
    def form_fields(self):
        """
        Request fields: query parameters overlaid with the JSON body, if any.

        The body is decoded as JSON when the ``Content-Type`` header is absent
        or starts with ``application/json`` (case-insensitive). A body that
        cannot be decoded, or that decodes to something other than a JSON
        object, is logged and ignored. The result is computed once per handler
        instance and then reused, even when it is empty.
        """
        # Compare against None rather than testing truthiness: an empty result
        # is still a result, and re-parsing it on every access would also
        # repeat the decode-error log line each time.
        if self._form_fields is not None:
            return self._form_fields

        fields = self.parse_query_parameters()
        if self.request.body:
            content_type = self.request.headers.get("Content-Type")
            overlay_fields = {}
            if content_type is None or content_type.lower().startswith(
                "application/json"
            ):
                try:
                    overlay_fields = json.loads(self.request.body)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # json.loads raises UnicodeDecodeError, not JSONDecodeError,
                    # for a bytes body that is not valid UTF-8/16/32.
                    self.logger.error(
                        f"Unable to decode non-JSON body but content-Type specified json: {self.request.body=}"
                    )
            if isinstance(overlay_fields, dict):
                fields.update(overlay_fields)
            else:
                # A JSON array or scalar cannot be merged into the field dict.
                self.logger.error(
                    "Ignoring JSON body that is not an object: got %s",
                    type(overlay_fields).__name__,
                )

        self._form_fields = fields
        return self._form_fields