Message Bus

class q2_sdk.core.message_bus.Message(text)[source]

Object that may grow over time as we add more fields to the standard SDK-style message that goes onto the Kafka bus.

Parameters:

text (str) – Body of the message

class q2_sdk.core.message_bus.EncryptedQ2Msg(message, headers)[source]

EncryptedQ2Msg(message: bytes, headers: dict)

q2_sdk.core.message_bus.decrypt(message, headers)[source]

Data that comes back from Kafka will be encrypted. This converts it back into a Message object.

If the q2msg_type header starts with q2msg, we try to rehydrate from the appropriate protobuf module in the q2msg library. If it starts with sdk-generic, we just treat the message as a string. Other message types are not supported.

Parameters:
  • message (bytes) – Encrypted message

  • envstack – Name of the decryption key

  • crypto_ref – (Optional) Only necessary if the message was encrypted using an older key

Return type:

Message
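The header-based dispatch described above can be sketched as follows. This is only an illustration of the documented rules, not the SDK's implementation: the helper name classify_q2msg_type and its return values are hypothetical, and raising ValueError for unsupported types is an assumption.

```python
def classify_q2msg_type(headers: dict) -> str:
    """Return which decoding path the documented rules would select.

    Hypothetical helper: the name and return values are illustrative only.
    """
    msg_type = headers.get("q2msg_type", "")
    # Header values may arrive as bytes; normalize to str (assumption).
    if isinstance(msg_type, bytes):
        msg_type = msg_type.decode("utf-8")
    if msg_type.startswith("q2msg"):
        return "protobuf"  # rehydrate from a q2msg protobuf module
    if msg_type.startswith("sdk-generic"):
        return "string"  # treat the message body as a plain string
    # Assumed behavior: the source only says other types are unsupported.
    raise ValueError(f"Unsupported message type: {msg_type!r}")
```

A consumer could use a check like this to reject unexpected message types before attempting to decode the body.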

q2_sdk.core.message_bus.encrypt(message, envstack='KRAYT-DEMO', auto_compress=True)[source]

Encrypt a message

Parameters:
  • message (Union[str, Message]) – Message body to send

  • envstack – Name of the encryption key

  • auto_compress – Automatically zlib compress the message if it is larger than MIN_ENCRYPT_SIZE bytes

Return type:

bytes
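The auto_compress behavior can be pictured with a small standalone sketch. The value of MIN_ENCRYPT_SIZE below is an assumed placeholder, and maybe_compress is a hypothetical helper; the SDK's real threshold and internals may differ.

```python
import zlib

MIN_ENCRYPT_SIZE = 1024  # assumed placeholder; the SDK's actual value may differ


def maybe_compress(body: bytes, auto_compress: bool = True) -> tuple[bytes, bool]:
    """Compress body with zlib only when it exceeds the size threshold.

    Returns the (possibly compressed) payload and a flag saying whether
    compression was applied. Hypothetical helper for illustration.
    """
    if auto_compress and len(body) > MIN_ENCRYPT_SIZE:
        return zlib.compress(body), True
    return body, False
```

Small payloads are left untouched because zlib's framing overhead can outweigh any savings on short inputs.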

q2_sdk.core.message_bus.get_q2msg_with_headers(message, message_type, envstack, extra_headers=None, auto_compress=True)[source]

Parameters:
  • extra_headers (Optional[dict]) – Additional HTTP headers to include on the message

  • auto_compress – Automatically zlib compress the message if it is larger than MIN_ENCRYPT_SIZE bytes

Return type:

EncryptedQ2Msg

async q2_sdk.core.message_bus.push(logger, message, message_type='', envstack='KRAYT-DEMO', topic='sdk', extra_headers=None, krayt_url=None, auto_compress=True)[source]

Put an encrypted message onto the bus

Parameters:
  • logger (TypeAliasType) – Reference to calling request’s logger (self.logger in your extension)

  • message (Union[str, Message]) – Message to send. Either a string body, which uses the SDK’s dedicated topic, or an instance of a ProtobufMessage from the q2msg library when sending across application stacks

  • message_type (Optional[str]) – Used to segment messages into groups so only appropriate consumers will read them. Required if message is a string

  • envstack – Name of the encryption key

  • topic – Where to send the message

  • extra_headers (Optional[dict]) – Additional HTTP headers to include on the message

  • krayt_url – Defaults to settings.KRAYT_URL

  • auto_compress – Automatically zlib compress the message if it is larger than MIN_ENCRYPT_SIZE bytes

Return type:

Optional[Response]
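A call to push from an extension might look roughly like the sketch below. So that it runs without the SDK installed, it defines a stand-in push that mirrors the documented signature and only records the call; in a real extension you would import push from q2_sdk.core.message_bus and pass self.logger. The message type "account-opened" and the extra header are hypothetical values.

```python
import asyncio
import logging

sent = []  # records what the stand-in was asked to send


async def push(logger, message, message_type="", envstack="KRAYT-DEMO",
               topic="sdk", extra_headers=None, krayt_url=None,
               auto_compress=True):
    """Stand-in mirroring the documented signature; nothing is actually sent."""
    sent.append({
        "message": message,
        "message_type": message_type,
        "envstack": envstack,
        "topic": topic,
        "extra_headers": extra_headers,
    })
    logger.info("Queued %s message for topic %s", message_type, topic)
    return None


async def notify_account_opened(logger):
    # message is a plain string here, so message_type must be supplied.
    return await push(
        logger,
        "Account opened",
        message_type="account-opened",  # hypothetical group name
        extra_headers={"source": "my-extension"},  # hypothetical header
    )


result = asyncio.run(notify_account_opened(logging.getLogger("example")))
```

Because push is a coroutine, it has to be awaited from an async handler (or driven with asyncio.run in a script, as here).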