Source code for q2_sdk.extensions.Inspect.extension

"""
Inspect Extension returns information about the current configuration
"""

import json
from dataclasses import fields
from importlib import import_module

import q2_sdk
from q2_sdk.core.configuration import settings
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core.http_handlers.handler_types import handler_to_extension_type
from q2_sdk.core.http_handlers.q2_config_handler import Q2ConfigRequestHandler
from q2_sdk.core.install_steps.base import InstallStep
from q2_sdk.core.install_steps.db_plan import DbPlan
from q2_sdk.hq.models.db_config.db_config_list import DbConfigList
from q2_sdk.hq.models.instant_payments import InstantPaymentsParams
from q2_sdk.models.recursive_encoder import RecursiveEncoder
from q2_sdk.q2config import utils as q2config_utils
from q2_sdk.tools import utils


def _pattern_matches_extension(extension, pattern):
    endpoints = utils.get_http_extension_urls(extension)
    for endpoint in endpoints:
        if endpoint.regex.match(f"/{pattern}"):
            return True
    return False


def _normalize_extension_name(extension_name):
    installed_extensions = utils.get_installed_extensions_as_modules()
    if extension_name in installed_extensions:
        return extension_name

    for configured_extension in installed_extensions:
        match = _pattern_matches_extension(configured_extension, extension_name)
        if match:
            extension_name = configured_extension
            break
    return extension_name


