import traceback
from typing import Union
import requests
from q2_sdk.core.http_handlers.base_handler import Q2BaseRequestHandler
from q2_sdk.core import message_bus
from q2_sdk.core.cache import StorageLevel
[docs]
class Q2MessageBusRequestHandler(Q2BaseRequestHandler):
"""
RequestHandler that handles messages posted from the Kafka message bus
"""
MESSAGE_TYPE: Union[list, str, None] = None
def __init__(self, application, request, **kwargs):
self._route = ""
self.accepted_message_types = self.MESSAGE_TYPE
if isinstance(self.accepted_message_types, str):
self.accepted_message_types = [self.accepted_message_types]
super().__init__(application, request, **kwargs)
self.default_cache_level = StorageLevel.Stack
[docs]
def initialize(self, route=""):
self._route = route
async def post(self, *args, **kwargs):
if self._route.lower() == "error":
await self.handle_error()
return
posted_msg_type = self.current_msg_type
if posted_msg_type not in self.accepted_message_types:
self.logger.warning(
"Msg-Type: %s != one of this handler's msg-types: (%s)"
% (posted_msg_type, self.accepted_message_types)
)
return
try:
msg = message_bus.decrypt(self.request.body, self.request.headers)
self.logger.debug("Message: %s" % msg.text)
await self.handle(msg)
except Exception:
self.logger.error(traceback.format_exc())
# Still need a 200 or Kafka will not move on to other messages
# Eventually we'll want to send these to a dead letter queue
self.set_status(200)
@property
def current_msg_type(self):
"""
Extracts the message type from the current message request
"""
return self.request.headers.get("q2msg-type", "").split("sdk-generic.")[-1]
[docs]
async def handle(self, message: message_bus.Message):
"""
This is the hook where you can put your handling logic
"""
raise NotImplementedError
[docs]
async def handle_error(self):
"""
This is the hook where you can put your error handling logic
"""
self.logger.error("Error from consumer: %s", self.request.body.decode())
self.logger.error("Headers:\n%s", str(self.request.headers))
[docs]
async def send_message(self, message: str) -> requests.Response:
"""
Put a message onto the bus (into sdk queue)
"""
message_type = self.MESSAGE_TYPE
if isinstance(message_type, list):
err_msg = (
"self.MESSAGE_TYPE is a list. Please use message_bus.push directly."
)
assert len(message_type) == 1, err_msg
message_type = message_type[0]
return await message_bus.push(self.logger, message, message_type)