Source code for q2_sdk.core.entrypoint_periodic

import inspect
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict

import q2_sdk
from q2_sdk.core import configuration
from q2_sdk.core.q2_logging.logger import Q2LoggerAdapter

from .entrypoint import BaseEntryPoint


class PeriodicJobEntryPoint(BaseEntryPoint):
    """
    Same as BaseEntryPoint but specialized for registering with a Cron-like
    system.

    Adds more logging and defined success/failure hooks for easier debugging
    when run overnight, for instance. Subclasses must override ``run`` and
    may override ``on_pre_run``, ``on_success`` and ``on_failure``.
    """

    def setup(self, logger):
        """
        Ensures periodic jobs will log in the format of request handlers.

        Fetches the logger named ``periodic.<job name>``, attaches the global
        and entrypoint-specific filters, applies the configured logging level
        and stores the result on ``self.logger`` wrapped in a
        ``Q2LoggerAdapter`` carrying a fresh ``guid`` and an ``author`` tag.

        :param logger: Not used by this implementation; ``self.logger`` is
            always replaced by the adapter built here.
        """
        _logger = logging.getLogger(f"periodic.{self.name}")

        author = "Q2"
        if configuration.settings.IS_CUSTOMER_CREATED:
            author = "Cust"
        extra = {"guid": uuid.uuid4().hex, "author": author}

        # Add Global filters
        for log_filter in configuration.settings.GLOBAL_PERIODIC_JOB_LOGGING_FILTERS:
            _logger.addFilter(log_filter)

        # Add entrypoint specific filters
        for log_filter in self.LOGGING_FILTERS:
            _logger.addFilter(log_filter)

        logging_level = logging.getLevelName(configuration.settings.LOGGING_LEVEL)
        _logger.setLevel(logging_level)

        self.logger = Q2LoggerAdapter(_logger, extra)

    async def run(self):
        """
        Code here will be executed when periodic job starts.

        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError(
            "subclasses of PeriodicJobEntryPoint must provide a run() method"
        )

    async def handle(self, *args, **kwargs):
        """
        Drives one execution of the job: ``on_pre_run`` -> ``run`` ->
        ``on_success``, with start/finish log lines around them.

        If any step raises, the traceback is logged, a snapshot of the local
        variables is logged and passed to ``on_failure``, and the original
        exception is re-raised.

        :param args: Only captured in the failure snapshot; otherwise unused.
        :param kwargs: Only captured in the failure snapshot; otherwise unused.
        :raises Exception: Whatever the hooks or ``run`` raised.
        """
        try:
            now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M")
            self.logger.info("SDK Version: %s", q2_sdk.__version__)
            if configuration.settings.IMAGE_TAG:
                self.logger.info("Image Tag: %s", configuration.settings.IMAGE_TAG)
            self.logger.info("job %s began execution at %s", self.name, now)

            await self.on_pre_run()
            result = await self.run()
            await self.on_success(result)

            self.logger.info("job %s successfully executed.", self.name)
        except Exception as err:
            self.logger.exception("job %s failed. Cleaning up...", self.name)

            # Take an independent copy: before Python 3.13 ``locals()`` hands
            # back the frame's own shared dict, which must not be modified and
            # is refreshed by any later ``f_locals`` access on this frame.
            # The snapshot is taken before ``func_locals`` is bound so that
            # name is not part of it.
            failure_dict = dict(locals())

            # Locals of the innermost frame of the traceback, i.e. the frame
            # in which the exception was actually raised.
            func_locals = inspect.trace()[-1][0].f_locals
            failure_dict.update({
                "self": vars(self),
                "local_vars": {x: y for x, y in func_locals.items() if x != "self"},
            })

            self.logger.info(failure_dict)
            self.logger.replay_buffer()
            await self.on_failure(err, failure_dict)

            # Need to reraise the exception to mark the job as failed in Nomad
            raise

    async def on_pre_run(self):
        """
        Called before run()
        """

    async def on_success(self, run_result):
        """
        Called after run() if it did not raise an Exception.

        If run() returns a value, it will be passed as the first parameter
        here.

        :param run_result: The value returned by run().
        """

    async def on_failure(self, err: Exception, failure_dict: Dict[str, Any]):
        """
        Called after run() if it raised an Exception.

        By default, all local variables (failure_dict) will be logged on
        failure. This hook exists if additional failure processing is desired.

        :param err: The exception that was caught.
        :param failure_dict: Snapshot of the locals of ``handle`` at failure
            time, with ``"self"`` replaced by ``vars(self)`` and a
            ``"local_vars"`` entry holding the locals (minus ``self``) of the
            frame that raised.
        """