Source code for q2_cores.XP2.queries.message_maker

import logging
from typing import Optional
from lxml import etree
from lxml.builder import E

from q2_sdk.models.cores.queries.base_query import BaseQuery


[docs] class MessageMaker(BaseQuery): """ Builds the message wrapper that is used by all XP2 passthru calls """ def __init__( self, logger: logging.Logger, class_name: Optional[str] = None, namespace: Optional[str] = None, ): if not class_name: class_name = "Apex.ApexMemberServices211.MemberServices" self._class_name = class_name if not namespace: namespace = "http://xpsystems.com/services/apex/messages" self._namespace = namespace super().__init__(logger)
[docs] def make_message(self, method_name: str, message_nodes: list) -> str: """ Wraps message nodes in the message structure to be sent to the core :param method_name: The name of the method being called on XP2 :param message_nodes: A list of xml nodes :return: The proper message to the core """ xml = etree.tostring( E.Passthru( E.MethodContent(*message_nodes), Class=self._class_name, Method=method_name, ), encoding="unicode", ) return xml