Source code for q2_sdk.core.message_bus

import asyncio
from base64 import b64decode, b64encode
from dataclasses import dataclass
import json
from importlib import import_module
import os
from typing import Optional, Union
import zlib

from google.protobuf.message import Message as ProtobufMessage
from google.protobuf.json_format import MessageToDict
from q2msg import security, GIT_REV, VERSION
import requests
from tornado.httputil import HTTPHeaders

from q2_sdk.core import q2_requests
from q2_sdk.core.exceptions import CorruptMessageError, MessageBusConfigurationError
from q2_sdk.core.configuration import settings
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.tools import utils
from q2_sdk.tools.testing.models import Q2RequestResponseMock

# In local development inside a repo, export Q2MSG_KEYS_PATH: either the value
# configured in settings, or q2msg_keys.json inside the directory returned by
# utils.get_repo_antilles_dir(). This runs before the cipher below is created.
# NOTE(review): presumably q2msg reads this variable when building the cipher -- confirm.
if settings.LOCAL_DEV and settings.REPO_ROOT:
    os.environ["Q2MSG_KEYS_PATH"] = settings.Q2MSG_KEYS_PATH or str(
        utils.get_repo_antilles_dir() / "q2msg_keys.json"
    )

# Single module-level cipher shared by decrypt(), encrypt() and get_q2msg_with_headers()
CIPHER = security.Q2MsgCipher()

# Topic tables keyed by deploy environment; the defaults are looked up once, at
# import time, for settings.DEPLOY_ENV (a missing key raises KeyError on import).
TOPIC_BY_ENV = settings.TOPIC_BY_ENV
ERROR_TOPIC_BY_ENV = settings.ERROR_TOPIC_BY_ENV
DEFAULT_TOPIC = TOPIC_BY_ENV[settings.DEPLOY_ENV]
DEFAULT_ERROR_TOPIC = ERROR_TOPIC_BY_ENV[settings.DEPLOY_ENV]
# Threshold used by the auto_compress logic: compared against ByteSize() for
# protobuf messages and against len() for string messages.
MIN_ENCRYPT_SIZE = 4096


@dataclass
class Message:
    """
    Standard SDK style message that travels over the Kafka bus.

    This object may gain more fields over time as the message format grows.

    :param text: Body of the message
    :raises ValueError: If ``text`` is None
    """

    text: str

    def __post_init__(self):
        # A message without a body is never valid; anything else is accepted as-is.
        if self.text is not None:
            return
        raise ValueError("text cannot be None")
@dataclass
class EncryptedQ2Msg:
    """
    An encrypted payload paired with the headers that accompany it.

    :param message: Encrypted message bytes
    :param headers: Headers describing the message (the ``q2msg-*`` entries plus any extras)
    """

    message: bytes
    headers: dict
