import inspect
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict
import q2_sdk
from q2_sdk.core import configuration
from q2_sdk.core.q2_logging.logger import Q2LoggerAdapter
from .entrypoint import BaseEntryPoint
[docs]
class PeriodicJobEntryPoint(BaseEntryPoint):
"""
Same as BaseEntryPoint but specialized for registering
with a Cron-like system. Adds more logging and defined
success/failure hooks for easier debugging when run
overnight, for instance.
"""
[docs]
def setup(self, logger):
"""
Ensures periodic jobs will log in the format of request handlers
"""
_logger = logging.getLogger(f"periodic.{self.name}")
author = "Q2"
if configuration.settings.IS_CUSTOMER_CREATED:
author = "Cust"
extra = {"guid": uuid.uuid4().hex, "author": author}
# Add Global filters
for log_filter in configuration.settings.GLOBAL_PERIODIC_JOB_LOGGING_FILTERS:
_logger.addFilter(log_filter)
# Add entrypoint specific filters
for log_filter in self.LOGGING_FILTERS:
_logger.addFilter(log_filter)
logging_level = logging.getLevelName(configuration.settings.LOGGING_LEVEL)
_logger.setLevel(logging_level)
self.logger = Q2LoggerAdapter(_logger, extra)
[docs]
async def run(self):
"""
Code here will be executed when periodic job starts
"""
raise NotImplementedError(
"subclasses of PeriodicJobEntryPoint must provide a run() method"
)
[docs]
async def handle(self, *args, **kwargs):
try:
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M")
self.logger.info("SDK Version: %s", q2_sdk.__version__)
if configuration.settings.IMAGE_TAG:
self.logger.info("Image Tag: %s", configuration.settings.IMAGE_TAG)
self.logger.info("job %s began execution at %s", self.name, now)
await self.on_pre_run()
result = await self.run()
await self.on_success(result)
self.logger.info("job %s successfully executed.", self.name)
except Exception as err:
self.logger.exception("job %s failed. Cleaning up...", self.name)
failure_dict = locals()
func_locals = inspect.trace()[-1][0].f_locals
failure_dict.update({
"self": vars(self),
"local_vars": {x: y for x, y in func_locals.items() if x != "self"},
})
self.logger.info(failure_dict)
self.logger.replay_buffer()
await self.on_failure(err, failure_dict)
# Need to reraise the exception to mark the job as failed in Nomad
raise
[docs]
async def on_pre_run(self):
"""
Called before run()
"""
[docs]
async def on_success(self, run_result):
"""
Called after run() if it did not raise an Exception.
If run() returns a value, it will be passed as the first parameter here.
"""
[docs]
async def on_failure(self, err: Exception, failure_dict: Dict[str, Any]):
"""
Called after run() if it raised an Exception.
By default, all local variables (failure_dict) will be logged on failure.
This hook exists if additional failure processing is desired.
"""