Message Bus Extension Tutorial

Message Busses are a set of queues which one or more consumers can pull messages from and act on. They are useful for asynchronous processing of data that is generated at an earlier time. Decoupling the creation of “there is work to do” from the processing of that work is a powerful ability that leads to flexible workflows.

In this tutorial, we will create a Message Bus extension that reads messages from a bus and the echos them back in all caps.

First, we’ll create a new extension named AllCaps with the q2 create_extension AllCaps command and choosing the “Message Bus Extension” type. when prompted for the type of messages the extension will consume, answer “Lowercase”:

$ q2 create_extension AllCaps

What type of extension are you creating?

    1) Online (default)
    2) SSO (Third Party Integration)
    3) Ardent (API)
    4) Q2Console (Backoffice)
    5) Central (Legacy Backoffice)
    6) Adapter
    7) Audit Action
    8) Custom Health Check
    9) Message Bus <---------
    10) Caliper API Custom Endpoint
    11) Base Extension

Please make a selection and press Return [1]: 9

What type of messages should this handler consume?
Only this type of message will be consumed by this bus: lowercase

This will generate similar files to any other extension type, most importantly an extension.py:

"""
AllCaps Extension
"""

from q2_sdk.core.http_handlers.message_bus_handler import Q2MessageBusRequestHandler
from q2_sdk.core.message_bus import Message


class AllCapsHandler(Q2MessageBusRequestHandler):
    MESSAGE_TYPE = 'Lowercase'

    async def handle(self, message: Message):
        """
        This is the hook where you can put your handling logic.

        message.text contains the message body posted from the bus.
        """

        pass

and a urls.py:

from tornado.web import url
from .extension import AllCapsHandler

# url('/regex_pattern', HandlerClass, kwargs=None, name=None)
# http://www.tornadoweb.org/en/stable/web.html?#tornado.web.URLSpec

URL_PATTERNS = [
    url(r'/AllCaps/?',
        AllCapsHandler),
    url(r'/Lowercase/message/process/sdk',
        AllCapsHandler),
]

The urls.py exposes two endpoints. /AllCaps, which will allow you to call your extension like a base extension, invoking get() on a GET call, post() on a POST call, etc. The second endpoint is the one the bus will post to when new messages arrive. In this case, that’s /Lowercase/message/process/sdk. In other words, when a message arrives on the Kafka bus, a short time after that endpoint will receive a POST request with the encrypted message body and a number of headers which aid in decrypting it.

When a message is posted to this extension, the handle() method will be invoked, with the message, as an object, passed as the first parameter message. Let’s log it out as a capitalized variant. Replace your handle method with the following:

    async def handle(self, message: Message):
        """
        This is the hook where you can put your handling logic.

        message.text contains the message body posted from the bus.
        """

        self.logger.info(message.text.upper())

That’s all there is to it! Now of course, it would be nice to test it by having a message posted to this extension from the bus. However, you likely don’t have a kafka bus running locally to test with. Luckily, when running in DEBUG mode (as your server is when you’re working on it), the message bus itself isn’t needed. Any message that you post to “the bus” will be immediately sent to your handler, bypassing the dev time need for an active bus.

To send messages, let’s set up an base extension that does just that:

$ q2 create_extension SendToBus -t base

Add an import line at the top:

from q2_sdk.core import message_bus

And add a get() method:

    async def get(self):
        await message_bus.push(self.logger, 'hello world', 'Lowercase')

A few things to note here:

  • The extension pushing a message onto the bus does not have to be the same as the one processing it

  • The third parameter of the push() method is the message type. This needs to match the type chosen in the earlier step of setting up the message_bus handler

With that, we can start our server with q2 run -l DEBUG and call <server_base_url>/SendToBus in a browser.

Note

By adding the DEBUG level logging flag, we can see what’s actually being sent and received at the network level

You should see some lines that look a bit like this:

... extension.SendToBus INFO     Sending HTTP request POST http://localhost:1980/Lowercase/message/process/sdk
... extension.SendToBus DEBUG    Request Headers: {'q2msg-type': 'sdk-generic.Lowercase', 'q2msg-hmac': b'sDRL1O3eQ5CUFAC84CuYw8YAvok=', 'q2msg-gitrev': '5ef4873', 'q2msg-version': '1.3.0', 'q2msg-envstack': 'KRAYT-DEMO', 'q2msg-cryptokey': '0', 'Content-Length': '46'}
... extension.SendToBus DEBUG    Request Body: b'\xd7\x12\\=M\x0c\xd0\xa0"\x0b\xdf1\xdc\xa3\xad\x05\xafFp\xb3\x18\xf5\xc7|\xfc\x07&`\x16#H\xe4\xa6\xc2<f,\xde\xa8j\xb6,\x07\xb1\xaa7'
... extension.SendToBus DEBUG    Effective URL: http://localhost:1980/Lowercase/message/process/sdk
... extension.Lowercase/message/process/sdk DEBUG    Message: hello world
... extension.Lowercase/message/process/sdk INFO     HELLO WORLD

In there you can see the encrypted message was sent to our BusHandler, which was logged out in all caps. Well done!