Coverage for gwcelery/kafka/bootsteps.py: 68%
155 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1import json
2from functools import cache
3from os import path
4from threading import Thread
6from celery import bootsteps
7from celery.concurrency import solo
8from celery.utils.log import get_logger
9from confluent_kafka.error import KafkaException
10from fastavro.schema import parse_schema
11from hop import Stream, auth
12from hop.io import list_topics
13from hop.models import AvroBlob, JSONBlob, VOEvent
14from xdg.BaseDirectory import xdg_config_home
16from ..util import read_json
17from .signals import kafka_record_consumed
# Only the two bootsteps form the public interface of this module.
__all__ = ('Producer',
           'Consumer')

log = get_logger(__name__)
@cache
def schema():
    """Return the parsed IGWN alert Avro schema.

    The result is memoized with ``functools.cache``, so the schema files are
    read and parsed only on the first call.
    """
    # The order does not matter other than the Alert schema must be loaded
    # last because it references the other schema. All of the schema are
    # saved in named_schemas, but we only need to keep a reference to the
    # Alert schema to write the packet.
    # NOTE Specifying expand=True when calling parse_schema is okay when only
    # one schema contains references to other schema; in our case only the
    # alerts schema contains references to other schema. More complicated
    # relationships between schema though can lead to behavior that does not
    # conform to the avro spec, and a different method will need to be used
    # to load the schema. See https://github.com/fastavro/fastavro/issues/624
    # for more info.
    schema_files = (
        'igwn.alerts.v1_0.ExternalCoincInfo.avsc',
        'igwn.alerts.v1_0.EventInfo.avsc',
        'igwn.alerts.v1_0.AlertType.avsc',
        'igwn.alerts.v1_0.Alert.avsc',
    )
    named_schemas = {}
    for filename in schema_files:
        # Each parse registers its types in named_schemas; only the last
        # (Alert) result is kept.
        parsed = parse_schema(read_json('igwn_gwalert_schema', filename),
                              named_schemas, expand=True)
    return parsed
def _load_hopauth_map():
    """Load hop credentials and the GWCelery Kafka credential map.

    Returns
    -------
    hop_auth
        Credentials returned by ``hop.auth.load_auth()``.
    kafka_credential_map : dict
        Contents of ``$XDG_CONFIG_HOME/gwcelery/kafka_credential_map.json``.
        ``KafkaBase.get_auth`` reads it as ``{prefix: {name: username}}``.
    """
    hop_auth = auth.load_auth()
    map_path = path.join(xdg_config_home,
                         'gwcelery/kafka_credential_map.json')
    # JSON text is UTF-8 (RFC 8259); don't depend on the locale's encoding.
    with open(map_path, 'r', encoding='utf-8') as fo:
        kafka_credential_map = json.load(fo)

    return hop_auth, kafka_credential_map
class AvroBlobWrapper(AvroBlob):
    """An ``AvroBlob`` holding a single alert record.

    Wraps *payload* in a one-element list and pairs it with the IGWN alert
    schema returned by :func:`schema`.
    """

    def __init__(self, payload):
        # __init__ must not return a value; just delegate to the parent.
        super().__init__([payload], schema())
class KafkaBase:
    """Shared setup for the hop-based Kafka listeners and writers.

    Parameters
    ----------
    name : str
        Logical name of this stream; also the key looked up in the Kafka
        credential map.
    config : dict
        Stream configuration. ``config['suffix']`` selects the serialization
        model (``'avro'``, ``'json'`` or ``'xml'``). Setting
        ``config['auth']`` to ``False`` disables authentication.
    prefix : str
        Section of the Kafka credential map to search (the subclasses pass
        ``'consumer'`` or ``'producer'``).

    Raises
    ------
    NotImplementedError
        If ``config['suffix']`` is not a supported serialization method.
    """

    def __init__(self, name, config, prefix):
        self.name = name
        self._config = config
        # Authentication stays on unless the config explicitly sets
        # auth=False (users only add 'auth' to disable it).
        wants_auth = config.get('auth') is not False
        self._credential = self.get_auth(prefix) if wants_auth else False
        self._hop_stream = Stream(self._credential)

        # FIXME Drop get_payload_content method once
        # https://github.com/scimma/hop-client/pull/190 is merged
        serializers = {
            'avro': (AvroBlobWrapper, lambda payload: payload.content[0]),
            'json': (JSONBlob, lambda payload: payload.content),
            'xml': (VOEvent, lambda payload: payload.content),
        }
        selected = serializers.get(config['suffix'])
        if selected is None:
            raise NotImplementedError(
                'Supported serialization method required for alert notices'
            )
        self.serialization_model, self.get_payload_content = selected

    def get_auth(self, prefix):
        """Return the hop credential configured for this stream.

        Raises
        ------
        ValueError
            If the Kafka credential map has no username for ``prefix`` /
            ``self.name``, or the hop auth file has no entry for that
            username.
        """
        hop_auth, kafka_credential_map = _load_hopauth_map()

        # kafka_credential_map maps the logical broker/topic name to a
        # username.
        username = kafka_credential_map.get(prefix, {}).get(self.name)
        if username is None:
            raise ValueError('Unable to find {} entry in kafka credential map '
                             'for {}'.format(prefix, self.name))

        # hop_auth holds the password/hostname for each username; take the
        # first matching entry.
        target_auth = next(
            (cred for cred in hop_auth if cred.username == username), None)
        if target_auth is None:
            raise ValueError('Unable to find entry in hop auth file for '
                             'username {}'.format(username))
        return target_auth
