Coverage for gwcelery/kafka/bootsteps.py: 68%

155 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1import json 

2from functools import cache 

3from os import path 

4from threading import Thread 

5 

6from celery import bootsteps 

7from celery.concurrency import solo 

8from celery.utils.log import get_logger 

9from confluent_kafka.error import KafkaException 

10from fastavro.schema import parse_schema 

11from hop import Stream, auth 

12from hop.io import list_topics 

13from hop.models import AvroBlob, JSONBlob, VOEvent 

14from xdg.BaseDirectory import xdg_config_home 

15 

16from ..util import read_json 

17from .signals import kafka_record_consumed 

18 

# Public API of this module: the two worker bootsteps.
__all__ = ('Producer', 'Consumer')

# Module-level logger (celery's get_logger, named after this module).
log = get_logger(__name__)

22 

23 

@cache
def schema():
    """Load and return the parsed IGWN alert Avro schema.

    The result is cached, so the schema files are read and parsed only once
    per process.

    The order in which the supporting schemas are loaded does not matter,
    but the Alert schema must be loaded last because it references the
    others. Every parsed schema is registered in ``named_schemas`` so that
    later schemas can resolve their references, but only the Alert schema
    is needed to write a packet, so it is the one returned.

    NOTE: Specifying ``expand=True`` when calling ``parse_schema`` is okay
    when only one schema contains references to other schemas; in our case
    only the Alert schema does. More complicated relationships between
    schemas can lead to behavior that does not conform to the Avro spec,
    and a different method will need to be used to load them. See
    https://github.com/fastavro/fastavro/issues/624 for more info.

    Returns
    -------
    The ``igwn.alerts.v1_0.Alert`` schema as returned by
    ``fastavro.schema.parse_schema``.
    """
    named_schemas = {}
    # Register the schemas that Alert references. Their return values are
    # not needed; parse_schema records them in ``named_schemas``.
    for filename in ('igwn.alerts.v1_0.ExternalCoincInfo.avsc',
                     'igwn.alerts.v1_0.EventInfo.avsc',
                     'igwn.alerts.v1_0.AlertType.avsc'):
        parse_schema(read_json('igwn_gwalert_schema', filename),
                     named_schemas, expand=True)

    # Alert goes last because it refers to the schemas registered above.
    return parse_schema(
        read_json('igwn_gwalert_schema', 'igwn.alerts.v1_0.Alert.avsc'),
        named_schemas, expand=True)

46 

47 

def _load_hopauth_map():
    """Load hop credentials and gwcelery's Kafka credential map.

    Returns
    -------
    hop_auth
        The value returned by ``hop.auth.load_auth()``.
    kafka_credential_map
        The parsed contents of
        ``$XDG_CONFIG_HOME/gwcelery/kafka_credential_map.json``; presumably
        a mapping of ``'consumer'``/``'producer'`` to ``{name: username}``.
        Verify against the deployed file.

    Raises
    ------
    OSError
        If the credential map file cannot be opened (e.g. it is missing).
    json.JSONDecodeError
        If the credential map file is not valid JSON.
    """
    hop_auth = auth.load_auth()
    credential_map_path = path.join(xdg_config_home,
                                    'gwcelery/kafka_credential_map.json')
    # JSON is UTF-8 by spec; don't depend on the process locale.
    with open(credential_map_path, 'r', encoding='utf-8') as fo:
        kafka_credential_map = json.load(fo)

    return hop_auth, kafka_credential_map

56 

57 

class AvroBlobWrapper(AvroBlob):
    """An ``AvroBlob`` holding one record encoded with the IGWN alert schema.

    Parameters
    ----------
    payload
        The single record to wrap. It is stored as a one-element list and
        paired with the cached schema from :func:`schema`.
    """

    def __init__(self, payload):
        # AvroBlob takes a list of records plus a schema. ``__init__`` must
        # return None, so the parent call is not returned.
        super().__init__([payload], schema())

62 

63 

