import asyncio
from datetime import datetime
from enum import Enum
import functools
import logging
import time
from typing import Optional

from q2_sdk.core import contexts
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core.configuration import settings
from q2_sdk.core.request_handler.templating import LocalAssetLoader
from q2_sdk.core.non_http_handlers.udp import writer
from q2_sdk.tools.utils import to_bool
# Lifetime of a temporary logging override: the reset timer sleeps for this
# many seconds before the override is cleared (five minutes).
DURATION_IN_SECONDS = 5 * 60
class TemporaryTask:
    """
    Wraps a cancellable task together with the time budget it was given.

    :param task: Object exposing a ``cancel()`` method (an asyncio Task in
        this module)
    :param duration: Number of seconds the task is expected to run for
    """

    def __init__(self, task, duration):
        self.task = task
        # Wall-clock creation time, kept for display purposes only.
        self.start_time = datetime.now()
        self.duration = duration
        # Elapsed time is measured on the monotonic clock so that DST
        # switches or system clock adjustments cannot skew the countdown.
        self._started_monotonic = time.monotonic()

    @property
    def remaining_time(self):
        """Whole seconds left before ``duration`` elapses, never below 0."""
        elapsed = int(time.monotonic() - self._started_monotonic)
        return max(self.duration - elapsed, 0)

    def cancel(self):
        """Cancel the wrapped task."""
        self.task.cancel()
class TaskType(Enum):
    """Kinds of temporary logging override that can be set and timed out."""

    # Temporary override of the server log level.
    LogLevel = "LogLevel"
    # Temporary toggle of full response logging.
    FullResponse = "FullResponse"
[docs]
class LoggingHandler(Q2BaseRequestHandler):
"""
Endpoint to temporarily change the server's LogLevel at runtime
Allows for setting a single override at a time. If an override is set,
begins a timer as a separate asyncio Task, which will reset the LogLevel
to default on expiration.
"""
LOG_LEVEL_TASK: Optional[TemporaryTask] = None
FULL_RESPONSE_TASK: Optional[TemporaryTask] = None
[docs]
def get(self):
"""UI interface for interacting with this endpoint"""
current_log_level = self.get_current_log_level()
current_log_response = self.get_current_log_response()
html = self.get_template(
"index.html.jinja2",
{
"log_level": current_log_level,
"log_response": current_log_response,
"log_level_task": LoggingHandler.LOG_LEVEL_TASK,
"full_response_task": LoggingHandler.FULL_RESPONSE_TASK,
},
)
html = LocalAssetLoader(self.templater, html).expand_local_sources()
self.write(html)
def get_current_log_level(self):
if settings.TEMPORARY_LOG_LEVEL:
return logging.getLevelName(settings.TEMPORARY_LOG_LEVEL)
return settings.LOGGING_LEVEL
def get_current_log_response(self):
if settings.TEMPORARY_LOG_RESPONSE_ENABLE:
return True
return False
[docs]
async def post(self):
"""Business logic. Takes care of changing log levels and cleanup"""
requested_level = self.get_body_argument("log_level", None)
log_response = self.get_body_argument("log_response", None)
if all((requested_level is None, log_response is None)):
self.redirect("logging")
return
if requested_level:
requested_level = int(requested_level)
self.logger.info("New LogLevel: %s", logging.getLevelName(requested_level))
if settings.FORK_REQUESTS:
writer.send_msg(writer.MsgType.Logging, requested_level)
else:
self.set(TaskType.LogLevel, requested_level=requested_level)
if log_response:
log_response = to_bool(int(log_response))
self.logger.info("New LogResponse Setting: %s", log_response)
self.set(TaskType.FullResponse, enable=log_response)
self.redirect("logging")
@staticmethod
def set(task_type: TaskType, **kwargs):
match task_type: # noqa: E999
case TaskType.LogLevel:
LoggingHandler._change_server_log_level(kwargs["requested_level"])
case TaskType.FullResponse:
LoggingHandler._set_full_response(kwargs["enable"])
@staticmethod
def _set_full_response(enable: bool):
LoggingHandler._cancel_existing_task(TaskType.FullResponse)
settings.TEMPORARY_LOG_RESPONSE_ENABLE = enable
LoggingHandler._start_reset_timer(TaskType.FullResponse)
@staticmethod
def _cancel_existing_task(task_type: TaskType):
match task_type: # noqa: E999
case TaskType.LogLevel:
if LoggingHandler.LOG_LEVEL_TASK:
LoggingHandler.LOG_LEVEL_TASK.cancel()
case TaskType.FullResponse:
if LoggingHandler.FULL_RESPONSE_TASK:
LoggingHandler.FULL_RESPONSE_TASK.cancel()
@staticmethod
def _change_server_log_level(requested_level):
LoggingHandler._cancel_existing_task(TaskType.LogLevel)
settings.TEMPORARY_LOG_LEVEL = requested_level
LoggingHandler._start_reset_timer(TaskType.LogLevel)
@staticmethod
def _start_reset_timer(task_type: TaskType):
current_request = contexts.get_current_request(raise_if_none=False)
if current_request:
logger = current_request.request_handler.logger
else:
logger = logging.getLogger("q2_sdk.general")
task = asyncio.Task(asyncio.sleep(DURATION_IN_SECONDS))
match task_type: # noqa: E999
case TaskType.LogLevel:
LoggingHandler.LOG_LEVEL_TASK = TemporaryTask(task, DURATION_IN_SECONDS)
logger.info(
f"Manually changing LogLevel for {DURATION_IN_SECONDS} seconds"
)
case TaskType.FullResponse:
LoggingHandler.FULL_RESPONSE_TASK = TemporaryTask(
task, DURATION_IN_SECONDS
)
logger.info(
f"Starting {DURATION_IN_SECONDS} second timer for full response logger"
)
asyncio.ensure_future(task)
task.add_done_callback(
functools.partial(LoggingHandler._reset_log_level, task_type=task_type)
)
@staticmethod
def _reset_log_level(task, task_type: TaskType):
current_request = contexts.get_current_request(raise_if_none=False)
if current_request:
logger = current_request.request_handler.logger
else:
logger = logging.getLogger("q2_sdk.general")
match task_type: # noqa: E999
case TaskType.LogLevel:
if LoggingHandler.LOG_LEVEL_TASK.task == task:
logger.info("Resetting LogLevel to default")
settings.TEMPORARY_LOG_LEVEL = None
LoggingHandler.LOG_LEVEL_TASK = None
case TaskType.FullResponse:
if LoggingHandler.FULL_RESPONSE_TASK.task == task:
logger.info("Resetting FullResponse to default")
settings.TEMPORARY_LOG_RESPONSE_ENABLE = None
LoggingHandler.FULL_RESPONSE_TASK = None