Coverage for gwcelery/voevent/bootsteps.py: 89%

92 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1import threading 

2 

3from celery import bootsteps 

4from celery.concurrency import solo 

5 

6from .logging import log 

7from .signals import voevent_received 

8from .util import get_host_port, get_local_ivo, get_network 

9 

10__all__ = ('Broadcaster', 'Reactor', 'Receiver') 

11 

12 

class VOEventBootStep(bootsteps.ConsumerStep):
    """Common base class for the VOEvent consumer boot steps.

    It restricts the step to workers that serve the ``voevent`` queue,
    insists on the "solo" task pool, and logs when the step starts and stops.
    """

    def include_if(self, consumer):
        """Tell Celery whether this step belongs in the given consumer.

        The step is included only if ``voevent`` is among the queues that the
        consumer's application is configured with.
        """
        configured_queues = consumer.app.amqp.queues
        return 'voevent' in configured_queues

    def create(self, consumer):
        """Check that the consumer runs on the "solo" task pool.

        Raises
        ------
        RuntimeError
            If ``consumer.pool`` is not a :class:`celery.concurrency.solo.TaskPool`.
        """
        # Guard clause: nothing to do when the pool is the supported one.
        if isinstance(consumer.pool, solo.TaskPool):
            return
        raise RuntimeError(
            'The VOEvent broker only works with the "solo" task pool. '
            'Start the worker with "--queues=voevent --pool=solo".')

    def start(self, consumer):
        """Log that the step (identified by ``self.name``) is starting."""
        log.info('Starting %s', self.name)

    def stop(self, consumer):
        """Log that the step (identified by ``self.name``) is stopping."""
        log.info('Stopping %s', self.name)

33 

34 

class Reactor(VOEventBootStep):
    """Run the global Twisted reactor in background thread.

    The Twisted reactor is a global run loop that drives all Twisted services
    and operations. This boot step starts the Twisted reactor in a background
    thread when the Celery consumer starts, and stops the thread when the
    Consumer terminates.
    """

    name = 'Twisted reactor'

    def __init__(self, consumer, **kwargs):
        """Initialise the step; the reactor thread is created in ``start``."""
        super().__init__(consumer, **kwargs)
        # Handle of the background thread running the reactor, or None while
        # the reactor has not been started by this step.
        self._thread = None

    def start(self, consumer):
        """Launch ``reactor.run`` in a dedicated background thread."""
        super().start(consumer)
        # Imported lazily, as everywhere else in this module, so that Twisted
        # is only loaded by workers that actually run this step.
        from twisted.internet import reactor
        # The single positional argument is passed straight to reactor.run();
        # presumably it is ``installSignalHandlers=False`` because the reactor
        # is not running in the main thread -- confirm against Twisted docs.
        self._thread = threading.Thread(target=reactor.run, args=(False,),
                                        name='TwistedReactorThread')
        self._thread.start()

    def stop(self, consumer):
        """Ask the reactor to stop and wait for its thread to finish.

        Does nothing beyond logging if the reactor thread was never started.
        """
        from twisted.internet import reactor

        super().stop(consumer)
        thread, self._thread = self._thread, None
        if thread is None:
            # stop() may be reached without a successful start() (presumably
            # when Celery restarts the consumer blueprint after an earlier
            # step failed -- verify). Queueing reactor.stop and joining a
            # missing thread would only raise AttributeError, so skip both.
            return
        # reactor.stop must be invoked from the reactor's own thread.
        reactor.callFromThread(reactor.stop)
        thread.join()

62 

63 