[docs] class InspectHandler(Q2BaseRequestHandler): def __init__(self, application, request, **kwargs): super().__init__(application, request, **kwargs) self.enable_summary_log = False
[docs] def set_default_headers(self): self.set_header("Access-Control-Allow-Origin", "*") self.set_header( "Access-Control-Allow-Methods", "POST, GET, PUT, OPTIONS, DELETE" )
def options(self, *args, **kwargs): self.set_status(204) self.finish() def get(self, extension_name=None): if not extension_name: self.return_server_info() else: self.return_extension_info(extension_name)
[docs] def return_server_info(self): """Returns configured information for the running server""" sdk_version = q2_sdk.__version__ repository_version = settings.REPOSITORY_VERSION docker_image_tag = settings.IMAGE_TAG endpoints = [] frontend_dict = {} installed_extensions = utils.get_installed_extensions() extension_list = installed_extensions + settings.DEFAULT_EXTENSIONS for extension in extension_list: endpoints.extend(utils.get_http_extension_urls(extension)) assets_endpoint = utils.get_assets_endpoint(extension) if assets_endpoint: endpoints.append(assets_endpoint) feature_manifest = utils.get_feature_manifest(extension) if feature_manifest: frontend_dict.update({extension: feature_manifest}) response_format = "text/html" endpoint_set = sorted(list(set([url.regex.pattern for url in endpoints]))) if any([ self.get_query_argument("type", "") == "json", self.get_query_argument("format", "") == "json", self.request.headers.get("Accept", "").startswith("application/json"), ]): response_format = "application/json" if response_format == "application/json": self.set_header("Content-Type", "application/json") output = { "antilles_version": sdk_version, "repository_version": repository_version, "docker_image_tag": docker_image_tag, "is_customer_created": settings.IS_CUSTOMER_CREATED, "company": settings.COMPANY, "fork_requests": settings.FORK_REQUESTS, "hq": settings.HQ_CREDENTIALS.hq_url, "hq_list": settings.HQ_CREDENTIALS.hq_url_list, "aba": settings.HQ_CREDENTIALS.aba, "core": settings.CORE, "cache": settings.CACHE, "installed_extensions": installed_extensions, "installed_entrypoints": utils.get_installed_entrypoint_names(), "endpoints": endpoint_set, "frontend": frontend_dict, "multitenant": settings.MULTITENANT, } else: output = [ "<b>Q2 Antilles Version:</b> {0}".format(sdk_version), "<b>Repository Version:</b> {0}".format(repository_version), "<b>Docker Image Tag</b> {0}".format(docker_image_tag), "<b>Is Customer Created:</b> {0}".format(settings.IS_CUSTOMER_CREATED), "<b>Company:</b> {0}".format(settings.COMPANY), "<b>Request Forking: </b> {0}".format(settings.FORK_REQUESTS), "<b>Debug:</b> {0}".format(settings.DEBUG), "<b>Configured HQ(s):</b> {}".format( settings.HQ_CREDENTIALS.hq_url_list ), "<b>Active HQ:</b> {}".format(settings.HQ_CREDENTIALS.hq_url), "<b>Configured ABA:</b> {}".format(settings.HQ_CREDENTIALS.aba), "<b>Core:</b> {}".format(settings.CORE), "<b>Cache:</b> {}".format(settings.CACHE), "<b>Multitenant:</b> {}".format(settings.MULTITENANT), "<b>Registered Extensions:</b> {}".format(installed_extensions), "<b>Registered Endpoints:</b> {}".format(endpoint_set), "<b>Registered Entrypoints:</b> {}".format( utils.get_installed_entrypoint_names() ), ] output = "<br>".join(output) self.write(output)
def return_extension_info(self, extension_name): show_details = self.get_query_argument("detail", "") == "1" extension_name = _normalize_extension_name(extension_name) installed_extensions = utils.get_installed_extensions_as_modules() if extension_name not in installed_extensions: self.set_status(404, "This is not an installed extension") self.write( "<b>ERROR: </b> '{}' is not an installed extension".format( extension_name ) ) self.write("<br>") self.write("<b>Installed Extensions:</b> {}".format(installed_extensions)) return urls = utils.get_http_extension_urls(extension_name) handler_class = urls[0].handler_class results = dict() feature_manifest = utils.get_feature_manifest(extension_name) if feature_manifest: if getattr(handler_class, "IS_MFA", False): feature_manifest["isMFA"] = True if issubclass(handler_class, Q2ConfigRequestHandler): extension_config = q2config_utils.get_extension_config(extension_name) results.update({extension_name: extension_config}) else: results.update({"frontend": feature_manifest}) db_plan = get_db_plan_from_handler(handler_class) if db_plan == {}: db_plan_module = import_module("q2_sdk.core.install_steps.db_plan") db_plan = db_plan_module.DbPlan() if db_plan: results.update(parse_db_plan(db_plan)) db_configs = getattr(handler_class, "WEDGE_ADDRESS_CONFIGS", {}) if isinstance(db_configs, dict): opt_wa_configs = getattr( handler_class, "OPTIONAL_WEDGE_ADDRESS_CONFIGS", {} ) db_configs = DbConfigList.from_dict(db_configs, opt_wa_configs) if show_details: results["db_configurations"] = db_configs.required results["optional_db_configurations"] = db_configs.optional else: results["db_configurations"] = { x.name: x.default for x in db_configs.required } results["optional_db_configurations"] = { x.name: x.default for x in db_configs.optional } results["core_configuration_options"] = {"None": {}} if getattr(handler_class, "DYNAMIC_CORE_SELECTION", False): results["core_configuration_options"] = utils.get_core_config_options() if not results["core_configuration_options"]: results["core_configuration_options"] = None results["is_customer_created"] = settings.IS_CUSTOMER_CREATED results["is_unauthenticated"] = getattr( handler_class, "IS_UNAUTHENTICATED", False ) results["company"] = settings.COMPANY extension_type = handler_to_extension_type(handler_class) results["extension_type"] = extension_type results["is_unauthenticated"] = getattr( handler_class, "IS_UNAUTHENTICATED", False ) results["base_assets_url"] = utils.get_base_assets_url(extension_name) if extension_type == "instant_payments": results["instant_payments_params_data"] = get_instant_payments_params_data( handler_class ) if extension_type == "sso": results["sso_config"] = get_sso_config( handler_class, show_details=show_details ) if extension_type in ["ardent", "caliper_api_custom"]: results["description"] = getattr(handler_class, "DESCRIPTION") if extension_type == "audit_action": blocked_actions = settings.BLOCKED_AUDIT_ACTIONS selected_actions = getattr(handler_class, "AUDIT_ACTIONS") allowed_actions = [x for x in selected_actions if x not in blocked_actions] results["audit_actions"] = allowed_actions results["friendly_name"] = getattr( handler_class, "FRIENDLY_NAME", extension_name ) results["property_long_name"] = getattr( handler_class, "PROPERTY_LONG_NAME", None ) results["is_mfa"] = getattr(handler_class, "IS_MFA", False) if getattr(handler_class, "IS_MFA", False): results["mfa_requires_registration"] = getattr( handler_class, "MFA_REQUIRES_REGISTRATION", False ) results["mfa_token_lifetime_in_minutes"] = getattr( handler_class, "MFA_TOKEN_LIFETIME_IN_MINUTES", 5 ) if issubclass(handler_class, Q2ConfigRequestHandler): results = {extension_name: results.get(extension_name)} self.write("{}".format(json.dumps(results, cls=RecursiveEncoder))) self.set_header("Content-Type", "application/json")
def get_sso_config(handler_class, show_details=False): results = dict() vendor_configs = getattr(handler_class, "VENDOR_CONFIGS", {}) if isinstance(vendor_configs, dict): vendor_configs = DbConfigList.from_dict(vendor_configs, {}) if show_details: results["vendor_configs"] = vendor_configs.db_configs else: results["vendor_configs"] = { x.name: x.default for x in vendor_configs.db_configs } results["render_in_new_window"] = getattr( handler_class, "RENDER_IN_NEW_WINDOW", True ) results["purge_account_list_on_success"] = getattr( handler_class, "PURGE_ACCOUNT_LIST_ON_SUCCESS", False ) results["send_user_info"] = getattr(handler_class, "SEND_USER_INFO", True) results["send_account_info"] = getattr(handler_class, "SEND_ACCOUNT_INFO", False) results["send_dep_only_accounts"] = getattr( handler_class, "SEND_DEP_ONLY_ACCOUNTS", False ) results["send_view_only_accounts"] = getattr( handler_class, "SEND_VIEW_ONLY_ACCOUNTS", False ) results["send_wdl_only_accounts"] = getattr( handler_class, "SEND_WDL_ONLY_ACCOUNTS", False ) results["send_dep_view_accounts"] = getattr( handler_class, "SEND_DEP_VIEW_ACCOUNTS", False ) results["send_dep_wdl_accounts"] = getattr( handler_class, "SEND_DEP_WDL_ACCOUNTS", False ) results["send_view_wdl_accounts"] = getattr( handler_class, "SEND_VIEW_WDL_ACCOUNTS", False ) results["send_dep_view_wdl_accounts"] = getattr( handler_class, "SEND_DEP_VIEW_WDL_ACCOUNTS", False ) return results def get_instant_payments_params_data(handler_class): results = dict() for field in fields(InstantPaymentsParams): results[field.name] = getattr(handler_class, field.name.upper()) return results def get_db_plan_from_handler(handler): db_plan_defaults = getattr(handler, "DB_PLAN", {}) if not db_plan_defaults: db_plan_defaults = {} return db_plan_defaults def parse_db_plan(db_plan: DbPlan): results = {} i_vars = vars(db_plan) results["db_plan"] = { k: i_vars[k] for k in i_vars.keys() if ( i_vars[k] and isinstance(i_vars[k], list) and all(isinstance(i, InstallStep) for i in i_vars[k]) ) } results["ui_text_prefix"] = getattr(db_plan, "ui_text_prefix") results["disallow_add_to_nav"] = getattr(db_plan, "disallow_add_to_nav") results["send_account_list"] = getattr(db_plan, "send_account_list") results["send_account_details"] = getattr(db_plan, "send_account_details") results["payload_stored_proc"] = getattr(db_plan, "payload_stored_proc") results["account_rights_bit_flag"] = getattr(db_plan, "account_rights_bit_flag") results["insight_features"] = getattr(db_plan, "insight_features") results["marketplace_features"] = getattr(db_plan, "marketplace_features") results["upde_allow_user_view"] = getattr(db_plan, "upde_allow_user_view") results["upde_allow_user_edit"] = getattr(db_plan, "upde_allow_user_edit") results["upde_allow_cust_view"] = getattr(db_plan, "upde_allow_cust_view") feature_enum = getattr(db_plan, "user_property_feature") results["user_property_feature"] = feature_enum.value return results