"""
Inspect Extension returns information about the current configuration
"""
import html
import json
from dataclasses import fields
from importlib import import_module

import q2_sdk
from q2_sdk.core.configuration import settings
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core.http_handlers.handler_types import handler_to_extension_type
from q2_sdk.core.http_handlers.q2_config_handler import Q2ConfigRequestHandler
from q2_sdk.core.install_steps.base import InstallStep
from q2_sdk.core.install_steps.db_plan import DbPlan
from q2_sdk.hq.models.db_config.db_config_list import DbConfigList
from q2_sdk.hq.models.instant_payments import InstantPaymentsParams
from q2_sdk.models.recursive_encoder import RecursiveEncoder
from q2_sdk.q2config import utils as q2config_utils
from q2_sdk.tools import utils
def _pattern_matches_extension(extension, pattern):
    """Return True if any HTTP endpoint of ``extension`` matches ``/<pattern>``.

    Each endpoint's compiled ``regex`` is tried against ``pattern`` prefixed
    with a slash; the first successful match short-circuits the search.
    """
    candidate_path = f"/{pattern}"
    return any(
        url_spec.regex.match(candidate_path)
        for url_spec in utils.get_http_extension_urls(extension)
    )
def _normalize_extension_name(extension_name):
    """Map a requested name onto an installed extension module name.

    An exact match against the installed extensions is returned as-is.
    Otherwise the first installed extension with an HTTP endpoint matching
    ``/<extension_name>`` is returned. When nothing matches, the input is
    handed back unchanged so the caller can report it.
    """
    known_extensions = utils.get_installed_extensions_as_modules()
    if extension_name in known_extensions:
        return extension_name

    for candidate in known_extensions:
        if _pattern_matches_extension(candidate, extension_name):
            return candidate

    return extension_name
