Coverage for gwcelery/tasks/alerts.py: 83%
113 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1import json
3import numpy as np
4from astropy import time
5from celery import group
6from celery.utils.log import get_logger
8from .. import app
9from ..kafka.signals import kafka_record_consumed
10from . import gracedb
11from .core import DispatchHandler
13log = get_logger(__name__)
# Overrides for the group reported in public notices, keyed first by the
# preferred event's group and then by its search. Any (group, search) pair not
# listed here is reported under the preferred event's own group.
# FIXME Remove this once cwb bbh is uploading to cbc instead of burst
CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP = {
    'Burst': {'BBH': 'CBC'}
}
class _KafkaDispatchHandler(DispatchHandler):
    """Dispatch handler that maps Kafka consumer keys to callback tasks."""

    def process_args(self, name, record):
        """Return the ``(key, args, kwargs)`` triple for a consumed record.

        The record is forwarded as the single positional argument; no keyword
        arguments are passed.
        """
        positional = (record,)
        return name, positional, {}

    def __call__(self, *keys, **kwargs):
        r"""Build a decorator that registers a function as a handler task.

        Parameters
        ----------
        \*keys : list
            Keys to match
        \*\*kwargs
            Additional keyword arguments for `celery.Celery.task`.

        Returns
        -------
        callable
            A decorator that wraps its argument with ``gracedb.task``
            (always with ``ignore_result=True``), appends the resulting task
            to the callback list of every key, and returns the task.

        """
        def register(func):
            task = gracedb.task(ignore_result=True, **kwargs)(func)
            for key in keys:
                callbacks = self.setdefault(key, [])
                callbacks.append(task)
            return task

        return register
# Module-level dispatch registry: calling it with keys yields a decorator (see
# _KafkaDispatchHandler.__call__), and consumed records are routed through its
# ``dispatch`` method by the signal receiver defined below.
handler = _KafkaDispatchHandler()
r"""Function decorator to register a handler callback for specified Kafka URLs.
The decorated function is turned into a Celery task, which will be
automatically called whenever a message is received from a matching URL.

Parameters
----------
\*keys
    List of keys from :obj:`gwcelery.conf.kafka_consumer_config`
    associated with Kafka topics to listen to messages to.
\*\*kwargs
    Additional keyword arguments for :meth:`celery.Celery.task`.

Examples
--------
Declare a new handler like this::

    # Assumes kafka_consumer_config dictionary has 'fermi_swift' key
    @alerts.handler('fermi_swift')
    def handle_swift(record):
        # record is a dict that contains the contents of the message
        # do work here...
"""
@kafka_record_consumed.connect
def _on_kafka_record_consumed(name, record, **kwargs):
    """Receiver for ``kafka_record_consumed``: hand the record to ``handler``.

    Only ``name`` and ``record`` are forwarded to ``handler.dispatch``; any
    extra keyword arguments supplied with the signal are ignored.
    """
    handler.dispatch(name, record)
def _burst_duration_and_central_frequency(preferred_event):
    """Return ``(duration, central_frequency)`` for a Burst preferred event.

    Either value is None when the pipeline-specific attributes needed to
    derive it are absent.

    Raises
    ------
    NotImplementedError
        If the pipeline is not one of cwb, mly or olib.
    """
    pipeline = preferred_event['pipeline'].lower()

    if pipeline == 'cwb':
        burst = preferred_event['extra_attributes'].get('MultiBurst', {})
        return burst.get('duration'), burst.get('central_freq')

    if pipeline == 'mly':
        burst = preferred_event['extra_attributes'].get('MLyBurst', {})
        return burst.get('duration'), burst.get('central_freq')

    if pipeline == 'olib':
        burst = preferred_event['extra_attributes'].get(
            'LalInferenceBurst', {})
        quality_mean = burst.get('quality_mean')
        frequency_mean = burst.get('frequency_mean')
        # The duration is derived from both values; leave it unset instead of
        # raising when either is missing or the frequency is zero, matching
        # how the other pipelines treat missing attributes.
        if quality_mean is None or not frequency_mean:
            return None, frequency_mean
        duration = quality_mean / (2 * np.pi * frequency_mean)
        return duration, frequency_mean

    raise NotImplementedError(
        'Duration and central_frequency not implemented for Burst '
        'pipeline {}'.format(pipeline)
    )


