Source code for q2_sdk.core.http_handlers.q2_config_handler

import json
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import TectonError
from q2_sdk.core.http_handlers.tecton_client_handler import Q2TectonClientRequestHandler
from q2_sdk.models.q2config_shapes import (
    Q2ConfigRequestData,
    Q2ConfigPromoteRequestData,
)
from q2_sdk.models.tecton import InternalServerError, Success


[docs] class Q2ConfigRequestHandler(Q2TectonClientRequestHandler):
[docs] def initialize(self, routing_key=None): """Handle extra keyword arguments passed via Tornado's URLSpec.""" self.routing_key = routing_key # Store routing_key in the instance
[docs] async def prepare(self): """Prepare request and initialize customer-specific HQ credentials.""" await super().prepare() self.logger.info("Preparing request for routing key") try: body = json.loads(self.request.body) if self.request.body else {} customer_key = body.get("customerKey", "") self.form_fields["q2configRequestData"] = body except json.JSONDecodeError: customer_key = "" self.logger.info(f"Customer Key: {customer_key}") self.logger.info(f"Settings Vault Key: {settings.VAULT_KEY}") if customer_key and customer_key != settings.VAULT_KEY: supplied_hq = self._get_hq_from_key(customer_key) self.logger.info(f"Supplied HQ Credentials: {supplied_hq}") if supplied_hq: self.hq_credentials = supplied_hq if hasattr(self, "form_fields"): self.form_fields["routing_key"] = self.routing_key
[docs] async def q2_post(self, *args, **kwargs): """ Overrides the q2_post method Q2Config expects an error status to mark a publish as failed Extensions should raise a TectonError to return an error response """ self.form_fields = self._parse_tecton_payload() self.set_header("Content-Type", "application/json") try: route_response = await self.route_request() except TectonError as exc: """ args: 0: error_message 1: response_data 2: status_code """ self.logger.error(exc) self.set_status(exc.args[2] if len(exc.args) > 2 else 500) return json.dumps( InternalServerError( exc.args[0], exc.args[1] if len(exc.args) > 1 else None ).to_json() ) if isinstance(route_response, dict): json_dump = json.dumps(Success(route_response).to_json()) elif isinstance(route_response, str): json_dump = route_response else: json_dump = json.dumps(vars(route_response)) self.logger.debug("Tecton response: %s", json_dump) return json_dump if json_dump else ""
@property def router(self): """ Your extension's routing map. To handle a request, a method must be listed here. When a POST request is made to this extension with a routing_key value, the extension will route the request to the method linked to that key. The methods referenced here are defined below. """ router = super().router router.update({ "publish": self.publish, "promote": self.promote, "delete": self.delete, }) return router async def publish(self, q2configRequestData: Q2ConfigRequestData): raise NotImplementedError async def promote(self, q2configRequestData: Q2ConfigPromoteRequestData): raise NotImplementedError async def delete(self, q2configRequestData: Q2ConfigRequestData): raise NotImplementedError