Source code for q2_marketplace.subscription

import asyncio
import json
import logging
import os
from enum import Enum
from json.decoder import JSONDecodeError
from typing import Optional

from q2_sdk.core import q2_requests
from q2_sdk.hq.db.customer_data import CustomerData as CustomerDataDbObj
from q2_sdk.hq.db.form import Form as FormDbObj
from q2_sdk.hq.db.user_data import UserData as UserDataDbObj
from q2_sdk.hq.models.hq_credentials import HqCredentials
from q2_marketplace import configuration

MARKETPLACE_API_URL = os.environ.get(
    "Q2SDK_MARKETPLACE_API", "http://localhost:1980"
).rstrip("/")
SUBSCRIPTION_STATUS_URL = f"{MARKETPLACE_API_URL}/api/subscription/status"
SUBSCRIPTION_CANCEL_URL = f"{MARKETPLACE_API_URL}/api/subscription"
PRODUCT_BASE_URL = f"{MARKETPLACE_API_URL}/api/product"
PRODUCTS_BY_EXTENSION = configuration.products_by_extension
CANCELATION_SUCCESS = "Subscription canceled successfully"
CANCELATION_NOT_FOUND = "The subscription was not canceled since it does not exist or has already been cancled"
CANCELATION_ERROR_MESSAGE = (
    "There was an error when attempting to cancel the subscription."
)
CUSTOMER_KEY_ERROR_MESSAGE = "Failed to find value for customer key. Verify that Marketplace Store is installed and configured correctly."


[docs] class EntitlementType(Enum): user = "user" customer = "customer"
[docs] class CancelationException(Exception): def __init__(self): self.message = CANCELATION_ERROR_MESSAGE
async def _get_customer_key( logger: logging.Logger, hq_credentials: Optional[HqCredentials] ) -> Optional[str]: FormDecodeError = (KeyError, TypeError, JSONDecodeError) # Check hq_credentials if hasattr(hq_credentials, "customer_key") and hq_credentials.customer_key: return hq_credentials.customer_key # Then, check environment customer_key = os.getenv("VAULT_KEY", None) if customer_key: return customer_key form_db_obj = FormDbObj(logger, hq_credentials, ret_table_obj=True) # If that doesn't work, check the Marketplace extension (both versions) store_form, legacy_store_form = await asyncio.gather( form_db_obj.get_by_name("MarketplaceStore"), form_db_obj.get_by_name("AppDirectSSO"), return_exceptions=True, ) if not isinstance(store_form, BaseException): try: customer_key = json.loads(str(store_form.Config))["_overrides"][ "customer_key" ] return customer_key except FormDecodeError: pass if not isinstance(legacy_store_form, BaseException): try: customer_key = json.loads(str(legacy_store_form.Config))["CUSTOMER_KEY"] return customer_key except FormDecodeError: pass # We've exhausted our options. Give up return None
[docs] async def is_valid( logger: logging.Logger, extension_name: str, user_id: str, hq_credentials: Optional[HqCredentials] = None, ) -> bool: subscription_valid = False if extension_name not in PRODUCTS_BY_EXTENSION: return True this_product_id = PRODUCTS_BY_EXTENSION[extension_name]["product"]["id"] customer_key = await _get_customer_key(logger, hq_credentials) if not customer_key: logger.error(CUSTOMER_KEY_ERROR_MESSAGE) return False try: http_response = await q2_requests.get( logger, SUBSCRIPTION_STATUS_URL, { "customer_key": customer_key, "user_id": user_id, "product_identifier": this_product_id, }, ) # If we got an 200 then check the payload; otherwise we probably got back a 404 which mean service is up but this # user doesn't have a subscription id saved in user data so consider this not a valid subscription. if http_response.ok: subscription = http_response.json() status = subscription.get("status", "INACTIVE") if status in ["ACTIVE", "FREE_TRIAL"]: subscription_valid = True except Exception: # Should be falling into here when service is not up. We'll consider this an active subscription as we're not # wanting to block if we're not up. subscription_valid = True return subscription_valid
async def _lookup_product_entitlement(logger: logging.Logger, extension_name: str): http_response = await q2_requests.get(logger, f"{PRODUCT_BASE_URL}") if http_response.ok: filtered_products = [ x for x in http_response.json()["data"] if x["extension_name"] == extension_name ] if len(filtered_products) != 0: return filtered_products.pop()["entitlement_type"] logger.error( f'failed to lookup product through "{extension_name}" due to an error.' ) raise CancelationException async def _lookup_subscription_ids( logger: logging.Logger, hq_credentials: HqCredentials, customer_or_user_id: str, extension_name: str, this_product_id: str, ): product_entitlement = await _lookup_product_entitlement(logger, extension_name) customer_rows = [] if EntitlementType(product_entitlement) == EntitlementType.customer: logger.debug("fetching subscription from the customer data table") customer_data_db_obj = CustomerDataDbObj(logger, hq_credentials) customer_rows = await customer_data_db_obj.get( customer_id=customer_or_user_id, short_name=this_product_id ) else: logger.debug("fetching subscription from the user data table") user_data_db_obj = UserDataDbObj(logger, hq_credentials) customer_rows = await user_data_db_obj.get( user_id=customer_or_user_id, short_name=this_product_id ) if len(customer_rows) == 0: logger.error( f'No subscription ids found for {product_entitlement} id: "{customer_or_user_id}" under product id: {this_product_id}' ) return [row.GTDataValue.text for row in customer_rows]
[docs] async def cancel( logger: logging.Logger, extension_name: str, customer_key: str, customer_or_user_id: int, hq_credentials: Optional[HqCredentials] = None, ) -> None: """ This method will cancel a q2 marketplace subscription by canceling it with app direct. :param extension_name: The name of an extension :param customer_key: the q2 marketplace customer key from which to remove the extension :param customer_or_user_id: the id (integer) corresponding to the q2 customer or user that owns the subscription, \ depending on the extension product entitlement belonging to customer or users at the FI """ if extension_name not in PRODUCTS_BY_EXTENSION: logger.error(f"{extension_name} was not found in PRODUCTS_BY_EXTENSION") raise CancelationException this_product_id = PRODUCTS_BY_EXTENSION[extension_name]["product"]["id"] subscription_ids = await _lookup_subscription_ids( logger, hq_credentials, customer_or_user_id, extension_name, this_product_id ) for subscription_id in subscription_ids: http_response = await q2_requests.delete( logger, SUBSCRIPTION_CANCEL_URL, params={"customer_key": customer_key, "subscription_id": subscription_id}, ) logger.info(f"{customer_key} {subscription_id}") if http_response.ok: logger.info(CANCELATION_SUCCESS) elif http_response.status_code in (404, 410): logger.warn(CANCELATION_NOT_FOUND) else: raise CancelationException return