import asyncio
from base64 import b64decode, b64encode
from dataclasses import dataclass
import json
from importlib import import_module
import os
from typing import Optional, Union
import zlib
from google.protobuf.message import Message as ProtobufMessage
from google.protobuf.json_format import MessageToDict
from q2msg import security, GIT_REV, VERSION
import requests
from tornado.httputil import HTTPHeaders
from q2_sdk.core import q2_requests
from q2_sdk.core.exceptions import CorruptMessageError, MessageBusConfigurationError
from q2_sdk.core.configuration import settings
from q2_sdk.core.q2_logging.logger import Q2LoggerType
from q2_sdk.tools import utils
from q2_sdk.tools.testing.models import Q2RequestResponseMock
# When running in local dev inside a repo, point the Q2MSG_KEYS_PATH environment
# variable at a key file: the configured settings.Q2MSG_KEYS_PATH if it is set,
# otherwise "q2msg_keys.json" under the path returned by
# utils.get_repo_antilles_dir().
if settings.LOCAL_DEV and settings.REPO_ROOT:
    os.environ["Q2MSG_KEYS_PATH"] = settings.Q2MSG_KEYS_PATH or str(
        utils.get_repo_antilles_dir() / "q2msg_keys.json"
    )

# Module-wide cipher shared by decrypt() and encrypt().
# NOTE(review): presumably q2msg reads Q2MSG_KEYS_PATH when the cipher is built,
# which is why this follows the block above -- confirm against q2msg.
CIPHER = security.Q2MsgCipher()

# Topic lookups copied from settings; the defaults are selected by
# settings.DEPLOY_ENV at import time (a missing key raises here, on import).
TOPIC_BY_ENV = settings.TOPIC_BY_ENV
ERROR_TOPIC_BY_ENV = settings.ERROR_TOPIC_BY_ENV
DEFAULT_TOPIC = TOPIC_BY_ENV[settings.DEPLOY_ENV]
DEFAULT_ERROR_TOPIC = ERROR_TOPIC_BY_ENV[settings.DEPLOY_ENV]

# Size threshold above which encrypt() zlib-compresses a message
# (only when its auto_compress argument is true).
MIN_ENCRYPT_SIZE = 4096
[docs]
@dataclass
class Message:
    """
    Standard SDK style message that goes onto the Kafka bus. More fields
    may be added to this object over time.

    :param text: Body of the message
    :raises ValueError: If ``text`` is None
    """

    text: str

    def __post_init__(self):
        # A message without a body is never valid; anything else is accepted as-is.
        if self.text is not None:
            return
        raise ValueError("text cannot be None")
[docs]
@dataclass
class EncryptedQ2Msg:
    """
    A message payload paired with the headers that travel with it.

    :param message: Message payload as bytes
    :param headers: Headers that accompany the payload
    """

    message: bytes
    headers: dict
[docs]
def decrypt(message: bytes, headers: HTTPHeaders) -> Message:
    """
    Data that comes back from Kafka will be encrypted.
    This verifies its HMAC, decrypts it and converts it back into a Message object.

    If the q2msg-type header starts with sdk-generic, the decrypted payload is
    parsed as JSON and its "msg" entry becomes the Message text. Otherwise the
    header must contain q2msg and name a protobuf class as ``<module>.<Class>``;
    the payload is parsed with that class and the Message text is the
    resulting dict. Other message types are not supported.

    :param message: Encrypted message
    :param headers: Headers that accompanied the message. ``q2msg-envstack``,
        ``q2msg-cryptokey``, ``q2msg-type`` and ``q2msg-hmac`` are required;
        ``q2msg-compressed`` is optional and treated as "false" when absent
    :raises MessageBusConfigurationError: If the message type is not supported
    :raises CorruptMessageError: If the calculated HMAC does not match the
        ``q2msg-hmac`` header
    """
    # Local import so this function carries its own dependency
    from hmac import compare_digest

    envstack = headers["q2msg-envstack"]
    crypto_key = headers["q2msg-cryptokey"]
    msg_type = headers["q2msg-type"]
    hmac_header = headers["q2msg-hmac"].encode()
    compressed = headers.get("q2msg-compressed", "false").lower() == "true"

    CIPHER.init(envstack)  # Backwards compatibility in q2msg

    protobuf_instance = None
    if not msg_type.startswith("sdk-generic"):
        if "q2msg" not in msg_type:
            raise MessageBusConfigurationError(f"Unknown Message type: {msg_type}")
        module_name, dot, protobuf = msg_type.rpartition(".")
        if not dot:
            # No "<module>.<Class>" structure to resolve a protobuf class from
            raise MessageBusConfigurationError(f"Unknown Message type: {msg_type}")

        # All the q2msg things seem to end in pb2, but the other parts of the
        # system don't pass it along. Strip an existing suffix first so we
        # never try to import "<module>_pb2_pb2".
        pb2_suffix = "_pb2"
        base_name = module_name
        if base_name.endswith(pb2_suffix):
            base_name = base_name[: -len(pb2_suffix)]
        try:
            module = import_module(base_name + pb2_suffix)
        except ImportError:
            module = import_module(module_name)
        protobuf_instance = getattr(module, protobuf)()

    # Constant-time comparison so the check does not leak how many leading
    # bytes of the HMAC matched.
    calculated_hmac = CIPHER.calc_hmac(message, envstack=envstack)
    if not compare_digest(calculated_hmac, b64decode(hmac_header)):
        raise CorruptMessageError("Calculated HMAC does not match that provided")

    decrypted_msg = CIPHER.decrypt(crypto_key, message, envstack=envstack)
    if compressed:
        decrypted_msg = zlib.decompress(decrypted_msg)

    # Compare against None explicitly rather than relying on the truthiness
    # of a protobuf object.
    if protobuf_instance is not None:
        protobuf_instance.ParseFromString(decrypted_msg)
        raw_msg = MessageToDict(protobuf_instance)
        return Message(raw_msg)

    raw_msg = json.loads(decrypted_msg)
    return Message(raw_msg["msg"])