class KafkaBase:
    """Shared setup for Kafka readers and writers.

    Resolves the hop credential to use, builds the hop ``Stream`` and picks
    the serialization model that matches the configured payload format.

    Parameters
    ----------
    name : str
        Logical name of the stream; looked up in the Kafka credential map.
    config : dict
        Stream configuration. ``config['suffix']`` selects the serialization
        (``'avro'``, ``'json'`` or ``'xml'``). ``config['auth']`` set to
        ``False`` turns authentication off.
    prefix : str
        Section of the credential map in which ``name`` is looked up
        (``'consumer'`` or ``'producer'`` for the subclasses in this module).

    Raises
    ------
    NotImplementedError
        If ``config['suffix']`` is not one of the supported formats.
    """

    def __init__(self, name, config, prefix):
        self.name = name
        self._config = config

        # The 'auth' key only appears in a config to switch authentication
        # off; when it is absent (or anything but False) use credentials.
        wants_auth = config.get('auth') is not False
        self._credential = self.get_auth(prefix) if wants_auth else False
        self._hop_stream = Stream(self._credential)

        # FIXME Drop get_payload_content method once
        # https://github.com/scimma/hop-client/pull/190 is merged
        suffix = config['suffix']
        if suffix not in ('avro', 'json', 'xml'):
            raise NotImplementedError(
                'Supported serialization method required for alert notices'
            )

        if suffix == 'avro':
            # Avro blobs hold a list of records; each message carries one.
            self.serialization_model = AvroBlobWrapper
            self.get_payload_content = lambda payload: payload.content[0]
        else:
            self.serialization_model = (
                JSONBlob if suffix == 'json' else VOEvent)
            self.get_payload_content = lambda payload: payload.content

    def get_auth(self, prefix):
        """Return the hop credential configured for this stream.

        Parameters
        ----------
        prefix : str
            Credential-map section in which ``self.name`` is looked up.

        Returns
        -------
        The first entry from the hop auth file whose ``username`` equals the
        username mapped to ``self.name`` under ``prefix``.

        Raises
        ------
        ValueError
            If the credential map has no username for ``self.name`` under
            ``prefix``, or no hop credential has that username.
        """
        hop_auth, kafka_credential_map = _load_hopauth_map()

        # Credential map: logical broker/topic name -> username.
        username = kafka_credential_map.get(prefix, {}).get(self.name)
        if username is None:
            raise ValueError('Unable to find {} entry in kafka credential map '
                             'for {}'.format(prefix, self.name))

        # Hop auth file: username -> password/hostname. Take the first match.
        candidates = (cred for cred in hop_auth if cred.username == username)
        target_auth = next(candidates, None)
        if target_auth is None:
            raise ValueError('Unable to find entry in hop auth file for '
                             'username {}'.format(username))
        return target_auth

114 

115 

class KafkaListener(KafkaBase):
    """Consume one Kafka topic and re-emit each record as a signal.

    Each consumed message is forwarded through ``kafka_record_consumed``
    (sender ``None``), with ``name`` set to this listener's name and
    ``record`` set to the deserialized payload content.

    Parameters
    ----------
    name : str
        Logical name of the listener, also used for the credential lookup.
    config : dict
        Listener configuration. Needs at least ``'url'`` and ``'suffix'``.
    """

    def __init__(self, name, config):
        super().__init__(name, config, 'consumer')
        self._open_hop_stream = None
        self.running = False
        # Don't kill the worker if the listener can't connect. On failure
        # ``_open_hop_stream`` stays None, which callers check before use.
        try:
            self._open_hop_stream = self._hop_stream.open(config['url'], 'r')
        except (KafkaException, ValueError):
            # Hop client raises ValueError if the topic doesn't exist on the
            # broker.
            log.exception('Connection to %s failed', self._config["url"])

    def listen(self):
        """Consume messages until stopped or a fatal Kafka error occurs.

        Non-fatal ``KafkaException``s are logged and consumption restarts
        (similar to ``gwcelery.igwn_alert.IGWNAlertClient``).

        Raises
        ------
        KafkaException
            Re-raised for fatal errors, after closing the stream.
        """
        self.running = True
        while self.running:
            try:
                for message in self._open_hop_stream:
                    kafka_record_consumed.send(
                        None,
                        name=self.name,
                        record=self.get_payload_content(message)
                    )
            except KafkaException as exception:
                err = exception.args[0]
                if not self.running:
                    # The close() issued by the bootstep's stop() surfaces
                    # here as a KafkaException; swallow it so the worker
                    # shuts down gracefully.
                    pass
                elif err.fatal():
                    # Stop running and close before raising the error.
                    self.running = False
                    self._open_hop_stream.close()
                    raise
                else:
                    # KafkaError.name is a method; call it to log the name
                    # rather than a bound-method repr.
                    log.warning('non-fatal error from kafka: %s', err.name())

