import asyncio
import json
import logging
import os
from enum import Enum
from json.decoder import JSONDecodeError
from typing import Optional
from q2_sdk.core import q2_requests
from q2_sdk.hq.db.customer_data import CustomerData as CustomerDataDbObj
from q2_sdk.hq.db.form import Form as FormDbObj
from q2_sdk.hq.db.user_data import UserData as UserDataDbObj
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_marketplace import configuration
# Base URL of the Marketplace API. Overridable through the
# Q2SDK_MARKETPLACE_API environment variable; trailing slashes are stripped so
# the endpoint paths below can be appended directly.
MARKETPLACE_API_URL = os.getenv(
    "Q2SDK_MARKETPLACE_API", "http://localhost:1980"
).rstrip("/")

# Endpoints derived from the base URL.
SUBSCRIPTION_STATUS_URL = MARKETPLACE_API_URL + "/api/subscription/status"
SUBSCRIPTION_CANCEL_URL = MARKETPLACE_API_URL + "/api/subscription"
PRODUCT_BASE_URL = MARKETPLACE_API_URL + "/api/product"

# Mapping of extension name -> product configuration; the functions in this
# module read ["product"]["id"] from each entry.
PRODUCTS_BY_EXTENSION = configuration.products_by_extension

# Messages logged (or attached to CancelationException) during cancelation.
CANCELATION_SUCCESS = "Subscription canceled successfully"
# NOTE(review): "cancled" is a typo, kept verbatim because this text is emitted
# at runtime and may be matched by log consumers.
CANCELATION_NOT_FOUND = (
    "The subscription was not canceled since it does not exist "
    "or has already been cancled"
)
CANCELATION_ERROR_MESSAGE = "There was an error when attempting to cancel the subscription."

# Logged when no customer key can be resolved from any source.
CUSTOMER_KEY_ERROR_MESSAGE = (
    "Failed to find value for customer key. "
    "Verify that Marketplace Store is installed and configured correctly."
)
[docs]
class EntitlementType(Enum):
    """Level at which a Marketplace product subscription is held.

    The member values match the ``entitlement_type`` strings this module reads
    from the product API, so ``EntitlementType(value)`` converts them directly.
    """

    # Subscription ids are looked up in the user data table.
    user = "user"
    # Subscription ids are looked up in the customer data table.
    customer = "customer"
[docs]
class CancelationException(Exception):
    """Raised when a Marketplace subscription cancelation cannot be completed.

    :param message: optional description of the failure; when omitted the
        generic ``CANCELATION_ERROR_MESSAGE`` is used, so a bare
        ``raise CancelationException`` keeps working.

    The text is available both as ``exc.message`` and through ``str(exc)``.
    """

    def __init__(self, message: Optional[str] = None):
        self.message = CANCELATION_ERROR_MESSAGE if message is None else message
        # Pass the message to Exception so str(exc), exc.args and tracebacks
        # carry it instead of being empty.
        super().__init__(self.message)
def _customer_key_from_form(form, key_path) -> Optional[str]:
    """Return the customer key stored under ``key_path`` in ``form.Config``.

    ``form.Config`` is stringified and parsed as JSON, then each key in
    ``key_path`` is followed in turn. Returns ``None`` when the form has no
    usable ``Config``, the JSON is malformed, the path is missing, or the
    stored value is empty.
    """
    try:
        node = json.loads(str(form.Config))
        for key in key_path:
            node = node[key]
    except (AttributeError, KeyError, TypeError, JSONDecodeError):
        # AttributeError covers a lookup result that is None or has no Config.
        return None
    # An empty or null value is treated as "not configured".
    return node or None


