# Source code for q2_sdk.core.entrypoint

import argparse
import hashlib
import logging
import sys
from typing import List, TYPE_CHECKING, Optional

from q2_sdk.core import cache
from q2_sdk.core.cache import Q2CacheClient
from q2_sdk.core.configuration import settings
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.core import vault

if TYPE_CHECKING:  # pragma: no cover
    from q2_sdk.hq.db.db_object_factory import DbObjectFactory


class BaseEntryPoint:
    """
    Base class for the commands exposed through the ``q2`` command line.

    This is largely based off of the way Django organizes its Management
    Commands. We do not need the power nor the flexibility that Django
    provides here, but it's such a great pattern, it seems a shame to
    reinvent the wheel.

    Basic idea: When running ``q2`` at the command line, each file in the
    entrypoints folder of the SDK that ends in .py and does not start with _
    is loaded. The parser is built off of what lives in add_arguments, while
    the action when run is determined by what lives in the handle function.

    name: The command line parameter to be called
        (name=inspect means ``q2 inspect``)
    help: Help message at the command line
    run_forever: If True, the asyncio loop will not exit after completion
        of handle function
    """

    name = ""
    help = ""
    run_forever = False
    raise_on_unknown_args = True
    # Entrypoint specific filters, attached to the logger by ``setup``.
    # This list lives on the class, so it is shared by every subclass that
    # does not override it; override it rather than mutating it in place.
    LOGGING_FILTERS: List[logging.Filter] = []

    def __init__(self):
        # ``logger`` is populated by ``setup`` and ``parser`` by
        # ``create_parser``; both stay None until those are called.
        self.logger: Optional[logging.LoggerAdapter] = None
        self.parser: Optional[argparse.ArgumentParser] = None
        self.hq_credentials: HqCredentials = settings.HQ_CREDENTIALS
        self._cache = None

    @property
    def extension_name(self):
        """Dotted path of the package containing this entrypoint's module."""
        package, _, _ = self.__module__.rpartition(".")
        return package

    @property
    def vault(self) -> vault.Q2Vault:
        """Vault client obtained from ``vault.get_client`` with this logger."""
        return vault.get_client(logger=self.logger)

    @property
    def cache(self) -> Q2CacheClient:
        """
        Instantiates a communication object between this extension and the
        pymemcache library. Cache can be configured via
        configuration.settings.CACHE.

        The client is created on first access and reused afterwards.

        :return: Instantiated pymemcache.cache.Client object
        """
        # Compare against None explicitly: a client object that happens to
        # evaluate falsy must not be rebuilt on every access.
        if self._cache is None:
            self._cache = cache.get_cache(logger=self.logger)
        return self._cache

    @property
    def db(self) -> "DbObjectFactory":
        """A new ``DbObjectFactory`` bound to this logger and HQ credentials."""
        # Imported here rather than at module level (the module-level import
        # is guarded by TYPE_CHECKING).
        from q2_sdk.hq.db.db_object_factory import DbObjectFactory

        return DbObjectFactory(self.logger, self.hq_credentials)

    def create_parser(self, subparsers) -> argparse.ArgumentParser:
        """
        Registers this entrypoint as a sub command and builds its parser.

        Adds the ``--logging-level`` and ``--completion`` options, lets the
        subclass contribute its own through ``add_arguments`` and stores the
        resulting parser in ``self.parser``.

        :param subparsers: Object whose ``add_parser`` creates the sub command
            parser (presumably the result of
            ``ArgumentParser.add_subparsers()`` - confirm against caller)
        :return: The parser created for this entrypoint
        """
        sub_parser = subparsers.add_parser(
            self.name, help=self.help, description=self.help
        )
        sub_parser.add_argument(
            "-l",
            "--logging-level",
            dest="logging_level",
            default=settings.LOGGING_LEVEL,
            choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
            # Upper-cases the value before it is checked against ``choices``
            type=str.upper,
            help="Logging Level",
        )
        sub_parser.add_argument(
            "--completion",
            dest="generate_completion",
            action="store_true",
            help="Generates completion cache file",
        )
        self.add_arguments(sub_parser)
        self.parser = sub_parser
        return sub_parser

    async def async_run_forever_cleanup(self):
        """
        Hook as the run forever loop is terminating
        """

    def run_forever_cleanup(self):
        """
        Hook for any cleanup necessary after run forever loop is terminated
        """

    def add_arguments(self, parser: argparse.ArgumentParser):
        """
        Hook for subclassed EntryPoints to add custom arguments.

        :param parser: The sub command parser built by ``create_parser``
        """

    def setup(self, logger):
        """
        Registers a logger in self.logger with all appropriate filters

        :param logger: Logger that receives the filters and is then stored
            on the instance
        """
        # Add Global filters
        for log_filter in settings.GLOBAL_PERIODIC_JOB_LOGGING_FILTERS:
            logger.addFilter(log_filter)

        # Add entrypoint specific filters
        for log_filter in self.LOGGING_FILTERS:
            logger.addFilter(log_filter)

        self.logger = logger

    def generate_completion(self):
        """
        Works with ``q2 install_completion`` to create completion file in
        ``<root>/.antilles/entrypoints/<self.name>``

        ``create_parser`` must have been called first, since the options are
        read from ``self.parser``.

        :return: Newline separated ``key: value`` text holding the module
            path, import name, source hash, options, subparsers and
            description of this entrypoint
        """
        # argparse exposes no public API for listing registered actions, so
        # its private attributes are read here.
        actions = self.parser._optionals._actions

        # The flags of a single option are comma joined ("-l,--logging-level")
        # and the options are then comma joined with each other.
        options = ",".join(
            ",".join(action.option_strings)
            for action in actions
            if action.option_strings
        )
        subparsers = ",".join(
            choice
            for action in actions
            if isinstance(action, argparse._SubParsersAction)
            for choice in action.choices
        )

        module = sys.modules[self.__module__]
        path = module.__file__
        import_name = module.__name__

        # Python source files are UTF-8 by default, so decode explicitly
        # instead of relying on the locale's preferred encoding.
        # NOTE(review): any other code that recomputes this hash has to read
        # the file the same way - confirm.
        with open(path, encoding="utf-8") as handle:
            file_hash = hashlib.sha1(handle.read().encode()).hexdigest()

        output = [
            f"path: {path}",
            f"import_name: {import_name}",
            f"hash: {file_hash}",
            f"options: {options}",
            f"subparsers: {subparsers}",
            f"description: {self.help}",
        ]
        return "\n".join(output)

    async def handle(self, *args, **kwargs):
        """
        The actual logic of the command. Subclasses must implement
        this method.

        :raises NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError(
            "subclasses of BaseEntryPoint must provide a handle() method"
        )