Source code for q2_sdk.models.unique_stack

from q2_sdk.core.non_http_handlers.udp import writer as udp_writer


[docs] class UniqueStack(list): """ Behaves like a normal Python list but will only keep items once and will only fill to a certain size """ def __init__(self, max_size): super().__init__() self.max_size = max_size self._fork_requests = None @property def fork_requests(self): try: from q2_sdk.core.configuration import settings except ImportError: return False # During startup this can be partially initialized if not self._fork_requests: self._fork_requests = settings.FORK_REQUESTS return self._fork_requests
[docs] def append(self, item): if item in self: index_of_item = self.index(item) del self[index_of_item] if len(self) == self.max_size: del self[0] super().append(item)
[docs] def extend(self, iterable): for item in iterable: self.append(item)
[docs] class ForkedUniqueStack(UniqueStack): """ Behaves like UniqueStack but will post back updates to the forked parent server if appropriate Set UDP_MESSAGE_TYPE to an appropriate reference from q2_sdk.core.non_http_handlers.udp.writer.MsgType """ UDP_MESSAGE_TYPE = None
[docs] def append(self, item): super().append(item) self._alert_forked_parent()
[docs] def extend(self, iterable): super().extend(iterable) self._alert_forked_parent()
[docs] def remove(self, item): super().remove(item) self._alert_forked_parent()
[docs] def insert(self, index, item): super().insert(index, item) self._alert_forked_parent()
[docs] def pop(self, index): super().pop(index) self._alert_forked_parent()
[docs] def clear(self): super().clear() self._alert_forked_parent()
def _alert_forked_parent(self): # pragma: no cover if self.fork_requests: udp_writer.send_msg(self.UDP_MESSAGE_TYPE, self)