class KafkaListener(KafkaBase):
    """Consume the Kafka stream at ``config['url']`` and re-emit records.

    Each consumed message is sent through the ``kafka_record_consumed``
    signal with ``name`` set to this listener's name and ``record`` set to
    the decoded payload content.
    """

    def __init__(self, name, config):
        super().__init__(name, config, 'consumer')
        # Remains None if the connection fails; Consumer checks for this
        # before starting a thread for the listener.
        self._open_hop_stream = None
        self.running = False
        # Don't kill worker if listener can't connect. Hop client raises a
        # ValueError if the topic doesn't exist on the broker.
        try:
            self._open_hop_stream = self._hop_stream.open(config['url'], 'r')
        except (KafkaException, ValueError):
            log.exception('Connection to %s failed', self._config['url'])

    def listen(self):
        """Read messages until ``running`` is cleared or a fatal error occurs.

        Non-fatal Kafka errors are logged and the read loop restarts, similar
        to ``gwcelery.igwn_alert.IGWNAlertClient``.

        Raises
        ------
        KafkaException
            If the broker reports a fatal error while the listener is running.
            The stream is closed before re-raising.
        """
        self.running = True
        while self.running:
            try:
                for message in self._open_hop_stream:
                    kafka_record_consumed.send(
                        None,
                        name=self.name,
                        record=self.get_payload_content(message)
                    )
            except KafkaException as exception:
                err = exception.args[0]
                if not self.running:
                    # Consumer.stop clears `running` and closes the stream,
                    # which surfaces here as a KafkaException; swallow it so
                    # the worker can shut down gracefully.
                    continue
                if err.fatal():
                    # stop running and close before raising error
                    self.running = False
                    self._open_hop_stream.close()
                    raise
                # KafkaError.name is a method: call it so the log shows the
                # error name rather than a bound-method repr.
                log.warning('non-fatal error from kafka: %s', err.name())
class KafkaWriter(KafkaBase):
    """Write Kafka topics and monitor health.

    ``kafka_delivery_failures`` is updated by the delivery callback after
    every write: ``True`` if the latest delivery failed, ``False`` otherwise.
    """

    def __init__(self, name, config):
        super().__init__(name, config, 'producer')
        self._open_hop_stream = self._hop_stream.open(
            config['url'], 'w',
            message_max_bytes=1024 * 1024 * 8,
            compression_type='zstd')

        # Set up flag for failed delivery of messages
        self.kafka_delivery_failures = False

    def kafka_topic_up(self):
        """Check for problems in broker and topic.

        Returns True if broker and topic appear to be up, returns False
        otherwise (including when the topic is absent from the broker's
        topic listing).
        """
        kafka_url = self._config['url']
        # Expects a URL of the form scheme://broker/topic
        _, _, broker, topic = kafka_url.split('/')
        try:
            topics = list_topics(kafka_url, auth=self._credential, timeout=5)
        except KafkaException:
            log.error(f'{broker} appears to be down')
            return False
        # A topic missing from the listing would previously raise KeyError
        # out of Producer.info; report it as down instead.
        metadata = topics.get(topic)
        if metadata is None or metadata.error is not None:
            log.error(f'{topic} at {broker} appears to be down')
            return False
        log.info(f'{kafka_url} appears to be functioning properly')
        return True

    def _delivery_cb(self, kafka_error, message):
        """Record the outcome of one delivery attempt.

        Passed as ``delivery_callback`` to the stream's ``write``. Success
        clears ``kafka_delivery_failures``. Failure sets it and logs the
        error together with the alert's ``superevent_id`` and ``alert_type``.
        """
        if kafka_error is None:
            self.kafka_delivery_failures = False
            return

        # Flag the failure first so health reporting is correct even if
        # decoding the payload for the log message fails.
        self.kafka_delivery_failures = True
        # The payload is only needed for the error message, so decode it
        # only on failure.
        # FIXME Get rid of if-else logic once
        # https://github.com/scimma/hop-client/pull/190 is merged
        if self._config['suffix'] == 'avro':
            record = AvroBlob.deserialize(message.value()).content[0]
        else:
            record = JSONBlob.deserialize(message.value()).content
        kafka_url = self._config['url']
        log.error(f'Received error code {kafka_error.code()} '
                  f'({kafka_error.name()}, {kafka_error.str()}) '
                  f'when delivering {record["superevent_id"]} '
                  f'{record["alert_type"]} alert to {kafka_url}')

    def write(self, payload):
        """Write *payload* to the topic, then flush the stream."""
        self._open_hop_stream.write(payload,
                                    delivery_callback=self._delivery_cb)
        self._open_hop_stream.flush()
