# Source code for q2_sdk.core.http_handlers.message_bus_handler

import traceback
from typing import Union

import requests

from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core import message_bus
from q2_sdk.core.cache import StorageLevel


class Q2MessageBusRequestHandler(Q2BaseRequestHandler):
    """
    RequestHandler that handles messages posted from the Kafka message bus

    Subclasses set ``MESSAGE_TYPE`` to the message type (or list of message
    types) they accept and override :meth:`handle` with their own logic.
    """

    # Message type(s) accepted by this handler: a single string, a list of
    # strings, or None. None is treated as "accept nothing" (see __init__).
    MESSAGE_TYPE: Union[list, str, None] = None

    def __init__(self, application, request, **kwargs):
        """
        Normalize ``MESSAGE_TYPE`` into ``accepted_message_types`` and set up
        the handler.

        :param application: forwarded unchanged to the base handler
        :param request: forwarded unchanged to the base handler
        :param kwargs: forwarded unchanged to the base handler
        """
        # These attributes are assigned before delegating to the base class
        # so they exist by the time the base initializer runs.
        self._route = ""
        accepted = self.MESSAGE_TYPE
        if accepted is None:
            # An unset MESSAGE_TYPE previously left None here, which made the
            # membership test in post() raise TypeError. An empty list makes
            # post() reject the message with its normal warning instead.
            accepted = []
        elif isinstance(accepted, str):
            accepted = [accepted]
        self.accepted_message_types = accepted
        super().__init__(application, request, **kwargs)
        self.default_cache_level = StorageLevel.Stack

    def initialize(self, route=""):
        """
        Store the route this handler instance was registered with.

        :param route: route name; the value "error" (case-insensitive) makes
            :meth:`post` dispatch to :meth:`handle_error`
        """
        self._route = route

    async def post(self, *args, **kwargs):
        """
        Entry point for messages posted to this handler.

        Requests on the "error" route go to :meth:`handle_error`. Otherwise
        the message type from the request headers is compared against
        ``accepted_message_types``; a mismatch is logged and ignored. Accepted
        messages are decrypted and passed to :meth:`handle`. Exceptions raised
        while decrypting or handling are logged, not propagated.
        """
        if self._route.lower() == "error":
            await self.handle_error()
            return

        posted_msg_type = self.current_msg_type
        if posted_msg_type not in self.accepted_message_types:
            self.logger.warning(
                "Msg-Type: %s != one of this handler's msg-types: (%s)",
                posted_msg_type,
                self.accepted_message_types,
            )
            return

        try:
            msg = message_bus.decrypt(self.request.body, self.request.headers)
            self.logger.debug("Message: %s", msg.text)
            await self.handle(msg)
        except Exception:
            self.logger.error(traceback.format_exc())
            # Still need a 200 or Kafka will not move on to other messages
            # Eventually we'll want to send these to a dead letter queue
            self.set_status(200)

    @property
    def current_msg_type(self):
        """
        Extracts the message type from the current message request

        Reads the ``q2msg-type`` header (empty string when absent) and returns
        whatever follows the last ``sdk-generic.`` prefix, or the whole header
        value when that prefix is not present.
        """
        return self.request.headers.get("q2msg-type", "").split("sdk-generic.")[-1]

    async def handle(self, message: message_bus.Message):
        """
        This is the hook where you can put your handling logic

        :param message: the decrypted message produced by ``message_bus.decrypt``
        :raises NotImplementedError: always, unless overridden by a subclass
        """
        raise NotImplementedError

    async def handle_error(self):
        """
        This is the hook where you can put your error handling logic

        The default implementation logs the request body and headers.
        """
        # errors="replace" keeps a non-UTF-8 payload from raising inside the
        # error hook itself; undecodable bytes are logged as U+FFFD.
        self.logger.error(
            "Error from consumer: %s", self.request.body.decode(errors="replace")
        )
        self.logger.error("Headers:\n%s", str(self.request.headers))

    async def send_message(self, message: str) -> requests.Response:
        """
        Put a message onto the bus (into sdk queue)

        The message is pushed with this handler's ``MESSAGE_TYPE``. When
        ``MESSAGE_TYPE`` is a list it must contain exactly one entry, because
        otherwise the type to send with is ambiguous.

        :param message: message payload to push
        :return: the awaited result of ``message_bus.push``
        :raises AssertionError: if ``MESSAGE_TYPE`` is a list whose length is
            not exactly 1
        """
        message_type = self.MESSAGE_TYPE
        if isinstance(message_type, list):
            # Raised explicitly rather than via `assert` so the check still
            # runs under `python -O`; the exception type is unchanged.
            if len(message_type) != 1:
                raise AssertionError(
                    "self.MESSAGE_TYPE is a list. Please use message_bus.push directly."
                )
            message_type = message_type[0]
        return await message_bus.push(self.logger, message, message_type)