[docs]
class InspectHandler(Q2BaseRequestHandler):
    """Report information about the current server and extension configuration.

    ``GET`` without an extension name returns server-wide settings (HTML by
    default, JSON on request). ``GET`` with an extension name returns that
    extension's configuration as JSON, or an HTML 404 page if it is not
    installed.
    """

    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)
        # NOTE(review): presumably read by the base handler to decide whether
        # to emit a per-request summary log entry -- confirm against
        # Q2BaseRequestHandler.
        self.enable_summary_log = False

    def set_default_headers(self):
        """Set permissive cross-origin headers on every response."""
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header(
            "Access-Control-Allow-Methods", "POST, GET, PUT, OPTIONS, DELETE"
        )

    def options(self, *args, **kwargs):
        """Answer ``OPTIONS`` requests with an empty ``204`` response."""
        self.set_status(204)
        self.finish()

    def get(self, extension_name=None):
        """Dispatch to the server-wide or the per-extension report."""
        if not extension_name:
            self.return_server_info()
        else:
            self.return_extension_info(extension_name)

    def return_server_info(self):
        """Returns configured information for the running server.

        The response is JSON when the ``type`` or ``format`` query argument
        equals ``json`` or the ``Accept`` header starts with
        ``application/json``; otherwise it is a ``<br>``-joined HTML fragment.
        """
        sdk_version = q2_sdk.__version__
        repository_version = settings.REPOSITORY_VERSION
        docker_image_tag = settings.IMAGE_TAG

        endpoints = []
        frontend_dict = {}
        installed_extensions = utils.get_installed_extensions()
        for extension in installed_extensions + settings.DEFAULT_EXTENSIONS:
            endpoints.extend(utils.get_http_extension_urls(extension))
            assets_endpoint = utils.get_assets_endpoint(extension)
            if assets_endpoint:
                endpoints.append(assets_endpoint)
            feature_manifest = utils.get_feature_manifest(extension)
            if feature_manifest:
                frontend_dict[extension] = feature_manifest

        # De-duplicate the URL patterns and present them in a stable order.
        endpoint_set = sorted({url.regex.pattern for url in endpoints})

        wants_json = (
            self.get_query_argument("type", "") == "json"
            or self.get_query_argument("format", "") == "json"
            or self.request.headers.get("Accept", "").startswith("application/json")
        )

        if wants_json:
            self.set_header("Content-Type", "application/json")
            output = {
                "antilles_version": sdk_version,
                "repository_version": repository_version,
                "docker_image_tag": docker_image_tag,
                "is_customer_created": settings.IS_CUSTOMER_CREATED,
                "company": settings.COMPANY,
                "fork_requests": settings.FORK_REQUESTS,
                "hq": settings.HQ_CREDENTIALS.hq_url,
                "hq_list": settings.HQ_CREDENTIALS.hq_url_list,
                "aba": settings.HQ_CREDENTIALS.aba,
                "core": settings.CORE,
                "cache": settings.CACHE,
                "installed_extensions": installed_extensions,
                "installed_entrypoints": utils.get_installed_entrypoint_names(),
                "endpoints": endpoint_set,
                "frontend": frontend_dict,
                "multitenant": settings.MULTITENANT,
            }
        else:
            html_lines = [
                f"<b>Q2 Antilles Version:</b> {sdk_version}",
                f"<b>Repository Version:</b> {repository_version}",
                f"<b>Docker Image Tag</b> {docker_image_tag}",
                f"<b>Is Customer Created:</b> {settings.IS_CUSTOMER_CREATED}",
                f"<b>Company:</b> {settings.COMPANY}",
                f"<b>Request Forking: </b> {settings.FORK_REQUESTS}",
                f"<b>Debug:</b> {settings.DEBUG}",
                f"<b>Configured HQ(s):</b> {settings.HQ_CREDENTIALS.hq_url_list}",
                f"<b>Active HQ:</b> {settings.HQ_CREDENTIALS.hq_url}",
                f"<b>Configured ABA:</b> {settings.HQ_CREDENTIALS.aba}",
                f"<b>Core:</b> {settings.CORE}",
                f"<b>Cache:</b> {settings.CACHE}",
                f"<b>Multitenant:</b> {settings.MULTITENANT}",
                f"<b>Registered Extensions:</b> {installed_extensions}",
                f"<b>Registered Endpoints:</b> {endpoint_set}",
                f"<b>Registered Entrypoints:</b> {utils.get_installed_entrypoint_names()}",
            ]
            output = "<br>".join(html_lines)
        self.write(output)

    def return_extension_info(self, extension_name):
        """Write the configuration of one installed extension as JSON.

        ``extension_name`` may be a module name or a URL fragment that one of
        the installed extensions' endpoints matches. When it resolves to
        nothing installed, a ``404`` HTML page listing the installed
        extensions is written instead. The ``detail=1`` query argument
        switches the DB/vendor configuration entries from ``name: default``
        maps to the full configuration objects.
        """
        show_details = self.get_query_argument("detail", "") == "1"
        extension_name = _normalize_extension_name(extension_name)
        installed_extensions = utils.get_installed_extensions_as_modules()
        if extension_name not in installed_extensions:
            self.set_status(404, "This is not an installed extension")
            # The name comes straight from the request URL and is echoed into
            # an HTML body, so it must be escaped to avoid reflected markup.
            self.write(
                "<b>ERROR: </b> '{}' is not an installed extension".format(
                    html.escape(extension_name)
                )
            )
            self.write("<br>")
            self.write("<b>Installed Extensions:</b> {}".format(installed_extensions))
            return

        urls = utils.get_http_extension_urls(extension_name)
        # NOTE(review): an installed extension with no HTTP URLs raises
        # IndexError here; behavior intentionally left as in the original.
        handler_class = urls[0].handler_class
        is_config_handler = issubclass(handler_class, Q2ConfigRequestHandler)
        is_mfa = getattr(handler_class, "IS_MFA", False)
        results = {}

        feature_manifest = utils.get_feature_manifest(extension_name)
        if feature_manifest:
            if is_mfa:
                feature_manifest["isMFA"] = True
            if is_config_handler:
                extension_config = q2config_utils.get_extension_config(extension_name)
                results[extension_name] = extension_config
            else:
                results["frontend"] = feature_manifest

        db_plan = get_db_plan_from_handler(handler_class)
        if db_plan == {}:
            # No plan declared on the handler: fall back to a default DbPlan.
            # NOTE(review): DbPlan is also imported at module level; the
            # dynamic import is preserved from the original implementation.
            db_plan_module = import_module("q2_sdk.core.install_steps.db_plan")
            db_plan = db_plan_module.DbPlan()
        if db_plan:
            results.update(parse_db_plan(db_plan))

        db_configs = getattr(handler_class, "WEDGE_ADDRESS_CONFIGS", {})
        if isinstance(db_configs, dict):
            opt_wa_configs = getattr(
                handler_class, "OPTIONAL_WEDGE_ADDRESS_CONFIGS", {}
            )
            db_configs = DbConfigList.from_dict(db_configs, opt_wa_configs)
        if show_details:
            results["db_configurations"] = db_configs.required
            results["optional_db_configurations"] = db_configs.optional
        else:
            results["db_configurations"] = {
                x.name: x.default for x in db_configs.required
            }
            results["optional_db_configurations"] = {
                x.name: x.default for x in db_configs.optional
            }

        if getattr(handler_class, "DYNAMIC_CORE_SELECTION", False):
            # An empty set of options is reported as null rather than empty.
            core_options = utils.get_core_config_options() or None
        else:
            core_options = {"None": {}}
        results["core_configuration_options"] = core_options

        results["is_customer_created"] = settings.IS_CUSTOMER_CREATED
        results["is_unauthenticated"] = getattr(
            handler_class, "IS_UNAUTHENTICATED", False
        )
        results["company"] = settings.COMPANY
        extension_type = handler_to_extension_type(handler_class)
        results["extension_type"] = extension_type
        results["base_assets_url"] = utils.get_base_assets_url(extension_name)

        if extension_type == "instant_payments":
            results["instant_payments_params_data"] = get_instant_payments_params_data(
                handler_class
            )
        if extension_type == "sso":
            results["sso_config"] = get_sso_config(
                handler_class, show_details=show_details
            )
        if extension_type in ["ardent", "caliper_api_custom"]:
            results["description"] = getattr(handler_class, "DESCRIPTION")
        if extension_type == "audit_action":
            blocked_actions = settings.BLOCKED_AUDIT_ACTIONS
            selected_actions = getattr(handler_class, "AUDIT_ACTIONS")
            results["audit_actions"] = [
                x for x in selected_actions if x not in blocked_actions
            ]

        results["friendly_name"] = getattr(
            handler_class, "FRIENDLY_NAME", extension_name
        )
        results["property_long_name"] = getattr(
            handler_class, "PROPERTY_LONG_NAME", None
        )
        results["is_mfa"] = is_mfa
        if is_mfa:
            results["mfa_requires_registration"] = getattr(
                handler_class, "MFA_REQUIRES_REGISTRATION", False
            )
            results["mfa_token_lifetime_in_minutes"] = getattr(
                handler_class, "MFA_TOKEN_LIFETIME_IN_MINUTES", 5
            )

        if is_config_handler:
            # Config handlers expose only their own extension config entry.
            results = {extension_name: results.get(extension_name)}

        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(results, cls=RecursiveEncoder))