def decrypt(message: bytes, headers: HTTPHeaders) -> Message:
    """
    Data that comes back from Kafka will be encrypted.
    This converts it back into a Message object.

    If the q2msg-type header starts with sdk-generic, the decrypted payload is
    parsed as JSON and its "msg" entry becomes the Message text. Otherwise the
    type must contain "q2msg" and is treated as ``<module>.<ProtobufClass>``:
    the protobuf is rehydrated from that module and the Message text is its
    dict representation. Other message types are not supported.

    :param message: Encrypted message
    :param headers: Headers that accompanied the message. Reads q2msg-envstack,
        q2msg-cryptokey, q2msg-type, q2msg-hmac and (optionally) q2msg-compressed
    :raises MessageBusConfigurationError: If the message type is not supported
    :raises CorruptMessageError: If the HMAC header does not match the message
    """
    # Function-scope import: the module does not import hmac at file level
    from hmac import compare_digest

    envstack = headers["q2msg-envstack"]
    crypto_key = headers["q2msg-cryptokey"]
    msg_type = headers["q2msg-type"]
    hmac_header = headers["q2msg-hmac"]
    if isinstance(hmac_header, str):
        # Headers built by get_q2msg_with_headers carry bytes; HTTP headers carry str
        hmac_header = hmac_header.encode()
    compressed = headers.get("q2msg-compressed", "false").lower() == "true"
    CIPHER.init(envstack)  # Backwards compatibility in q2msg

    protobuf_instance = None
    if not msg_type.startswith("sdk-generic"):
        if "q2msg" not in msg_type:
            raise MessageBusConfigurationError("Unknown Message type: %s" % msg_type)
        module_name, protobuf = msg_type.rsplit(".", maxsplit=1)
        # All the q2msg things seem to end in pb2, but the other parts of the
        # system don't pass it along. Normalise so exactly one suffix is tried.
        base_name = module_name.removesuffix("_pb2")
        try:
            module = import_module(base_name + "_pb2")
        except ImportError:
            # Fall back to the module name exactly as it was given
            module = import_module(module_name)
        protobuf_instance = getattr(module, protobuf)()

    # Constant-time comparison so the check does not leak how many bytes matched
    expected_hmac = CIPHER.calc_hmac(message, envstack=envstack)
    if not compare_digest(expected_hmac, b64decode(hmac_header)):
        raise CorruptMessageError("Calculated HMAC does not match that provided")

    decrypted_msg = CIPHER.decrypt(crypto_key, message, envstack=envstack)
    if compressed:
        decrypted_msg = zlib.decompress(decrypted_msg)

    # Identity check: do not depend on the truthiness of a protobuf object
    if protobuf_instance is not None:
        protobuf_instance.ParseFromString(decrypted_msg)
        return Message(MessageToDict(protobuf_instance))

    raw_msg = json.loads(decrypted_msg)
    return Message(raw_msg["msg"])
def encrypt(
    message: Union[str, ProtobufMessage],
    envstack=settings.KRAYT_ENVSTACK,
    auto_compress=True,
) -> bytes:
    """
    Encrypt a message

    Protobuf messages are serialized directly; anything else is wrapped as
    ``{"msg": message}`` and JSON encoded before encryption.

    :param message: Message body to send
    :param envstack: Name of the encryption key
    :param auto_compress: Automatically zlib compress the payload when the message
        exceeds MIN_ENCRYPT_SIZE (ByteSize() for protobufs, len() for strings)
    :return: The encrypted payload
    """
    CIPHER.init(envstack)  # Backwards compatibility in q2msg

    if not isinstance(message, ProtobufMessage):
        payload = json.dumps({"msg": message}).encode()
        over_threshold = len(message) > MIN_ENCRYPT_SIZE
    else:
        payload = message.SerializeToString()
        over_threshold = message.ByteSize() > MIN_ENCRYPT_SIZE

    if over_threshold and auto_compress:
        payload = zlib.compress(payload)

    return CIPHER.encrypt(payload, envstack=envstack)
def get_q2msg_with_headers(
    message: str | ProtobufMessage,
    message_type: str,
    envstack: str,
    extra_headers: Optional[dict] = None,
    auto_compress=True,
) -> EncryptedQ2Msg:
    """
    Encrypt a message and build the q2msg headers that go with it.

    :param message: Message body, either a string or a protobuf message
    :param message_type: Value for the q2msg-type header; prefixed with
        ``sdk-generic.`` when the message is a string
    :param envstack: Name of the encryption key
    :param extra_headers: Additional http headers to include on the message
    :param auto_compress: Automatically zlib compress the message if it's > MIN_ENCRYPT_SIZE bytes
    :return: The encrypted payload together with its headers
    """
    ciphertext = encrypt(message, envstack, auto_compress=auto_compress)
    extra_headers = extra_headers or {}
    digest = b64encode(CIPHER.calc_hmac(ciphertext, envstack=envstack))

    if isinstance(message, str):
        type_header = f"sdk-generic.{message_type}"
        over_threshold = len(message) > MIN_ENCRYPT_SIZE
    else:
        type_header = message_type
        over_threshold = message.ByteSize() > MIN_ENCRYPT_SIZE

    # Mirrors the decision made inside encrypt() so the receiver knows to decompress
    compressed_flag = "true" if (over_threshold and auto_compress) else "false"

    headers = {
        "q2msg-type": type_header,
        "q2msg-hmac": digest,
        "q2msg-gitrev": GIT_REV,
        "q2msg-version": VERSION,
        "q2msg-envstack": envstack,
        "q2msg-compressed": compressed_flag,
        "q2msg-cryptokey": CIPHER.active_crypto_ref(envstack=envstack),
        # Caller-supplied headers come last, so they win on key collisions
        **extra_headers,
    }
    return EncryptedQ2Msg(ciphertext, headers)