[docs]
def encrypt(
    message: Union[str, ProtobufMessage],
    envstack=settings.KRAYT_ENVSTACK,
    auto_compress=True,
) -> bytes:
    """
    Serialize a message, optionally compress it, and encrypt it

    Protobuf messages are serialized with SerializeToString; anything else
    is wrapped as ``{"msg": message}`` and JSON encoded.

    :param message: Message body to send
    :param envstack: Name of the encryption key
    :param auto_compress: Automatically zlib compress the message if it's > MIN_ENCRYPT_SIZE bytes
    :return: The encrypted payload
    """
    CIPHER.init(envstack)  # Backwards compatibility in q2msg

    if isinstance(message, ProtobufMessage):
        payload = message.SerializeToString()
        oversized = message.ByteSize() > MIN_ENCRYPT_SIZE
    else:
        payload = json.dumps({"msg": message}).encode()
        # Measured on the message as passed in, not on the JSON encoded bytes
        oversized = len(message) > MIN_ENCRYPT_SIZE

    if oversized and auto_compress:
        payload = zlib.compress(payload)

    return CIPHER.encrypt(payload, envstack=envstack)
[docs]
# Strong references to fire-and-forget tasks started by push(). The event loop
# only keeps weak references to tasks, so without this a background task could
# be garbage collected before it finishes.
_BACKGROUND_TASKS: set = set()


async def push(
    logger: Q2LoggerType,
    message: Union[str, ProtobufMessage],
    message_type: Optional[str] = "",
    envstack=settings.KRAYT_ENVSTACK,
    topic=DEFAULT_TOPIC,
    extra_headers: Optional[dict] = None,
    krayt_url=settings.KRAYT_URL,
    auto_compress=True,
) -> Optional[requests.Response]:
    """
    Put an encrypted message onto the bus

    When settings.MOCK_KRAYT_CALLS is enabled the message is instead posted to
    the local server in the background and a mocked response is returned
    without waiting for that request to finish.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param message: Message to send. Can either be the string body to use the sdk's dedicated
        topic or an instance of a ProtobufMessage from the q2msg library if using across application stacks
    :param message_type: Used to segment messages into groups so only appropriate consumers will read them.
        Required if message is a string
    :param envstack: Name of the encryption key
    :param topic: Where to send the message
    :param extra_headers: Additional http headers to include on the message
    :param krayt_url: Defaults to settings.KRAYT_URL
    :param auto_compress: Automatically zlib compress the message if it's > MIN_ENCRYPT_SIZE bytes
    :return: The response to the POST, a Q2RequestResponseMock when calls are mocked,
        or None if the connection failed
    :raises MessageBusConfigurationError: If no krayt_url is available (and calls are not mocked),
        or if message is a string and message_type is empty
    """
    # Evaluate the flag once, by truthiness, so the configuration guard and the
    # branches below can never disagree.
    mocking = bool(settings.MOCK_KRAYT_CALLS)

    if not mocking and not krayt_url:
        raise MessageBusConfigurationError(
            "No KRAYT_URL defined (either in settings or passed in as a parameter)"
        )

    if not message_type:
        if isinstance(message, str):
            raise MessageBusConfigurationError('"message_type" cannot be empty')
        # Derive the type from the message's own class
        message_type = f"{type(message).__module__}.{type(message).__name__}"

    encrypted_msg = get_q2msg_with_headers(
        message,
        message_type,
        envstack,
        extra_headers=extra_headers,
        auto_compress=auto_compress,
    )

    if mocking:
        url = f"http://localhost:{settings.ANTILLES_SERVER_PORT}/{message_type}/message/process/sdk"
    else:
        url = f"{krayt_url}/message/produce/{topic}"

    try:
        local_dev_timeout = 60 * 10  # Only affects dev in DEBUG mode
        result = q2_requests.post(
            logger,
            url,
            headers=encrypted_msg.headers,
            data=encrypted_msg.message,
            timeout=local_dev_timeout,
            proxies={"no_proxy": "localhost"},
        )
        if mocking:
            # Fire and forget, but hold a reference until the task completes
            task = asyncio.create_task(result)
            _BACKGROUND_TASKS.add(task)
            task.add_done_callback(_BACKGROUND_TASKS.discard)
            result = Q2RequestResponseMock(
                "POST",
                url,
                headers=encrypted_msg.headers,
                passthrough_kwargs={"message": message},
                mock_response={"msgId": "60bf377d-89cf-17f4-7bac-2b9fad0bc36a"},
            )
        else:
            result = await result
    except requests.exceptions.ConnectionError:
        logger.warning(
            f"No receiver is set up or running for url: {url}, q2msg-type: {encrypted_msg.headers['q2msg-type']}"
        )
        return None
    return result