Source code for q2_sdk.extensions.Logging.extension

import asyncio
from datetime import datetime
from enum import Enum
import functools
import logging
from typing import Optional

from q2_sdk.core import contexts
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core.configuration import settings
from q2_sdk.core.request_handler.templating import LocalAssetLoader
from q2_sdk.core.non_http_handlers.udp import writer
from q2_sdk.tools.utils import to_bool

# How long (in seconds) a temporary logging override stays active: this is the
# sleep duration of the reset timer task and the duration recorded on the
# TemporaryTask that tracks it.
DURATION_IN_SECONDS = 300


class TemporaryTask:
    """Bundle a cancellable task with its creation time and intended lifetime.

    :param task: Object exposing a ``cancel()`` method (e.g. an asyncio Task)
    :param duration: Lifetime of the task, in seconds
    """

    def __init__(self, task, duration):
        self.duration = duration
        self.start_time = datetime.now()
        self.task = task

    @property
    def remaining_time(self):
        """Seconds left of ``duration``, counting whole elapsed seconds only."""
        elapsed_seconds = (datetime.now() - self.start_time).total_seconds()
        return self.duration - int(elapsed_seconds)

    def cancel(self):
        """Cancel the wrapped task."""
        self.task.cancel()


class TaskType(Enum):
    """Kinds of temporary logging override that can be set and timed out."""

    LogLevel = "LogLevel"
    FullResponse = "FullResponse"
[docs] class LoggingHandler(Q2BaseRequestHandler): """ Endpoint to temporarily change the server's LogLevel at runtime Allows for setting a single override at a time. If an override is set, begins a timer as a separate asyncio Task, which will reset the LogLevel to default on expiration. """ LOG_LEVEL_TASK: Optional[TemporaryTask] = None FULL_RESPONSE_TASK: Optional[TemporaryTask] = None
[docs] def get(self): """UI interface for interacting with this endpoint""" current_log_level = self.get_current_log_level() current_log_response = self.get_current_log_response() html = self.get_template( "index.html.jinja2", { "log_level": current_log_level, "log_response": current_log_response, "log_level_task": LoggingHandler.LOG_LEVEL_TASK, "full_response_task": LoggingHandler.FULL_RESPONSE_TASK, }, ) html = LocalAssetLoader(self.templater, html).expand_local_sources() self.write(html)
def get_current_log_level(self): if settings.TEMPORARY_LOG_LEVEL: return logging.getLevelName(settings.TEMPORARY_LOG_LEVEL) return settings.LOGGING_LEVEL def get_current_log_response(self): if settings.TEMPORARY_LOG_RESPONSE_ENABLE: return True return False
[docs] async def post(self): """Business logic. Takes care of changing log levels and cleanup""" requested_level = self.get_body_argument("log_level", None) log_response = self.get_body_argument("log_response", None) if all((requested_level is None, log_response is None)): self.redirect("logging") return if requested_level: requested_level = int(requested_level) self.logger.info("New LogLevel: %s", logging.getLevelName(requested_level)) if settings.FORK_REQUESTS: writer.send_msg(writer.MsgType.Logging, requested_level) else: self.set(TaskType.LogLevel, requested_level=requested_level) if log_response: log_response = to_bool(int(log_response)) self.logger.info("New LogResponse Setting: %s", log_response) self.set(TaskType.FullResponse, enable=log_response) self.redirect("logging")
@staticmethod def set(task_type: TaskType, **kwargs): match task_type: # noqa: E999 case TaskType.LogLevel: LoggingHandler._change_server_log_level(kwargs["requested_level"]) case TaskType.FullResponse: LoggingHandler._set_full_response(kwargs["enable"]) @staticmethod def _set_full_response(enable: bool): LoggingHandler._cancel_existing_task(TaskType.FullResponse) settings.TEMPORARY_LOG_RESPONSE_ENABLE = enable LoggingHandler._start_reset_timer(TaskType.FullResponse) @staticmethod def _cancel_existing_task(task_type: TaskType): match task_type: # noqa: E999 case TaskType.LogLevel: if LoggingHandler.LOG_LEVEL_TASK: LoggingHandler.LOG_LEVEL_TASK.cancel() case TaskType.FullResponse: if LoggingHandler.FULL_RESPONSE_TASK: LoggingHandler.FULL_RESPONSE_TASK.cancel() @staticmethod def _change_server_log_level(requested_level): LoggingHandler._cancel_existing_task(TaskType.LogLevel) settings.TEMPORARY_LOG_LEVEL = requested_level LoggingHandler._start_reset_timer(TaskType.LogLevel) @staticmethod def _start_reset_timer(task_type: TaskType): current_request = contexts.get_current_request(raise_if_none=False) if current_request: logger = current_request.request_handler.logger else: logger = logging.getLogger("q2_sdk.general") task = asyncio.Task(asyncio.sleep(DURATION_IN_SECONDS)) match task_type: # noqa: E999 case TaskType.LogLevel: LoggingHandler.LOG_LEVEL_TASK = TemporaryTask(task, DURATION_IN_SECONDS) logger.info( f"Manually changing LogLevel for {DURATION_IN_SECONDS} seconds" ) case TaskType.FullResponse: LoggingHandler.FULL_RESPONSE_TASK = TemporaryTask( task, DURATION_IN_SECONDS ) logger.info( f"Starting {DURATION_IN_SECONDS} second timer for full response logger" ) asyncio.ensure_future(task) task.add_done_callback( functools.partial(LoggingHandler._reset_log_level, task_type=task_type) ) @staticmethod def _reset_log_level(task, task_type: TaskType): current_request = contexts.get_current_request(raise_if_none=False) if current_request: logger = 
current_request.request_handler.logger else: logger = logging.getLogger("q2_sdk.general") match task_type: # noqa: E999 case TaskType.LogLevel: if LoggingHandler.LOG_LEVEL_TASK.task == task: logger.info("Resetting LogLevel to default") settings.TEMPORARY_LOG_LEVEL = None LoggingHandler.LOG_LEVEL_TASK = None case TaskType.FullResponse: if LoggingHandler.FULL_RESPONSE_TASK.task == task: logger.info("Resetting FullResponse to default") settings.TEMPORARY_LOG_RESPONSE_ENABLE = None LoggingHandler.FULL_RESPONSE_TASK = None