from q2_sdk.core.non_http_handlers.udp import writer as udp_writer
[docs]
class UniqueStack(list):
"""
Behaves like a normal Python list but will only
keep items once and will only fill to a certain size
"""
def __init__(self, max_size):
super().__init__()
self.max_size = max_size
self._fork_requests = None
@property
def fork_requests(self):
try:
from q2_sdk.core.configuration import settings
except ImportError:
return False # During startup this can be partially initialized
if not self._fork_requests:
self._fork_requests = settings.FORK_REQUESTS
return self._fork_requests
[docs]
def append(self, item):
if item in self:
index_of_item = self.index(item)
del self[index_of_item]
if len(self) == self.max_size:
del self[0]
super().append(item)
[docs]
def extend(self, iterable):
for item in iterable:
self.append(item)
[docs]
class ForkedUniqueStack(UniqueStack):
"""
Behaves like UniqueStack but will post back
updates to the forked parent server if appropriate
Set UDP_MESSAGE_TYPE to an appropriate reference from
q2_sdk.core.non_http_handlers.udp.writer.MsgType
"""
UDP_MESSAGE_TYPE = None
[docs]
def append(self, item):
super().append(item)
self._alert_forked_parent()
[docs]
def extend(self, iterable):
super().extend(iterable)
self._alert_forked_parent()
[docs]
def remove(self, item):
super().remove(item)
self._alert_forked_parent()
[docs]
def insert(self, index, item):
super().insert(index, item)
self._alert_forked_parent()
[docs]
def pop(self, index):
super().pop(index)
self._alert_forked_parent()
[docs]
def clear(self):
super().clear()
self._alert_forked_parent()
def _alert_forked_parent(self): # pragma: no cover
if self.fork_requests:
udp_writer.send_msg(self.UDP_MESSAGE_TYPE, self)