161 

162 

class KafkaWriter(KafkaBase):
    """Write Kafka topics and monitor health.

    Parameters
    ----------
    name : str
        Logical name of the writer, also used for the credential lookup.
    config : dict
        Writer configuration. Needs at least ``'url'`` and ``'suffix'``.
    """

    def __init__(self, name, config):
        super().__init__(name, config, 'producer')
        self._open_hop_stream = self._hop_stream.open(
            config['url'], 'w',
            message_max_bytes=1024 * 1024 * 8,
            compression_type='zstd')

        # Delivery health flag, maintained by _delivery_cb: True after a
        # failed delivery, reset to False by the next successful one.
        self.kafka_delivery_failures = False

    def kafka_topic_up(self):
        """Check for problems in broker and topic.

        Returns
        -------
        bool
            True if the broker and topic appear to be up, False otherwise.
        """
        kafka_url = self._config['url']
        # Expects URLs of the form ``kafka://<broker>/<topic>``.
        _, _, broker, topic = kafka_url.split('/')
        try:
            topics = list_topics(kafka_url, auth=self._credential, timeout=5)
        except KafkaException:
            log.error(f'{broker} appears to be down')
            return False

        # A topic absent from the listing is reported as down instead of
        # letting a KeyError escape to the caller.
        metadata = topics.get(topic)
        if metadata is None or metadata.error is not None:
            log.error(f'{topic} at {broker} appears to be down')
            return False
        log.info(f'{kafka_url} appears to be functioning properly')
        return True

    def _describe_record(self, message):
        """Return ``'<superevent_id> <alert_type>'`` for a produced message.

        Only used when logging a failed delivery. If the payload cannot be
        decoded, or lacks those keys, a placeholder is returned so that the
        delivery error itself is still logged.
        """
        # FIXME Get rid of if-else logic once
        # https://github.com/scimma/hop-client/pull/190 is merged
        try:
            if self._config['suffix'] == 'avro':
                record = AvroBlob.deserialize(message.value()).content[0]
            else:
                record = JSONBlob.deserialize(message.value()).content
            return f'{record["superevent_id"]} {record["alert_type"]}'
        except Exception:
            # Decoding errors vary by format; an exception escaping a
            # delivery callback would surface from flush() in write().
            log.exception('Unable to decode undelivered message')
            return 'an undecodable'

    def _delivery_cb(self, kafka_error, message):
        """Record the outcome of a delivery attempt.

        Passed as ``delivery_callback`` to the hop stream's ``write``.

        Parameters
        ----------
        kafka_error
            None if the message was delivered, otherwise the delivery error
            (exposing ``code()``, ``name()`` and ``str()``).
        message
            The produced message; ``message.value()`` is its serialized
            payload.
        """
        if kafka_error is None:
            self.kafka_delivery_failures = False
            return

        # Set the flag before doing any work that could fail.
        self.kafka_delivery_failures = True
        kafka_url = self._config['url']
        log.error(f'Received error code {kafka_error.code()} '
                  f'({kafka_error.name()}, {kafka_error.str()}) '
                  f'when delivering {self._describe_record(message)} '
                  f'alert to {kafka_url}')

    def write(self, payload):
        """Write ``payload`` to the topic and flush immediately.

        Delivery outcomes are reported to ``_delivery_cb``.
        """
        self._open_hop_stream.write(payload,
                                    delivery_callback=self._delivery_cb)
        self._open_hop_stream.flush()

214 

215 

class KafkaBootStep(bootsteps.ConsumerStep):
    """Generic boot step to limit us to appropriate kinds of workers.

    Only include this bootstep in workers that are configured to listen to
    the ``kafka`` queue.
    """

    def create(self, consumer):
        """Refuse to start unless the worker runs the ``solo`` task pool.

        Raises
        ------
        RuntimeError
            If ``consumer.pool`` is not a ``celery.concurrency.solo.TaskPool``.
        """
        runs_solo_pool = isinstance(consumer.pool, solo.TaskPool)
        if runs_solo_pool:
            return
        raise RuntimeError(
            'The Kafka broker only works with the "solo" task pool. '
            'Start the worker with "--queues=kafka --pool=solo".')

