Coverage for gwcelery/voevent/bootsteps.py: 83%
92 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1import threading
3from celery import bootsteps
4from celery.concurrency import solo
6from .logging import log
7from .signals import voevent_received
8from .util import get_host_port, get_local_ivo, get_network
10__all__ = ('Broadcaster', 'Reactor', 'Receiver')
13class VOEventBootStep(bootsteps.ConsumerStep):
14 """Generic boot step to limit us to appropriate kinds of workers."""
16 def include_if(self, consumer):
17 """Only include this bootstep in workers that are configured to listen
18 to the ``voevent`` queue.
19 """
20 return 'voevent' in consumer.app.amqp.queues
22 def create(self, consumer):
23 if not isinstance(consumer.pool, solo.TaskPool):
24 raise RuntimeError(
25 'The VOEvent broker only works with the "solo" task pool. '
26 'Start the worker with "--queues=voevent --pool=solo".')
28 def start(self, consumer):
29 log.info('Starting %s', self.name)
31 def stop(self, consumer):
32 log.info('Stopping %s', self.name)
35class Reactor(VOEventBootStep):
36 """Run the global Twisted reactor in background thread.
38 The Twisted reactor is a global run loop that drives all Twisted services
39 and operations. This boot step starts the Twisted reactor in a background
40 thread when the Celery consumer starts, and stops the thread when the
41 Consumer terminates.
42 """
44 name = 'Twisted reactor'
46 def __init__(self, consumer, **kwargs):
47 self._thread = None
49 def start(self, consumer):
50 super().start(consumer)
51 from twisted.internet import reactor
52 self._thread = threading.Thread(target=reactor.run, args=(False,),
53 name='TwistedReactorThread')
54 self._thread.start()
56 def stop(self, consumer):
57 from twisted.internet import reactor
59 super().stop(consumer)
60 reactor.callFromThread(reactor.stop)
61 self._thread.join()
64class TwistedService(VOEventBootStep):
65 """A generic bootstep to create, start, and stop a Twisted service."""
67 requires = VOEventBootStep.requires + (Reactor,)
69 def __init__(self, consumer, **kwargs):
70 self._service = None
72 def create_service(self, consumer):
73 raise NotImplementedError
75 def start(self, consumer):
76 from twisted.internet import reactor
78 super().start(consumer)
79 self._service = self.create_service(consumer)
80 reactor.callFromThread(self._service.startService)
82 def stop(self, consumer):
83 from twisted.internet import reactor
85 super().stop(consumer)
86 reactor.callFromThread(self._service.stopService)
89class Broadcaster(TwistedService):
90 """Comet-based VOEvent broadcaster.
92 Run a Comet-based VOEvent broadcaster
93 (:class:`comet.protocol.broadcaster.VOEventBroadcasterFactory`). Starts
94 after the :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep.
96 A few :doc:`configuration options <configuration>` are available:
98 * ``voevent_broadcaster_address``: The address to bind to, in
99 :samp:`{host}:{port}` format.
100 * ``voevent_broadcaster_whitelist``: A list of hostnames, IP addresses, or
101 CIDR address ranges from which to accept connections.
103 The list of active connections is made available :ref:`inspection
104 <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under
105 the ``voevent-broker-peers`` key.
106 """
108 name = 'VOEvent broadcaster'
110 def create_service(self, consumer):
111 from comet.protocol.broadcaster import VOEventBroadcasterFactory
112 from comet.utility import WhitelistingFactory
113 from twisted.application.internet import TCPServer
115 conf = consumer.app.conf
116 local_ivo = get_local_ivo(consumer.app)
117 host, port = get_host_port(conf['voevent_broadcaster_address'])
118 allow = [get_network(a) for a in conf['voevent_broadcaster_whitelist']]
119 conf['voevent_broadcaster_factory'] = self._factory = factory = \
120 VOEventBroadcasterFactory(local_ivo, 0)
121 if allow:
122 factory = WhitelistingFactory(factory, allow, 'subscription')
123 return TCPServer(port, factory, interface=host)
125 def info(self, consumer):
126 try:
127 peers = [
128 b.transport.getPeer().host for b in self._factory.broadcasters]
129 except: # noqa: E722
130 log.exception('failed to get info for voevent-broker-peers')
131 peers = []
132 return {'voevent-broker-peers': peers}
135class Receiver(TwistedService):
136 """VOEvent receiver.
138 Run a Comet-based VOEvent receiver
139 (:class:`comet.protocol.subscriber.VOEventSubscriberFactory`). Starts after
140 the :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep.
142 A few :doc:`configuration options <configuration>` are available:
144 * ``voevent_receiver_address``: The address to connect to, in
145 :samp:`{host}:{port}` format.
147 The list of active connections is made available :ref:`inspection
148 <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under
149 the ``voevent-receiver-peers`` key.
150 """
152 name = 'VOEvent receiver'
154 requires = TwistedService.requires + (
155 'celery.worker.consumer.tasks:Tasks',)
157 def create_service(self, consumer):
158 from comet.icomet import IHandler
159 from twisted.application.internet import TCPClient
160 from zope.interface import implementer
162 from .subscriber import VOEventSubscriberFactory
164 @implementer(IHandler)
165 class Handler:
167 def __call__(self, event):
168 from twisted.internet import reactor
169 reactor.callInThread(
170 voevent_received.send, sender=None, xml_document=event)
172 conf = consumer.app.conf
173 local_ivo = get_local_ivo(consumer.app)
174 host, port = get_host_port(conf['voevent_receiver_address'])
175 self._factory = factory = VOEventSubscriberFactory(
176 local_ivo=local_ivo, handlers=[Handler()])
177 return TCPClient(host, port, factory)
179 def info(self, consumer):
180 try:
181 peers = [
182 b.transport.getPeer().host for b in self._factory.subscribers]
183 except: # noqa: E722
184 log.exception('failed to get info for voevent-receiver-peers')
185 peers = []
186 return {'voevent-receiver-peers': peers}