"""
Endpoint to install an extension's db_plan
"""
import asyncio
import os
import re
from q2_sdk.core import vault
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.tools import utils
from q2_sdk.tools.utils import to_bool
class DbPlanHandler(Q2BaseRequestHandler):
[docs]
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Methods", "POST, OPTIONS")
async def options(self, *args, **kwargs):
self.set_status(204)
self.finish()
async def post(self, extension_name=None):
installed_extensions = utils.get_installed_extensions()
if not extension_name:
self.set_status(404, "No extension name given")
self.write("<b>ERROR: </b> No extension name passed")
self.write("<br>")
self.write("<b>Installed Extensions:</b> {}".format(installed_extensions))
return
if extension_name not in installed_extensions:
self.set_status(404, "This is not an installed extension")
self.write(
"<b>ERROR: </b> '{}' is not an installed extension".format(
extension_name
)
)
self.write("<br>")
self.write("<b>Installed Extensions:</b> {}".format(installed_extensions))
return
run_all_install_steps = self.get_query_argument("run_all_install_steps", "")
if not run_all_install_steps:
self.write(
"Only custom install steps will be ran. To run all steps add a query"
" pram called 'run_all_install_steps' and set to 'true'"
)
vault_key = self.request.headers["vault_key"]
if not re.match(
r"[A-Z0-9]{8}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{12}", vault_key
):
self.set_status(404, "Vault Key format not valid")
self.write("<b>ERROR: Vault Key passed is in an invalid format<br>")
return
vault_client = vault.get_client(allow_local=False)
if vault_client is None:
self.set_status(404, "Could not connect to Vault")
self.write("<b>ERROR: Vault Client could not connect to Vault</b>")
return
uninstall = to_bool(self.request.headers.get("uninstall", False))
file_name = f"dbplan_overrides_{extension_name}_{vault_key}.txt"
db_plan_with_overrides = None
if not uninstall:
db_plan_with_overrides = self.get_body_argument("db_plan", default=None)
if not db_plan_with_overrides:
db_plan_with_overrides = self.request.headers.get("db_plan")
if db_plan_with_overrides is None:
self.set_status(404)
self.write("No db_plan passed in request")
return
file = open(file_name, "w+")
file.write(db_plan_with_overrides)
file.close()
cmd = f"VAULT_KEY={vault_key} q2 run_db_plan -e {extension_name}"
if uninstall:
cmd += " -r"
if not run_all_install_steps:
cmd += " -c"
else:
if not run_all_install_steps:
cmd += " -c"
if db_plan_with_overrides:
cmd += f" -y -f {file_name} -m"
else:
cmd += " -y"
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
self.logger.info(f"[{cmd!r} exited with {proc.returncode}]")
match proc.returncode: # noqa
case 137 | -9:
self.set_status(503)
err_desc = (
"There was not enough memory in the container to run the DB Plan."
" Please increase the memory resources via the CDM advanced tab"
" when starting a service or in the yaml/HCL\n\n"
)
self.logger.error(err_desc)
self.logger.error(f"[stderr]\n{stderr.decode()}")
self.write(err_desc)
self.write(f"[stdout]\n{stdout.decode()}")
self.write(f"[stderr]\n{stderr.decode()}")
case 0:
self.logger.info("DB Plan run successfully")
self.write("DB Plan run successfully")
case _:
self.set_status(502)
err_desc = f"There was an error installing/uninstalling the DB Plan with error code: {proc.returncode}\n\n"
self.logger.error(err_desc)
self.logger.error(f"[stderr]\n{stderr.decode()}")
self.write(err_desc)
self.write(f"[stdout]\n{stdout.decode()}")
self.write(f"[stderr]\n{stderr.decode()}")
self.logger.debug(f"[stdout]\n{stdout.decode()}")
if not uninstall:
os.remove(file_name)