async def _get_customer_key(
    logger: logging.Logger, hq_credentials: Optional[HqCredentials]
) -> Optional[str]:
    """Resolve the Marketplace customer key from the first source that has one.

    Sources are tried in this order:

    1. ``hq_credentials.customer_key``
    2. the ``VAULT_KEY`` environment variable
    3. the ``MarketplaceStore`` form config (``_overrides.customer_key``)
    4. the legacy ``AppDirectSSO`` form config (``CUSTOMER_KEY``)

    :param logger: logger handed to the form database object
    :param hq_credentials: credentials to inspect and to use for the form lookups
    :return: the customer key, or ``None`` when no source provides a non-empty value
    """
    # Check hq_credentials first (also tolerates hq_credentials being None).
    credentials_key = getattr(hq_credentials, "customer_key", None)
    if credentials_key:
        return credentials_key

    # Then, check the environment.
    env_key = os.getenv("VAULT_KEY")
    if env_key:
        return env_key

    # If that doesn't work, check the Marketplace extension (both versions).
    # Both lookups run concurrently; return_exceptions=True keeps a failure of
    # one lookup from discarding the result of the other.
    form_db_obj = FormDbObj(logger, hq_credentials, ret_table_obj=True)
    store_form, legacy_store_form = await asyncio.gather(
        form_db_obj.get_by_name("MarketplaceStore"),
        form_db_obj.get_by_name("AppDirectSSO"),
        return_exceptions=True,
    )
    candidates = (
        (store_form, ("_overrides", "customer_key")),
        (legacy_store_form, ("CUSTOMER_KEY",)),
    )
    for form, key_path in candidates:
        if isinstance(form, BaseException):
            logger.debug("Marketplace form lookup failed: %r", form)
            continue
        customer_key = _customer_key_from_form(form, key_path)
        # Only accept a non-empty key, so an empty value in the current store
        # config still falls through to the legacy form.
        if customer_key:
            return customer_key

    # We've exhausted our options. Give up.
    return None
[docs]
async def is_valid(
    logger: logging.Logger,
    extension_name: str,
    user_id: str,
    hq_credentials: Optional[HqCredentials] = None,
) -> bool:
    """
    Check whether a user holds a usable Marketplace subscription for an extension.

    :param logger: logger used for the HTTP request and for diagnostics
    :param extension_name: The name of an extension
    :param user_id: id of the user whose subscription status is checked
    :param hq_credentials: optional credentials used to resolve the customer key
    :return: ``True`` when the extension is not listed in ``PRODUCTS_BY_EXTENSION``,
        when the reported status is ``ACTIVE`` or ``FREE_TRIAL``, or when the
        status check itself raises; ``False`` when no customer key can be found,
        the service answers with a non-OK response, or any other status is reported
    """
    # Extensions without a Marketplace product entry are never gated.
    if extension_name not in PRODUCTS_BY_EXTENSION:
        return True
    this_product_id = PRODUCTS_BY_EXTENSION[extension_name]["product"]["id"]

    customer_key = await _get_customer_key(logger, hq_credentials)
    if not customer_key:
        logger.error(CUSTOMER_KEY_ERROR_MESSAGE)
        return False

    try:
        http_response = await q2_requests.get(
            logger,
            SUBSCRIPTION_STATUS_URL,
            {
                "customer_key": customer_key,
                "user_id": user_id,
                "product_identifier": this_product_id,
            },
        )
        # A non-OK answer is probably a 404: the service is up but this user
        # has no subscription id saved in user data, so it is not valid.
        if not http_response.ok:
            return False
        status = http_response.json().get("status", "INACTIVE")
        return status in ("ACTIVE", "FREE_TRIAL")
    except Exception:
        # Expected when the service is not up. Deliberately fail open so an
        # outage does not block users, but leave a trace of why the check was
        # skipped instead of swallowing the error silently.
        logger.warning(
            "Could not verify Marketplace subscription status; "
            "treating the subscription as valid",
            exc_info=True,
        )
        return True
async def _lookup_product_entitlement(logger: logging.Logger, extension_name: str):
    """Return the ``entitlement_type`` of the product registered for an extension.

    Fetches the product list from ``PRODUCT_BASE_URL`` and selects the entries
    whose ``extension_name`` matches; when several match, the last one wins.

    :param logger: logger used for the HTTP request and error reporting
    :param extension_name: extension whose product entry is looked up
    :return: the ``entitlement_type`` value of the matching product
    :raises CancelationException: when the response is not OK or no product matches
    """
    response = await q2_requests.get(logger, PRODUCT_BASE_URL)
    if response.ok:
        matches = [
            product
            for product in response.json()["data"]
            if product["extension_name"] == extension_name
        ]
        if matches:
            return matches[-1]["entitlement_type"]

    # Reached both for a failed request and for an empty match list.
    logger.error(
        f'failed to lookup product through "{extension_name}" due to an error.'
    )
    raise CancelationException