def _create_base_alert_dict(classification, superevent, alert_type):
    """Create the base of the alert dictionary, with all contents except the
    skymap and the external coinc information.

    Parameters
    ----------
    classification : sequence, None
        Up to two JSON strings: the first is decoded into the ``properties``
        field and the second into the ``classification`` field. Missing or
        None entries yield empty dictionaries.
    superevent : dict
        The superevent dictionary. ``preferred_event_data``,
        ``superevent_id`` and ``links`` are always read; ``t_0`` and ``far``
        are read for every alert type except retractions.
    alert_type : str
        The alert type. ``'less-significant'`` is reported as PRELIMINARY
        with ``significant`` set to False; ``'retraction'`` produces a
        dictionary whose ``event`` field is None.

    Returns
    -------
    dict
        The alert dictionary, with ``external_coinc`` set to None.

    Raises
    ------
    NotImplementedError
        If the notice group is Burst and the pipeline is not supported.
    """
    # NOTE Everything that comes through this code path will be marked as
    # public. However, MDC events with this flag are not made public on
    # GraceDB-playground and GraceDB-test.

    # FIXME Dont change alert types internally
    # NOTE less-significant alerts have alert_type as PRELIMINARY
    alert_type_kafka = 'preliminary' if alert_type == 'less-significant' \
        else alert_type

    # NOTE the alert group is usually the same as the g-event group. Exceptions
    # are recorded in the CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP definition
    # above
    preferred_event = superevent['preferred_event_data']
    superevent_group = preferred_event['group']
    superevent_search = preferred_event['search']
    alert_group_kafka = CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP.get(
        superevent_group, {}).get(superevent_search, superevent_group)

    alert_dict = {
        'alert_type': alert_type_kafka.upper(),
        # Alert times do not need better than second precision, so the
        # fractional part of the timestamp is dropped.
        'time_created': time.Time.now().utc.isot.split('.')[0] + 'Z',
        'superevent_id': superevent['superevent_id'],
        'urls': {'gracedb': superevent['links']['self'].replace('api/', '') +
                 'view/'},
        'event': None,
        'external_coinc': None
    }

    # Retractions carry no event information.
    if alert_type == 'retraction':
        return alert_dict

    # Pad to two entries so a short (or absent) classification sequence is
    # treated the same as one holding explicit None values.
    raw_properties, raw_classification = \
        (list(classification or ()) + [None, None])[:2]
    properties = \
        json.loads(raw_properties) if raw_properties is not None else {}
    classification = \
        json.loads(raw_classification) if raw_classification is not None \
        else {}

    duration = None
    central_frequency = None
    if alert_group_kafka == 'Burst':
        duration, central_frequency = \
            _burst_duration_and_central_frequency(preferred_event)

    alert_dict['event'] = {
        # set 'significant' field based on
        # https://dcc.ligo.org/LIGO-G2300151/public
        'significant': alert_type != 'less-significant',
        'time': time.Time(superevent['t_0'], format='gps').utc.isot + 'Z',
        'far': superevent['far'],
        'instruments': sorted(preferred_event['instruments'].split(',')),
        'group': alert_group_kafka,
        'pipeline': preferred_event['pipeline'],
        'search': superevent_search,
        'properties': properties,
        'classification': classification,
        'duration': duration,
        'central_frequency': central_frequency
    }

    return alert_dict
@gracedb.task(shared=False)
def _add_external_coinc_to_alert(alert_dict, superevent,
                                 combined_skymap_filename):
    """Fill in the ``external_coinc`` field of an alert dictionary.

    The external event named by ``superevent['em_type']`` is fetched with
    ``gracedb.get_event``. When a combined skymap filename is given, that
    file is downloaded from the superevent as well.

    Returns
    -------
    tuple
        ``(alert_dict, combined_skymap)``; ``alert_dict`` is updated in
        place and ``combined_skymap`` is None when no filename was given.
    """
    external_event = gracedb.get_event(superevent['em_type'])

    combined_skymap = None
    if combined_skymap_filename:
        combined_skymap = gracedb.download(
            combined_skymap_filename, superevent['superevent_id'])

    grb_attributes = external_event['extra_attributes']['GRB']
    external_coinc = {
        'gcn_notice_id': int(grb_attributes['trigger_id']),
        'ivorn': grb_attributes['ivorn'],
        'observatory': external_event['pipeline'],
        'search': external_event['search'],
        # Offset of the external event from the superevent's t_0, rounded to
        # two decimal places.
        'time_difference': round(
            external_event['gpstime'] - superevent['t_0'], 2),
        'time_coincidence_far': superevent['time_coinc_far'],
        'time_sky_position_coincidence_far': superevent['space_coinc_far']
    }
    alert_dict['external_coinc'] = external_coinc

    return alert_dict, combined_skymap
