import json
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import TectonError
from q2_sdk.core.http_handlers.tecton_client_handler import Q2TectonClientRequestHandler
from q2_sdk.models.q2config_shapes import (
Q2ConfigRequestData,
Q2ConfigPromoteRequestData,
)
from q2_sdk.models.tecton import InternalServerError, Success
[docs]
class Q2ConfigRequestHandler(Q2TectonClientRequestHandler):
[docs]
def initialize(self, routing_key=None):
"""Handle extra keyword arguments passed via Tornado's URLSpec."""
self.routing_key = routing_key # Store routing_key in the instance
[docs]
async def prepare(self):
"""Prepare request and initialize customer-specific HQ credentials."""
await super().prepare()
self.logger.info("Preparing request for routing key")
try:
body = json.loads(self.request.body) if self.request.body else {}
customer_key = body.get("customerKey", "")
self.form_fields["q2configRequestData"] = body
except json.JSONDecodeError:
customer_key = ""
self.logger.info(f"Customer Key: {customer_key}")
self.logger.info(f"Settings Vault Key: {settings.VAULT_KEY}")
if customer_key and customer_key != settings.VAULT_KEY:
supplied_hq = self._get_hq_from_key(customer_key)
self.logger.info(f"Supplied HQ Credentials: {supplied_hq}")
if supplied_hq:
self.hq_credentials = supplied_hq
if hasattr(self, "form_fields"):
self.form_fields["routing_key"] = self.routing_key
[docs]
async def q2_post(self, *args, **kwargs):
"""
Overrides the q2_post method
Q2Config expects an error status to mark a publish as failed
Extensions should raise a TectonError to return an error response
"""
self.form_fields = self._parse_tecton_payload()
self.set_header("Content-Type", "application/json")
try:
route_response = await self.route_request()
except TectonError as exc:
"""
args:
0: error_message
1: response_data
2: status_code
"""
self.logger.error(exc)
self.set_status(exc.args[2] if len(exc.args) > 2 else 500)
return json.dumps(
InternalServerError(
exc.args[0], exc.args[1] if len(exc.args) > 1 else None
).to_json()
)
if isinstance(route_response, dict):
json_dump = json.dumps(Success(route_response).to_json())
elif isinstance(route_response, str):
json_dump = route_response
else:
json_dump = json.dumps(vars(route_response))
self.logger.debug("Tecton response: %s", json_dump)
return json_dump if json_dump else ""
@property
def router(self):
"""
Your extension's routing map. To handle a request, a method must be listed here. When a POST request is
made to this extension with a routing_key value, the extension will route the request to the method linked
to that key. The methods referenced here are defined below.
"""
router = super().router
router.update({
"publish": self.publish,
"promote": self.promote,
"delete": self.delete,
})
return router
async def publish(self, q2configRequestData: Q2ConfigRequestData):
raise NotImplementedError
async def promote(self, q2configRequestData: Q2ConfigPromoteRequestData):
raise NotImplementedError
async def delete(self, q2configRequestData: Q2ConfigRequestData):
raise NotImplementedError