# Source code for q2_sdk.extensions.DbPlan.extension

"""
Endpoint to install or uninstall an extension's db_plan
"""

import asyncio
import os
import re

from q2_sdk.core import vault
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.tools import utils
from q2_sdk.tools.utils import to_bool


[docs] class DbPlanHandler(Q2BaseRequestHandler):
[docs] def set_default_headers(self): self.set_header("Access-Control-Allow-Origin", "*") self.set_header("Access-Control-Allow-Methods", "POST, OPTIONS")
async def options(self, *args, **kwargs): self.set_status(204) self.finish() async def post(self, extension_name=None): installed_extensions = utils.get_installed_extensions() if not extension_name: self.set_status(404, "No extension name given") self.write("<b>ERROR: </b> No extension name passed") self.write("<br>") self.write("<b>Installed Extensions:</b> {}".format(installed_extensions)) return if extension_name not in installed_extensions: self.set_status(404, "This is not an installed extension") self.write( "<b>ERROR: </b> '{}' is not an installed extension".format( extension_name ) ) self.write("<br>") self.write("<b>Installed Extensions:</b> {}".format(installed_extensions)) return run_all_install_steps = self.get_query_argument("run_all_install_steps", "") if not run_all_install_steps: self.write( "Only custom install steps will be ran. To run all steps add a query" " pram called 'run_all_install_steps' and set to 'true'" ) vault_key = self.request.headers["vault_key"] if not re.match( r"[A-Z0-9]{8}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{12}", vault_key ): self.set_status(404, "Vault Key format not valid") self.write("<b>ERROR: Vault Key passed is in an invalid format<br>") return vault_client = vault.get_client(allow_local=False) if vault_client is None: self.set_status(404, "Could not connect to Vault") self.write("<b>ERROR: Vault Client could not connect to Vault</b>") return uninstall = to_bool(self.request.headers.get("uninstall", False)) file_name = f"dbplan_overrides_{extension_name}_{vault_key}.txt" db_plan_with_overrides = None if not uninstall: db_plan_with_overrides = self.get_body_argument("db_plan", default=None) if not db_plan_with_overrides: db_plan_with_overrides = self.request.headers.get("db_plan") if db_plan_with_overrides is None: self.set_status(404) self.write("No db_plan passed in request") return file = open(file_name, "w+") file.write(db_plan_with_overrides) file.close() cmd = f"VAULT_KEY={vault_key} q2 run_db_plan -e 
{extension_name}" if uninstall: cmd += " -r" if not run_all_install_steps: cmd += " -c" else: if not run_all_install_steps: cmd += " -c" if db_plan_with_overrides: cmd += f" -y -f {file_name} -m" else: cmd += " -y" proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() self.logger.info(f"[{cmd!r} exited with {proc.returncode}]") match proc.returncode: # noqa case 137 | -9: self.set_status(503) err_desc = ( "There was not enough memory in the container to run the DB Plan." " Please increase the memory resources via the CDM advanced tab" " when starting a service or in the yaml/HCL\n\n" ) self.logger.error(err_desc) self.logger.error(f"[stderr]\n{stderr.decode()}") self.write(err_desc) self.write(f"[stdout]\n{stdout.decode()}") self.write(f"[stderr]\n{stderr.decode()}") case 0: self.logger.info("DB Plan run successfully") self.write("DB Plan run successfully") case _: self.set_status(502) err_desc = f"There was an error installing/uninstalling the DB Plan with error code: {proc.returncode}\n\n" self.logger.error(err_desc) self.logger.error(f"[stderr]\n{stderr.decode()}") self.write(err_desc) self.write(f"[stdout]\n{stdout.decode()}") self.write(f"[stderr]\n{stderr.decode()}") self.logger.debug(f"[stdout]\n{stdout.decode()}") if not uninstall: os.remove(file_name)