@app.task(bind=True, shared=False, queue='kafka', ignore_result=True)
def _upload_notice(self, payload, brokerhost, superevent_id):
    """Upload the serialized alert notice to GraceDB.

    The file is named ``<superevent_id>-<alert type, lower case>.<suffix>``,
    where the suffix comes from the broker's alert configuration, and the
    upload is tagged ``public`` and ``em_follow``.
    """
    conf = self.app.conf
    broker_config = conf['kafka_alert_config'][brokerhost]
    stream = conf['kafka_streams'][brokerhost]

    # FIXME Drop get_payload_content method once
    # https://github.com/scimma/hop-client/pull/190 is merged
    notice = stream.get_payload_content(payload)
    log_message = 'Kafka alert notice sent to {}'.format(broker_config['url'])

    name_parts = (
        notice['superevent_id'],
        notice['alert_type'].lower(),
        broker_config['suffix'],
    )
    filename = '{}-{}.{}'.format(*name_parts)

    gracedb.upload.delay(
        payload.serialize()['content'],
        filename,
        superevent_id,
        log_message,
        tags=['public', 'em_follow']
    )
@app.task(bind=True, queue='kafka', shared=False)
def _send(self, alert_dict, skymap, brokerhost, combined_skymap=None):
    """Write the alert to the Kafka topic.

    Parameters
    ----------
    alert_dict : dict
        The alert dictionary. It is not modified; every nested dictionary
        that receives an encoded skymap is copied first.
    skymap
        Sky map contents, passed to the broker's ``skymap_encoder`` when the
        alert carries event information.
    brokerhost
        Key into the ``kafka_alert_config`` and ``kafka_streams``
        configuration selecting the broker to write to.
    combined_skymap : optional
        Combined sky map contents. When truthy (and the alert carries event
        information) it is encoded and stored under
        ``external_coinc['combined_skymap']``.

    Returns
    -------
    The payload object built by the stream's serialization model.
    """
    # Copy the alert dictionary so we dont modify the original
    payload_dict = alert_dict.copy()
    config = self.app.conf['kafka_alert_config'][brokerhost]

    if alert_dict['event'] is not None:
        # dict.copy is a shallow copy, so need to copy event dict as well since
        # we plan to modify it
        payload_dict['event'] = alert_dict['event'].copy()

        # Encode the skymap
        encoder = config['skymap_encoder']
        payload_dict['event']['skymap'] = encoder(skymap)

        if combined_skymap:
            # Same reasoning as for the event dict: copy before adding the
            # combined skymap so the caller's external_coinc is left intact.
            payload_dict['external_coinc'] = \
                alert_dict['external_coinc'].copy()
            payload_dict['external_coinc']['combined_skymap'] = \
                encoder(combined_skymap)

    # Write to kafka topic
    stream = self.app.conf['kafka_streams'][brokerhost]
    payload = stream.serialization_model(payload_dict)
    stream.write(payload)

    return payload
@app.task(bind=True, queue='kafka', shared=False)
def _send_with_combined(self, alert_dict_combined_skymap, skymap, brokerhost):
    """Adapter that unpacks an ``(alert_dict, combined_skymap)`` pair and
    forwards it to ``_send``.

    The pair has the same shape as the return value of
    ``_add_external_coinc_to_alert``, so this task can directly follow it in
    a chain. Returns whatever ``_send`` returns.
    """
    alert_dict, combined_skymap = alert_dict_combined_skymap
    return _send(alert_dict, skymap, brokerhost,
                 combined_skymap=combined_skymap)
