Source code for q2_sdk.extensions.Admin.extension

"""
The Admin extension provides an interface for performing server-level operations.
"""

import os
import signal

from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core.request_handler.templating import LocalAssetLoader
from q2_sdk.core.configuration import settings


class AdminHandler(Q2BaseRequestHandler):
    """Request handler for server-level admin operations.

    ``get`` renders the admin page and ``post`` accepts an ``action`` body
    argument. The only action acted upon is ``"terminate"``, which flags the
    handler so that ``on_finish`` sends SIGTERM to the server process.
    """

    def __init__(self, application, request, **kwargs):
        """Initialize the base handler and clear the termination flag.

        :param application: Forwarded unchanged to the base handler.
        :param request: Forwarded unchanged to the base handler.
        :param kwargs: Extra keyword arguments forwarded to the base handler.
        """
        super().__init__(application, request, **kwargs)
        # Set to True by ``post``; read by ``on_finish``.
        self.terminate_on_completion = False

    async def get(self):
        """UI interface for interacting with this endpoint"""
        context = {"recent_keys": self.vault.RECENT_KEYS}
        html = self.get_template("index.html.jinja2", context)
        # Expand local asset references in the rendered template before
        # writing it out.
        html = LocalAssetLoader(self.templater, html).expand_local_sources()
        self.write(html)

    async def post(self):
        """API Interface for interacting with this endpoint

        Reads the ``action`` body argument. For ``"terminate"`` the request
        is logged at critical level, the handler is flagged for termination
        and ``"Terminated"`` is written; any other value writes
        ``"No action taken"``.
        """
        action = self.get_body_argument("action")
        output = "No action taken"
        if action == "terminate":
            self.logger.critical("Received request to terminate from /admin POST")
            # The kill itself is deferred to ``on_finish``; only the flag is
            # set here.
            self.terminate_on_completion = True
            output = "Terminated"
        self.write(output)

    def on_finish(self):
        """Send SIGTERM to the server process if termination was requested.

        Does nothing unless ``post`` set ``terminate_on_completion``.
        NOTE(review): presumably called by the framework once the response
        has been finished -- confirm against the base handler.
        """
        if not self.terminate_on_completion:
            return

        # With FORK_REQUESTS enabled the parent process is signalled instead
        # of the current one (presumably because the request runs in a forked
        # child -- confirm against the settings documentation).
        if settings.FORK_REQUESTS:
            pid_to_kill = os.getppid()
        else:
            pid_to_kill = os.getpid()
        # signal.SIGTERM has the value 15 that was previously hard-coded here.
        os.kill(pid_to_kill, signal.SIGTERM)