import argparse
import hashlib
import logging
import sys
from typing import List, TYPE_CHECKING, Optional
from q2_sdk.core import cache
from q2_sdk.core.cache import Q2CacheClient
from q2_sdk.core.configuration import settings
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_sdk.core import vault
if TYPE_CHECKING: # pragma: no cover
from q2_sdk.hq.db.db_object_factory import DbObjectFactory
[docs]
class BaseEntryPoint:
    """
    Base class for ``q2`` command line entrypoints.

    This is largely based off of the way Django organizes its Management Commands.
    We do not need the power nor the flexibility that Django provides here,
    but it's such a great pattern, it seems a shame to reinvent the wheel.

    Basic idea:
    When running ``q2`` at the command line, each file in the entrypoints folder of the
    SDK that ends in .py and does not start with _ is loaded. The parser is built off of
    what lives in add_arguments, while the action when run is determined by what lives in
    the handle function.

    name: The command line parameter to be called (name=inspect means ``q2 inspect``)
    help: Help message at the command line
    run_forever: If True, the asyncio loop will not exit after completion of handle function
    raise_on_unknown_args: Not read anywhere in this class; presumably tells the
        CLI runner whether unrecognized arguments are an error (confirm against the caller)
    LOGGING_FILTERS: Entrypoint specific logging filters attached to the logger in ``setup``
    """

    name = ""
    help = ""
    run_forever = False
    raise_on_unknown_args = True
    # Class-level default shared by every subclass that does not override it;
    # subclasses should assign their own list rather than mutate this one.
    LOGGING_FILTERS: List[logging.Filter] = []

    def __init__(self):
        # Set by setup(); None until then.
        self.logger: Optional[logging.LoggerAdapter] = None
        # Set by create_parser(); None until then.
        self.parser: Optional[argparse.ArgumentParser] = None
        self.hq_credentials: HqCredentials = settings.HQ_CREDENTIALS
        # Lazily populated by the ``cache`` property.
        self._cache = None

    @property
    def extension_name(self) -> str:
        """
        Dotted path of the package containing this entrypoint's module, i.e.
        ``self.__module__`` without its last component. Empty string when the
        module has no parent package.
        """
        return self.__module__.rpartition(".")[0]

    @property
    def vault(self) -> vault.Q2Vault:
        """
        Vault client bound to this entrypoint's logger.

        ``vault.get_client`` is called on every access; nothing is memoized here.
        """
        return vault.get_client(logger=self.logger)

    @property
    def cache(self) -> Q2CacheClient:
        """
        Instantiates a communication object between this extension and the pymemcache library.
        Cache can be configured via configuration.settings.CACHE.

        The client is created on first access and reused afterwards.

        :return: Cache client returned by ``cache.get_cache``
        """
        # Explicit None check: a client object that happens to evaluate as
        # falsy must not be rebuilt on every access.
        if self._cache is None:
            self._cache = cache.get_cache(logger=self.logger)
        return self._cache

    @property
    def db(self) -> "DbObjectFactory":
        """
        Builds a new ``DbObjectFactory`` from this entrypoint's logger and HQ
        credentials on every access.
        """
        # Imported at call time; the module-level import is only performed
        # under TYPE_CHECKING (presumably to avoid an import cycle).
        from q2_sdk.hq.db.db_object_factory import DbObjectFactory

        return DbObjectFactory(self.logger, self.hq_credentials)

    def create_parser(self, subparsers) -> argparse.ArgumentParser:
        """
        Registers this entrypoint as a sub-command and builds its argument parser.

        Adds the common ``--logging-level`` and ``--completion`` options, then
        lets the subclass contribute its own through ``add_arguments``.

        :param subparsers: Object exposing ``add_parser`` (e.g. the result of
            ``ArgumentParser.add_subparsers()``)
        :return: The newly created sub-parser, also stored in ``self.parser``
        """
        sub_parser = subparsers.add_parser(
            self.name, help=self.help, description=self.help
        )
        sub_parser.add_argument(
            "-l",
            "--logging-level",
            dest="logging_level",
            default=settings.LOGGING_LEVEL,
            choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
            # Upper-case the value before it is checked against ``choices``
            # so that e.g. ``-l debug`` is accepted.
            type=str.upper,
            help="Logging Level",
        )
        sub_parser.add_argument(
            "--completion",
            dest="generate_completion",
            action="store_true",
            help="Generates completion cache file",
        )
        self.add_arguments(sub_parser)
        self.parser = sub_parser
        return sub_parser

    async def async_run_forever_cleanup(self):
        """
        Hook as the run forever loop is terminating
        """

    def run_forever_cleanup(self):
        """
        Hook for any cleanup necessary after run forever loop is terminated
        """

    def add_arguments(self, parser: argparse.ArgumentParser):
        """
        Hook for subclassed EntryPoints to add custom arguments.

        :param parser: The sub-parser created for this entrypoint in ``create_parser``
        """

    def setup(self, logger):
        """
        Registers a logger in self.logger with all appropriate filters

        :param logger: Logger object exposing ``addFilter``
        """
        # Add Global filters
        for log_filter in settings.GLOBAL_PERIODIC_JOB_LOGGING_FILTERS:
            logger.addFilter(log_filter)

        # Add entrypoint specific filters
        for log_filter in self.LOGGING_FILTERS:
            logger.addFilter(log_filter)

        self.logger = logger

    def generate_completion(self):
        """
        Works with ``q2 install_completion`` to create
        completion file in ``<root>/.antilles/entrypoints/<self.name>``

        This method only builds the file content; it does not write anything.
        ``create_parser`` must have been called first so that ``self.parser`` is set.

        :return: Newline separated ``key: value`` lines (path, import_name, hash,
            options, subparsers, description)
        """
        # argparse has no public API for listing registered actions, so the
        # private attributes are used. An argument group shares the action
        # list of its parser, which is why sub-parser actions show up here too.
        actions = self.parser._optionals._actions

        option_groups = [
            ",".join(action.option_strings)
            for action in actions
            if action.option_strings
        ]

        subparser_names = []
        for action in actions:
            if isinstance(action, argparse._SubParsersAction):
                subparser_names.extend(action.choices)

        module = sys.modules[self.__module__]
        path = module.__file__
        import_name = module.__name__

        # Read with an explicit encoding so the hash of a given source file
        # does not depend on the locale of the machine computing it.
        with open(path, encoding="utf-8") as handle:
            file_hash = hashlib.sha1(handle.read().encode("utf-8")).hexdigest()

        output = [
            f"path: {path}",
            f"import_name: {import_name}",
            f"hash: {file_hash}",
            f"options: {','.join(option_groups)}",
            f"subparsers: {','.join(subparser_names)}",
            f"description: {self.help}",
        ]
        return "\n".join(output)

    async def handle(self, *args, **kwargs):
        """
        The actual logic of the command. Subclasses must implement
        this method.

        :raises NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError(
            "subclasses of BaseEntryPoint must provide a handle() method"
        )