@app.task(bind=True, ignore_result=True, queue='kafka', shared=False)
def send(self, skymap_and_classification, superevent, alert_type,
         raven_coinc=False, combined_skymap_filename=None):
    """Send a public alert to all currently connected kafka brokers.

    Parameters
    ----------
    skymap_and_classification : tuple, None
        The filecontents of the skymap followed by a collection of JSON
        strings. The former generated by
        :meth:`gwcelery.tasks.gracedb.download`, the latter generated by
        :meth:`gwcelery.tasks.em_bright.classifier` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro` or content of
        ``{gstlal,mbta}.p_astro.json`` uploaded by {gstlal,mbta} respectively.
        Can also be None.
    superevent : dict
        The superevent dictionary, typically obtained from an IGWN Alert or
        from querying GraceDB.
    alert_type : str
        The alert type. One of {`less-significant`, `earlywarning`,
        `preliminary`, `initial`, `update`, `retraction`}.
    raven_coinc: bool
        Is there a coincident external event processed by RAVEN? Ignored for
        retractions.
    combined_skymap_filename : str
        Combined skymap filename. Default None.

    Notes
    -----
    The `alert_type` value is used to set the `significant` field in the
    alert dictionary.

    For every broker in ``kafka_streams`` the alert is written to the Kafka
    topic and the resulting notice is then uploaded to GraceDB.
    """
    if skymap_and_classification is None:
        skymap, classification = None, None
    else:
        skymap, *classification = skymap_and_classification

    alert_dict = _create_base_alert_dict(
        classification, superevent, alert_type)

    brokerhosts = self.app.conf['kafka_streams'].keys()
    with_external_coinc = raven_coinc and alert_type != 'retraction'

    if with_external_coinc:
        # The external coincidence information (and optionally the combined
        # skymap) is gathered once, then fanned out to every broker.
        add_coinc = _add_external_coinc_to_alert.si(
            alert_dict, superevent, combined_skymap_filename)
        fan_out = group(
            (
                _send_with_combined.s(skymap, brokerhost)
                |
                _upload_notice.s(brokerhost, superevent['superevent_id'])
            ) for brokerhost in brokerhosts
        )
        canvas = add_coinc | fan_out
    else:
        canvas = group(
            (
                _send.s(alert_dict, skymap, brokerhost)
                |
                _upload_notice.s(brokerhost, superevent['superevent_id'])
            ) for brokerhost in brokerhosts
        )

    canvas.apply_async()
@app.task(shared=False)
def _create_skymap_classification_tuple(skymap, classification):
    """Return the skymap followed by the individual classification entries.

    ``classification`` may be None, in which case the result holds only the
    skymap; unpacking None would otherwise raise a TypeError.
    """
    if classification is None:
        return (skymap,)
    return (skymap, *classification)
@app.task(shared=False, ignore_result=True)
def download_skymap_and_send_alert(classification, superevent, alert_type,
                                   skymap_filename=None, raven_coinc=False,
                                   combined_skymap_filename=None):
    """Wrapper for send function when caller has not already downloaded the
    skymap.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.classifier` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro` or
        content of ``{gstlal,mbta}.p_astro.json`` uploaded by {gstlal,mbta}
        respectively; or None
    superevent : dict
        The superevent dictionary, typically obtained from an IGWN Alert or
        from querying GraceDB.
    alert_type : str
        The alert type, passed through to ``send``. No skymap is downloaded
        for ``'retraction'``.
    skymap_filename : string
        The skymap filename. When None, no skymap is downloaded.
    raven_coinc: bool
        Is there a coincident external event processed by RAVEN?
    combined_skymap_filename : str
        The combined skymap filename. Default None
    """
    if skymap_filename is not None and alert_type != 'retraction':
        canvas = (
            gracedb.download.si(
                skymap_filename,
                superevent['superevent_id']
            )
            |
            _create_skymap_classification_tuple.s(classification)
            |
            send.s(superevent, alert_type, raven_coinc=raven_coinc,
                   combined_skymap_filename=combined_skymap_filename)
        )
    else:
        # send expects the skymap followed by the individual classification
        # entries (it unpacks ``skymap, *classification``), so the entries
        # must be flattened here instead of nested as a single element.
        if classification is None:
            skymap_and_classification = None
        else:
            skymap_and_classification = (None, *classification)

        canvas = send.s(
            skymap_and_classification,
            superevent,
            alert_type,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

    canvas.apply_async()