228 

229 

class Consumer(KafkaBootStep):
    """Run MOU Kafka consumers in background threads.

    One ``KafkaListener`` is built per entry of the ``kafka_consumer_config``
    setting. Every listener that connected successfully gets its own thread.
    """

    name = 'Kafka consumer'

    def include_if(self, consumer):
        """Enable this bootstep only for workers consuming ``kafka``."""
        return 'kafka' in consumer.app.amqp.queues

    def start(self, consumer):
        """Create the listeners and start a thread for each connected one."""
        consumer_config = consumer.app.conf['kafka_consumer_config']
        topic_urls = ' '.join(cfg['url'] for cfg in consumer_config.values())
        log.info(f'Starting {self.name}, topics: ' + topic_urls)

        listeners = {}
        for key, cfg in consumer_config.items():
            listeners[key] = KafkaListener(key, cfg)
        self._listeners = listeners

        # Listeners whose connection failed have no open stream; skip them.
        threads = []
        for key, listener in listeners.items():
            if listener._open_hop_stream is None:
                continue
            threads.append(Thread(target=listener.listen,
                                  name=f'{key}_KafkaConsumerThread'))
        self.threads = threads

        for worker_thread in self.threads:
            worker_thread.start()

    def stop(self, consumer):
        """Signal every listener to stop, close their streams and join."""
        connected_urls = ' '.join(
            listener._config['url']
            for listener in self._listeners.values()
            if listener._open_hop_stream is not None)
        log.info('Closing connection to topics: ' + connected_urls)

        for listener in self._listeners.values():
            listener.running = False
            stream = listener._open_hop_stream
            if stream is not None:
                stream.close()

        for worker_thread in self.threads:
            worker_thread.join()

    def info(self, consumer):
        """Report the names of listeners that have an open stream."""
        active = {
            listener.name
            for listener in self._listeners.values()
            if listener._open_hop_stream is not None
        }
        return {'active_kafka_consumers': active}

274 

275 

class Producer(KafkaBootStep):
    """Run the global Kafka producers in a background thread.

    Flags that document the health of the connections are made available
    :ref:`inspection <celery:worker-inspect>` with the ``gwcelery inspect
    stats`` command under the ``kafka_topic_up`` and
    ``kafka_delivery_failures`` keys.
    """

    name = 'Kafka producer'

    def include_if(self, consumer):
        """Enable this bootstep only for workers consuming ``kafka``."""
        return 'kafka' in consumer.app.amqp.queues

    def start(self, consumer):
        """Build one ``KafkaWriter`` per ``kafka_alert_config`` entry.

        The writers are published in the app config as ``kafka_streams``.
        """
        alert_config = consumer.app.conf['kafka_alert_config']
        topic_urls = ' '.join(cfg['url'] for cfg in alert_config.values())
        log.info(f'Starting {self.name}, topics: ' + topic_urls)

        writers = {}
        for brokerhost, cfg in alert_config.items():
            writers[brokerhost] = KafkaWriter(brokerhost, cfg)
        # Same target order as a chained assignment: app config first.
        consumer.app.conf['kafka_streams'] = writers
        self._writers = writers

    def stop(self, consumer):
        """Close every writer's stream."""
        alert_config = consumer.app.conf['kafka_alert_config']
        topic_urls = ' '.join(cfg['url'] for cfg in alert_config.values())
        log.info('Closing connection to topics: ' + topic_urls)
        for writer in self._writers.values():
            writer._open_hop_stream.close()

    def info(self, consumer):
        """Report per-writer topic health and delivery-failure flags."""
        writer_items = self._writers.items()
        topic_up = {
            brokerhost: writer.kafka_topic_up()
            for brokerhost, writer in writer_items
        }
        delivery_failures = {
            brokerhost: writer.kafka_delivery_failures
            for brokerhost, writer in writer_items
        }
        return {'kafka_topic_up': topic_up,
                'kafka_delivery_failures': delivery_failures}