async def _lookup_subscription_ids(
    logger: logging.Logger,
    hq_credentials: HqCredentials,
    customer_or_user_id: str,
    extension_name: str,
    this_product_id: str,
):
    """Collect the stored subscription ids for a customer or user.

    The product's entitlement type decides which table is queried: the customer
    data table for customer-level products, the user data table otherwise. Rows
    are looked up with ``this_product_id`` as the short name.

    :param logger: logger passed to the database objects
    :param hq_credentials: credentials passed to the database objects
    :param customer_or_user_id: id of the customer or user owning the subscription
    :param extension_name: extension used to determine the entitlement type
    :param this_product_id: product id used as the data short name
    :return: list with the ``GTDataValue`` text of every matching row (may be empty)
    """
    entitlement = await _lookup_product_entitlement(logger, extension_name)

    if EntitlementType(entitlement) is EntitlementType.customer:
        logger.debug("fetching subscription from the customer data table")
        rows = await CustomerDataDbObj(logger, hq_credentials).get(
            customer_id=customer_or_user_id, short_name=this_product_id
        )
    else:
        logger.debug("fetching subscription from the user data table")
        rows = await UserDataDbObj(logger, hq_credentials).get(
            user_id=customer_or_user_id, short_name=this_product_id
        )

    # An empty result is reported but not treated as a failure here.
    if len(rows) < 1:
        logger.error(
            f'No subscription ids found for {entitlement} id: "{customer_or_user_id}" under product id: {this_product_id}'
        )

    subscription_ids = []
    for row in rows:
        subscription_ids.append(row.GTDataValue.text)
    return subscription_ids
[docs]
async def cancel(
    logger: logging.Logger,
    extension_name: str,
    customer_key: str,
    customer_or_user_id: int,
    hq_credentials: Optional[HqCredentials] = None,
) -> None:
    """
    This method will cancel a q2 marketplace subscription by canceling it with app direct.

    Every subscription id stored for the customer or user under the extension's
    product is canceled in turn. A 404 or 410 response is logged as a warning
    and processing continues with the next id.

    :param logger: logger used for the HTTP requests and progress messages
    :param extension_name: The name of an extension
    :param customer_key: the q2 marketplace customer key from which to remove the extension
    :param customer_or_user_id: the id (integer) corresponding to the q2 customer or user that owns the
        subscription, depending on the extension product entitlement belonging to customer or users at the FI
    :param hq_credentials: optional credentials used for the subscription id lookup
    :raises CancelationException: when the extension is not in ``PRODUCTS_BY_EXTENSION``,
        the product lookup fails, or a cancel request fails with a status other
        than 404/410 (remaining subscription ids are then left untouched)
    """
    if extension_name not in PRODUCTS_BY_EXTENSION:
        logger.error(f"{extension_name} was not found in PRODUCTS_BY_EXTENSION")
        raise CancelationException
    this_product_id = PRODUCTS_BY_EXTENSION[extension_name]["product"]["id"]

    subscription_ids = await _lookup_subscription_ids(
        logger, hq_credentials, customer_or_user_id, extension_name, this_product_id
    )
    for subscription_id in subscription_ids:
        http_response = await q2_requests.delete(
            logger,
            SUBSCRIPTION_CANCEL_URL,
            params={"customer_key": customer_key, "subscription_id": subscription_id},
        )
        # NOTE(review): this writes the customer key to the log at info level;
        # confirm it is not considered sensitive.
        logger.info(f"{customer_key} {subscription_id}")
        if http_response.ok:
            logger.info(CANCELATION_SUCCESS)
        elif http_response.status_code in (404, 410):
            # Already gone: nothing left to cancel for this id.
            # Logger.warn is a deprecated alias; use Logger.warning.
            logger.warning(CANCELATION_NOT_FOUND)
        else:
            raise CancelationException