class TwistedService(VOEventBootStep):
    """A generic bootstep to create, start, and stop a Twisted service.

    Subclasses implement :meth:`create_service`; the object it returns must
    provide ``startService`` and ``stopService`` methods, which are invoked
    on the reactor thread.
    """

    # Make sure the reactor boot step is set up before any service step.
    requires = VOEventBootStep.requires + (Reactor,)

    def __init__(self, consumer, **kwargs):
        """Initialise the step; the service is created in ``start``."""
        super().__init__(consumer, **kwargs)
        # The service returned by create_service(), or None while no service
        # is running.
        self._service = None

    def create_service(self, consumer):
        """Build and return the service object for this step.

        Must be overridden by subclasses.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError

    def start(self, consumer):
        """Create the service and schedule its start on the reactor thread."""
        from twisted.internet import reactor

        super().start(consumer)
        self._service = self.create_service(consumer)
        reactor.callFromThread(self._service.startService)

    def stop(self, consumer):
        """Schedule the service's shutdown on the reactor thread.

        Does nothing beyond logging if no service is currently held, e.g.
        because ``start`` was never called or ``create_service`` failed.
        """
        from twisted.internet import reactor

        super().stop(consumer)
        service, self._service = self._service, None
        if service is None:
            # Without this guard ``None.stopService`` raises AttributeError.
            return
        reactor.callFromThread(service.stopService)

87 

88 

class Broadcaster(TwistedService):
    """Comet-based VOEvent broadcaster.

    Run a Comet-based VOEvent broadcaster
    (:class:`comet.protocol.broadcaster.VOEventBroadcasterFactory`). Starts
    after the :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep.

    A few :doc:`configuration options <configuration>` are available:

    * ``voevent_broadcaster_address``: The address to bind to, in
      :samp:`{host}:{port}` format.
    * ``voevent_broadcaster_whitelist``: A list of hostnames, IP addresses, or
      CIDR address ranges from which to accept connections.

    The list of active connections is made available for :ref:`inspection
    <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under
    the ``voevent-broker-peers`` key.
    """

    name = 'VOEvent broadcaster'

    def create_service(self, consumer):
        """Build the TCP server that serves VOEvent subscribers.

        The broadcaster factory is stored on ``self._factory`` and also
        published in the application configuration under the
        ``voevent_broadcaster_factory`` key.
        """
        from comet.protocol.broadcaster import VOEventBroadcasterFactory
        from comet.utility import WhitelistingFactory
        from twisted.application.internet import TCPServer

        conf = consumer.app.conf
        local_ivo = get_local_ivo(consumer.app)
        host, port = get_host_port(conf['voevent_broadcaster_address'])
        allow = [get_network(a) for a in conf['voevent_broadcaster_whitelist']]

        # NOTE(review): the second argument is presumably Comet's test-event
        # interval, with 0 disabling test events -- confirm against Comet.
        broadcaster_factory = VOEventBroadcasterFactory(local_ivo, 0)
        conf['voevent_broadcaster_factory'] = broadcaster_factory
        self._factory = broadcaster_factory

        # Only wrap the factory in a whitelist filter when a non-empty
        # whitelist is configured; info() keeps using the unwrapped factory.
        factory = broadcaster_factory
        if allow:
            factory = WhitelistingFactory(factory, allow, 'subscription')
        return TCPServer(port, factory, interface=host)

    def info(self, consumer):
        """Return the hosts of the currently connected subscribers.

        Returns
        -------
        dict
            ``{'voevent-broker-peers': [host, ...]}``; the list is empty if
            the peers could not be determined (the failure is logged).
        """
        try:
            peers = [
                b.transport.getPeer().host for b in self._factory.broadcasters]
        except Exception:
            # Best effort: stats reporting must not fail because of this
            # step. Unlike a bare ``except``, this lets SystemExit and
            # KeyboardInterrupt propagate.
            log.exception('failed to get info for voevent-broker-peers')
            peers = []
        return {'voevent-broker-peers': peers}

133 

134 

class Receiver(TwistedService):
    """VOEvent receiver.

    Run a Comet-based VOEvent receiver
    (:class:`comet.protocol.subscriber.VOEventSubscriberFactory`). Starts after
    the :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep.

    A few :doc:`configuration options <configuration>` are available:

    * ``voevent_receiver_address``: The address to connect to, in
      :samp:`{host}:{port}` format.

    The list of active connections is made available for :ref:`inspection
    <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under
    the ``voevent-receiver-peers`` key.
    """

    name = 'VOEvent receiver'

    # In addition to the reactor, depend on Celery's ``Tasks`` consumer step
    # (presumably so that task consumption is set up before events can
    # arrive -- verify).
    requires = TwistedService.requires + (
        'celery.worker.consumer.tasks:Tasks',)

    def create_service(self, consumer):
        """Build the TCP client that subscribes to a remote VOEvent broker.

        The subscriber factory is stored on ``self._factory`` so that
        :meth:`info` can list the active connections.
        """
        from comet.icomet import IHandler
        from twisted.application.internet import TCPClient
        from zope.interface import implementer

        from .subscriber import VOEventSubscriberFactory

        @implementer(IHandler)
        class Handler:
            """Comet event handler that forwards events to a signal."""

            def __call__(self, event):
                from twisted.internet import reactor
                # Dispatch the ``voevent_received`` signal via the reactor's
                # callInThread rather than directly in this call.
                reactor.callInThread(
                    voevent_received.send, sender=None, xml_document=event)

        conf = consumer.app.conf
        local_ivo = get_local_ivo(consumer.app)
        host, port = get_host_port(conf['voevent_receiver_address'])
        self._factory = factory = VOEventSubscriberFactory(
            local_ivo=local_ivo, handlers=[Handler()])
        return TCPClient(host, port, factory)

    def info(self, consumer):
        """Return the hosts of the brokers currently connected to.

        Returns
        -------
        dict
            ``{'voevent-receiver-peers': [host, ...]}``; the list is empty if
            the peers could not be determined (the failure is logged).
        """
        try:
            peers = [
                b.transport.getPeer().host for b in self._factory.subscribers]
        except Exception:
            # Best effort: stats reporting must not fail because of this
            # step. Unlike a bare ``except``, this lets SystemExit and
            # KeyboardInterrupt propagate.
            log.exception('failed to get info for voevent-receiver-peers')
            peers = []
        return {'voevent-receiver-peers': peers}