Coverage for gwcelery/voevent/subscriber.py: 71%
14 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Subclasses of :class:`comet.protocol.VOEventSubscriber` and
2:class:`comet.protocol.VOEventSubscriberFactory` that allow inspection of the
3list of active subscribers.
4"""
5from comet.protocol.subscriber import VOEventSubscriber as _VOEventSubscriber
6from comet.protocol.subscriber import \
7 VOEventSubscriberFactory as _VOEventSubscriberFactory
class VOEventSubscriber(_VOEventSubscriber):
    """VOEvent subscriber protocol that registers itself with its factory.

    Each instance appends itself to ``self.factory.subscribers`` when its
    connection is made and removes itself again when the connection is lost,
    so that the list can be inspected to see which subscribers are active.
    """

    def connectionMade(self, *args):  # noqa: N802
        """Register this subscriber, then delegate to the base class.

        All positional arguments are forwarded unchanged to the base-class
        ``connectionMade``, whose result is returned.
        """
        self.factory.subscribers.append(self)
        return super().connectionMade(*args)

    def connectionLost(self, *args):  # noqa: N802
        """Deregister this subscriber, then delegate to the base class.

        All positional arguments are forwarded unchanged to the base-class
        ``connectionLost``, whose result is returned.
        """
        try:
            self.factory.subscribers.remove(self)
        except ValueError:
            # This instance was not in the list (never registered, or already
            # removed). Deregistration is best-effort: the base-class
            # teardown below must still run.
            pass
        return super().connectionLost(*args)
class VOEventSubscriberFactory(_VOEventSubscriberFactory):
    """Subscriber factory that exposes a list of its active subscribers.

    The ``subscribers`` attribute starts out empty. Instances of
    :class:`VOEventSubscriber` add themselves to it when their connection is
    made and remove themselves when it is lost.
    """

    # Protocol class associated with this factory.
    protocol = VOEventSubscriber

    def __init__(self, *args, **kwargs):
        """Initialise the base factory, then create the subscriber list.

        All positional and keyword arguments are passed through unchanged to
        the base-class constructor.
        """
        super().__init__(*args, **kwargs)
        self.subscribers = list()