def get_sso_config(handler_class, show_details=False):
    """Collect the SSO-related settings declared on ``handler_class``.

    ``VENDOR_CONFIGS`` may be a plain dict (converted through
    ``DbConfigList.from_dict``) or an object already exposing ``db_configs``.
    With ``show_details`` the ``db_configs`` collection itself is returned;
    otherwise it is reduced to a ``name -> default`` mapping. Every other
    entry is read from the upper-cased class attribute of the same name,
    falling back to the default listed below.
    """
    vendor_configs = getattr(handler_class, "VENDOR_CONFIGS", {})
    if isinstance(vendor_configs, dict):
        vendor_configs = DbConfigList.from_dict(vendor_configs, {})

    if show_details:
        vendor_payload = vendor_configs.db_configs
    else:
        vendor_payload = {
            config.name: config.default for config in vendor_configs.db_configs
        }

    sso_config = {"vendor_configs": vendor_payload}

    # (result key, default when the handler does not define the attribute)
    option_defaults = (
        ("render_in_new_window", True),
        ("purge_account_list_on_success", False),
        ("send_user_info", True),
        ("send_account_info", False),
        ("send_dep_only_accounts", False),
        ("send_view_only_accounts", False),
        ("send_wdl_only_accounts", False),
        ("send_dep_view_accounts", False),
        ("send_dep_wdl_accounts", False),
        ("send_view_wdl_accounts", False),
        ("send_dep_view_wdl_accounts", False),
    )
    for option_key, fallback in option_defaults:
        sso_config[option_key] = getattr(handler_class, option_key.upper(), fallback)

    return sso_config
def get_instant_payments_params_data(handler_class):
    """Read every ``InstantPaymentsParams`` field from ``handler_class``.

    For each dataclass field, the handler attribute named after the
    upper-cased field name is looked up; a missing attribute raises
    ``AttributeError``.
    """
    return {
        param.name: getattr(handler_class, param.name.upper())
        for param in fields(InstantPaymentsParams)
    }
def get_db_plan_from_handler(handler):
    """Return the handler's ``DB_PLAN``, or an empty dict when unset or falsy."""
    return getattr(handler, "DB_PLAN", None) or {}
def parse_db_plan(db_plan: DbPlan):
    """Flatten a ``DbPlan`` into a plain dict.

    ``"db_plan"`` holds every instance attribute whose value is a non-empty
    list made up solely of ``InstallStep`` objects. The remaining keys copy
    the named plan attributes verbatim, except ``user_property_feature``,
    which is reported through its ``.value``.
    """
    install_step_lists = {}
    for attr_name, attr_value in vars(db_plan).items():
        if not attr_value or not isinstance(attr_value, list):
            continue
        if all(isinstance(step, InstallStep) for step in attr_value):
            install_step_lists[attr_name] = attr_value

    parsed = {"db_plan": install_step_lists}

    # Attributes copied across unchanged, in the order they are reported.
    passthrough_attrs = (
        "ui_text_prefix",
        "disallow_add_to_nav",
        "send_account_list",
        "send_account_details",
        "payload_stored_proc",
        "account_rights_bit_flag",
        "insight_features",
        "marketplace_features",
        "upde_allow_user_view",
        "upde_allow_user_edit",
        "upde_allow_cust_view",
    )
    for attr_name in passthrough_attrs:
        parsed[attr_name] = getattr(db_plan, attr_name)

    parsed["user_property_feature"] = db_plan.user_property_feature.value
    return parsed