class KafkaBootStep(bootsteps.ConsumerStep):
    """Generic boot step to limit us to appropriate kinds of workers.

    Only include this bootstep in workers that are configured to listen to the
    ``kafka`` queue. Workers not using the solo task pool are rejected at
    creation time.
    """

    def create(self, consumer):
        # Accept only the solo task pool; anything else is a configuration
        # error.
        if isinstance(consumer.pool, solo.TaskPool):
            return
        raise RuntimeError(
            'The Kafka broker only works with the "solo" task pool. '
            'Start the worker with "--queues=kafka --pool=solo".')
class Consumer(KafkaBootStep):
    """Run MOU Kafka consumers in background threads.

    One ``KafkaListener`` is built per entry of the ``kafka_consumer_config``
    setting. A thread is started only for listeners whose stream opened
    successfully.
    """

    name = 'Kafka consumer'

    def include_if(self, consumer):
        return 'kafka' in consumer.app.amqp.queues

    def _active_listeners(self):
        """Return the listeners whose hop stream was opened, in order."""
        return [listener for listener in self._listeners.values()
                if listener._open_hop_stream is not None]

    def start(self, consumer):
        consumer_config = consumer.app.conf['kafka_consumer_config']
        urls = ' '.join(config['url'] for config in consumer_config.values())
        log.info(f'Starting {self.name}, topics: {urls}')
        self._listeners = {
            key: KafkaListener(key, config)
            for key, config in consumer_config.items()
        }
        # A listener's name is the key it was created with.
        self.threads = [
            Thread(target=listener.listen,
                   name=f'{listener.name}_KafkaConsumerThread')
            for listener in self._active_listeners()
        ]
        for thread in self.threads:
            thread.start()

    def stop(self, consumer):
        open_urls = ' '.join(
            listener._config['url'] for listener in self._active_listeners())
        log.info(f'Closing connection to topics: {open_urls}')
        for listener in self._listeners.values():
            # Clear the flag before closing so the listener treats the
            # resulting KafkaException as a shutdown, not an error.
            listener.running = False
            if listener._open_hop_stream is not None:
                listener._open_hop_stream.close()

        for thread in self.threads:
            thread.join()

    def info(self, consumer):
        active_names = {listener.name
                        for listener in self._active_listeners()}
        return {'active_kafka_consumers': active_names}
class Producer(KafkaBootStep):
    """Run the global Kafka producers in a background thread.

    Flags that document the health of the connections are made available for
    :ref:`inspection <celery:worker-inspect>` with the ``gwcelery inspect
    stats`` command under the ``kafka_topic_up`` and
    ``kafka_delivery_failures`` keys.
    """

    name = 'Kafka producer'

    def include_if(self, consumer):
        return 'kafka' in consumer.app.amqp.queues

    @staticmethod
    def _topic_urls(alert_config):
        """Return the configured topic URLs joined by single spaces."""
        return ' '.join(config['url'] for config in alert_config.values())

    def start(self, consumer):
        alert_config = consumer.app.conf['kafka_alert_config']
        log.info(f'Starting {self.name}, topics: '
                 f'{self._topic_urls(alert_config)}')
        writers = {
            brokerhost: KafkaWriter(brokerhost, config)
            for brokerhost, config in alert_config.items()
        }
        # Publish the writers in the app config so other code can reach
        # them, and keep a reference for stop()/info().
        consumer.app.conf['kafka_streams'] = writers
        self._writers = writers

    def stop(self, consumer):
        alert_config = consumer.app.conf['kafka_alert_config']
        log.info('Closing connection to topics: '
                 f'{self._topic_urls(alert_config)}')
        for writer in self._writers.values():
            writer._open_hop_stream.close()

    def info(self, consumer):
        writers = self._writers.items()
        topic_up = {
            brokerhost: writer.kafka_topic_up()
            for brokerhost, writer in writers
        }
        delivery_failures = {
            brokerhost: writer.kafka_delivery_failures
            for brokerhost, writer in writers
        }
        return {'kafka_topic_up': topic_up,
                'kafka_delivery_failures': delivery_failures}