Message Bus
- class q2_sdk.core.message_bus.Message(text)[source]
Object that may grow over time as we add more fields to the standard SDK style message that goes onto the Kafka bus
- Parameters:
text (
str) – Body of the message
- class q2_sdk.core.message_bus.EncryptedQ2Msg(message, headers)[source]
EncryptedQ2Msg(message: bytes, headers: dict)
- q2_sdk.core.message_bus.decrypt(message, headers, logger=None)[source]
Data that comes back from Kafka will be encrypted. This converts back into a Message object
If the q2msg_type header starts with q2msg, we try to rehydrate from the appropriate protobuf module in the q2msg library. If it starts with sdk-generic, we just tread the message as a string. Other message types are not supported.
- Parameters:
message (
bytes) – Encrypted messageenvstack – Name of the decryption key
crypto_ref – (Optional) Only necessary if the message was encrypted using an older key
- Return type:
Currently, HQ sends the audit record with q2msg-type of “q2msg.shared.AuditRecord” The existing logic for importing the profobuf schema with a message of this type is below
module_name, protobuf = msg_type.rsplit(“.”, maxsplit=1) # All the q2msg things seem to end in pb2, but the other parts of the system don’t pass it along module_name.split(“_pb2”)[0] try:
module = import_module(module_name + “_pb2”)
- except ImportError:
module = import_module(module_name)
protobuf_instance = getattr(module, protobuf)()
As you can see the split happens and we end up with module_name = q2msg.shared protobuf = AuditRecord
we are unable to import q2msg.shared_pb2 because that doesn’t exist, so then we import q2msg.shared and that works. However, AuditRecord does not exist at that level, so we get an error when calling protobuf_instance = getattr(module, protobuf)().
AttributeError: module ‘q2msg.shared’ has no attribute ‘AuditRecord’
- q2_sdk.core.message_bus.encrypt(message, envstack='KRAYT-DEMO', auto_compress=True)[source]
Encrypt a message
- q2_sdk.core.message_bus.get_q2msg_with_headers(message, message_type, envstack, extra_headers=None, auto_compress=True)[source]
- Parameters:
- Return type:
- async q2_sdk.core.message_bus.push(logger, message, message_type='', envstack='KRAYT-DEMO', topic='sdk', extra_headers=None, krayt_url=None, auto_compress=True, **kwargs)[source]
Put an encrypted message onto the bus
- Parameters:
logger (
TypeAliasType) – Reference to calling request’s logger (self.logger in your extension)message (
Union[str,Message,Message]) – Message to send. Can either be the string body to use the sdk’s dedicated topic or an instance of a ProtobufMessage from the q2msg library if using across application stacksmessage_type (
Optional[str]) – Used to segment messages into groups so only approprate consumers will read them. Required if message is a stringenvstack – Name of the encryption key
topic – Where to send the message
extra_headers (
Optional[dict]) – Additional http headers to include on the messagekrayt_url – Defaults to settings.KRAYT_URL
auto_compress – Automatically zlib compress the message if it’s > MIN_ENCRYPT_SIZE bytes
- Return type:
Optional[Response]