Message Bus Extension Tutorial
Message Busses are a set of queues which one or more consumers can pull messages from and act on. They are useful for asynchronous processing of data that is generated at an earlier time. Decoupling the creation of “there is work to do” from the processing of that work is a powerful ability that leads to flexible workflows.
In this tutorial, we will create a Message Bus extension that reads messages from a bus and the echos them back in all caps.
First, we’ll create a new extension named AllCaps
with the q2 create_extension AllCaps
command and choosing the “Message Bus Extension” type. when prompted for the type
of messages the extension will consume, answer “Lowercase”:
$ q2 create_extension AllCaps What type of extension are you creating? 1) Online (default) 2) SSO (Third Party Integration) 3) Ardent (API) 4) Q2Console (Backoffice) 5) Central (Legacy Backoffice) 6) Adapter 7) Audit Action 8) Custom Health Check 9) Message Bus <--------- 10) Caliper API Custom Endpoint 11) Base Extension Please make a selection and press Return [1]: 9 What type of messages should this handler consume? Only this type of message will be consumed by this bus: lowercase
This will generate similar files to any other extension type, most importantly an
extension.py
:
""" AllCaps Extension """ from q2_sdk.core.http_handlers.message_bus_handler import Q2MessageBusRequestHandler from q2_sdk.core.message_bus import Message class AllCapsHandler(Q2MessageBusRequestHandler): MESSAGE_TYPE = 'Lowercase' async def handle(self, message: Message): """ This is the hook where you can put your handling logic. message.text contains the message body posted from the bus. """ pass
and a urls.py
:
from tornado.web import url from .extension import AllCapsHandler # url('/regex_pattern', HandlerClass, kwargs=None, name=None) # http://www.tornadoweb.org/en/stable/web.html?#tornado.web.URLSpec URL_PATTERNS = [ url(r'/AllCaps/?', AllCapsHandler), url(r'/Lowercase/message/process/sdk', AllCapsHandler), ]
The urls.py
exposes two endpoints. /AllCaps
, which will allow you to call your extension like a base extension,
invoking get()
on a GET call, post()
on a POST call, etc. The second endpoint is the one the bus will
post to when new messages arrive. In this case, that’s /Lowercase/message/process/sdk
. In other words, when
a message arrives on the Kafka bus, a short time after that endpoint will receive a POST request with the
encrypted message body and a number of headers which aid in decrypting it.
When a message is posted to this extension, the handle()
method will be invoked, with the message, as an object,
passed as the first parameter message
. Let’s log it out as a capitalized variant. Replace your
handle method with the following:
async def handle(self, message: Message): """ This is the hook where you can put your handling logic. message.text contains the message body posted from the bus. """ self.logger.info(message.text.upper())
That’s all there is to it! Now of course, it would be nice to test it by having a message posted to this extension from the bus. However, you likely don’t have a kafka bus running locally to test with. Luckily, when running in DEBUG mode (as your server is when you’re working on it), the message bus itself isn’t needed. Any message that you post to “the bus” will be immediately sent to your handler, bypassing the dev time need for an active bus.
To send messages, let’s set up an base extension that does just that:
$ q2 create_extension SendToBus -t base
Add an import line at the top:
from q2_sdk.core import message_bus
And add a get()
method:
async def get(self): await message_bus.push(self.logger, 'hello world', 'Lowercase')
A few things to note here:
The extension pushing a message onto the bus does not have to be the same as the one processing it
The third parameter of the push() method is the message type. This needs to match the type chosen in the earlier step of setting up the message_bus handler
With that, we can start our server with q2 run -l DEBUG
and call <server_base_url>/SendToBus
in a browser.
Note
By adding the DEBUG level logging flag, we can see what’s actually being sent and received at the network level
You should see some lines that look a bit like this:
... extension.SendToBus INFO Sending HTTP request POST http://localhost:1980/Lowercase/message/process/sdk ... extension.SendToBus DEBUG Request Headers: {'q2msg-type': 'sdk-generic.Lowercase', 'q2msg-hmac': b'sDRL1O3eQ5CUFAC84CuYw8YAvok=', 'q2msg-gitrev': '5ef4873', 'q2msg-version': '1.3.0', 'q2msg-envstack': 'KRAYT-DEMO', 'q2msg-cryptokey': '0', 'Content-Length': '46'} ... extension.SendToBus DEBUG Request Body: b'\xd7\x12\\=M\x0c\xd0\xa0"\x0b\xdf1\xdc\xa3\xad\x05\xafFp\xb3\x18\xf5\xc7|\xfc\x07&`\x16#H\xe4\xa6\xc2<f,\xde\xa8j\xb6,\x07\xb1\xaa7' ... extension.SendToBus DEBUG Effective URL: http://localhost:1980/Lowercase/message/process/sdk ... extension.Lowercase/message/process/sdk DEBUG Message: hello world ... extension.Lowercase/message/process/sdk INFO HELLO WORLD
In there you can see the encrypted message was sent to our BusHandler, which was logged out in all caps. Well done!