# Strong references to fire-and-forget tasks started by push() in mock mode.
# The event loop only keeps weak references to tasks, so an unreferenced task
# can be garbage collected before it finishes.
_BACKGROUND_TASKS: set = set()


async def push(
    logger: Q2LoggerType,
    message: Union[str, ProtobufMessage],
    message_type: Optional[str] = "",
    envstack=settings.KRAYT_ENVSTACK,
    topic=DEFAULT_TOPIC,
    extra_headers: Optional[dict] = None,
    krayt_url=settings.KRAYT_URL,
    auto_compress=True,
) -> Optional[requests.Response]:
    """
    Put an encrypted message onto the bus

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param message: Message to send. Can either be the string body to use the sdk's dedicated topic
        or an instance of a ProtobufMessage from the q2msg library if using across application stacks
    :param message_type: Used to segment messages into groups so only appropriate consumers will read them.
        Required if message is a string
    :param envstack: Name of the encryption key
    :param topic: Where to send the message
    :param extra_headers: Additional http headers to include on the message
    :param krayt_url: Defaults to settings.KRAYT_URL
    :param auto_compress: Automatically zlib compress the message if it's > MIN_ENCRYPT_SIZE bytes
    :return: The response from the POST. When settings.MOCK_KRAYT_CALLS is set, a
        Q2RequestResponseMock is returned immediately and the real POST runs in the
        background. None if the awaited POST raised a ConnectionError
    :raises MessageBusConfigurationError: If no krayt url is available while calls are
        not mocked, or if message is a string and message_type is empty
    """
    if settings.MOCK_KRAYT_CALLS is False and not krayt_url:
        raise MessageBusConfigurationError(
            "No KRAYT_URL defined (either in settings or passed in as a parameter)"
        )

    if not message_type:
        if isinstance(message, str):
            raise MessageBusConfigurationError('"message_type" cannot be empty')
        # Protobuf messages default to their fully qualified class name
        message_type = f"{type(message).__module__}.{type(message).__name__}"

    encrypted_msg = get_q2msg_with_headers(
        message,
        message_type,
        envstack,
        extra_headers=extra_headers,
        auto_compress=auto_compress,
    )

    url = f"{krayt_url}/message/produce/{topic}"
    if settings.MOCK_KRAYT_CALLS:
        url = f"http://localhost:{settings.ANTILLES_SERVER_PORT}/{message_type}/message/process/sdk"

    try:
        local_dev_timeout = 60 * 10  # Only affects dev in DEBUG mode
        result = q2_requests.post(
            logger,
            url,
            headers=encrypted_msg.headers,
            data=encrypted_msg.message,
            timeout=local_dev_timeout,
            proxies={"no_proxy": "localhost"},
        )
        if settings.MOCK_KRAYT_CALLS:
            # Fire and forget: the POST is not awaited here, so errors it raises
            # are not seen by the except clause below. Hold a reference until it
            # completes so the task cannot be garbage collected mid-flight.
            task = asyncio.create_task(result)
            _BACKGROUND_TASKS.add(task)
            task.add_done_callback(_BACKGROUND_TASKS.discard)
            result = Q2RequestResponseMock(
                "POST",
                url,
                headers=encrypted_msg.headers,
                passthrough_kwargs={"message": message},
                mock_response={"msgId": "60bf377d-89cf-17f4-7bac-2b9fad0bc36a"},
            )
        else:
            result = await result
    except requests.exceptions.ConnectionError:
        logger.warning(
            f"No receiver is set up or running for url: {url}, q2msg-type: {encrypted_msg.headers['q2msg-type']}"
        )
        return None
    return result