# Source code for q2_sdk.extensions.Default.extension

"""
Fallback for any requests that don't match anything else installed
"""

import asyncio
import os

from tornado.web import HTTPError, RequestHandler

from q2_sdk.core import configuration
from q2_sdk.core.opentelemetry.handler import SpanContext


class DefaultHandler(RequestHandler):
    """
    Returns a 404 Not Found for all requests.

    Similar to Tornado's base behavior, but provides a hook for custom
    cleanup logic specific to the SDK framework.
    """

    def __init__(self, application, request, **kwargs):
        """
        Tag the handler with a placeholder extension name, then defer to
        Tornado's ``RequestHandler`` constructor with the same arguments.
        """
        # Assigned before the base constructor runs, so the attribute already
        # exists for anything invoked during base-class initialization.
        self.extension_name = "Unimplemented"
        super().__init__(application, request, **kwargs)

    def _unimplemented_method(self, *args, **kwargs):
        """
        Shared implementation for every HTTP verb.

        :raises HTTPError: always, with status 404
        """
        raise HTTPError(404)

    async def _execute(self, transforms, *args: bytes, **kwargs: bytes) -> None:
        """
        This is the last function before the request ends.
        Also a convenient place to kill the process if running forked.

        Runs Tornado's normal ``_execute`` inside a ``SpanContext``. When
        ``FORK_REQUESTS`` is enabled it then waits for the client connection
        to close and, if the current process is not the main server process,
        removes this process's PID file and exits.

        :raises SystemExit: with code 0, in a forked child once its
            connection has closed
        """
        with SpanContext(self):
            await super()._execute(transforms, *args, **kwargs)

        # NOTE(review): nesting of the cleanup below relative to the
        # SpanContext block should be confirmed against the upstream source.
        if configuration.settings.FORK_REQUESTS:
            this_pid = os.getpid()
            pid_path = f"{configuration.settings.FORKED_CHILD_PID_DIR}/{this_pid}"

            # Poll until the connection is closed so the process is not torn
            # down while the stream is still open.
            while not self.request.server_connection.stream.closed():
                await asyncio.sleep(0.1)

            # Only a process whose PID differs from the recorded server PID
            # (i.e. a forked child) cleans up and terminates.
            if this_pid != configuration.settings.SERVER_PID:
                os.remove(pid_path)
                # Raise SystemExit directly rather than calling the ``exit``
                # builtin: that name is injected by the ``site`` module and
                # is missing under ``python -S`` or in frozen/embedded
                # interpreters, where the child would die with NameError
                # instead of exiting cleanly.
                raise SystemExit(0)

    # Every supported HTTP verb is routed to the same 404 responder.
    head = _unimplemented_method
    get = _unimplemented_method
    post = _unimplemented_method
    delete = _unimplemented_method
    patch = _unimplemented_method
    put = _unimplemented_method
    options = _unimplemented_method