Coverage for gwcelery/tasks/superevents.py: 87%
249 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
"""Module containing the functionality for creation and management of
superevents.

* Triggers from low-latency pipelines are processed serially.
* A dedicated **superevent** queue is used for this purpose.
* The primary logic for responding to low-latency triggers is contained in
  the :meth:`process` function.
"""
9from itertools import filterfalse, groupby
11from celery.utils.log import get_task_logger
12from ligo.segments import segment, segmentlist
14from .. import app
15from . import gracedb, igwn_alert
17log = get_task_logger(__name__)
REQUIRED_LABELS_BY_GROUP_SEARCH = {
    'cbc': {
        # Searches needing the full set of data products; every search gets
        # its own set object. 'bbh' is listed here because it is required
        # by cwb-bbh.
        **{
            search: {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'}
            for search in ('allsky', 'earlywarning', 'mdc', 'bbh')
        },
        'ssm': {'EMBRIGHT_READY', 'SKYMAP_READY'},
    },
    'burst': {
        'allsky': {'SKYMAP_READY'},
    },
}
"""These labels should be present on an event to consider it to
be complete. The table is keyed by lower-case group, then by lower-case
search.
"""

FROZEN_LABEL = 'LOW_SIGNIF_LOCKED'
"""This label indicates that the superevent manager should make no further
changes to the preferred event."""

SIGNIFICANT_LABEL = 'SIGNIF_LOCKED'
"""This label indicates that the superevent is elevated to significant."""

EARLY_WARNING_LABEL = 'EARLY_WARNING'
"""This label indicates that the superevent contains a significant
early warning event."""

EARLY_WARNING_SEARCH_NAME = 'EarlyWarning'
"""Search name for Early Warning searches. Only significant events
result in consideration by the superevent manager."""

READY_LABEL = 'EM_READY'
"""This label indicates that a preferred event has been assigned and it
has all data products required to make it ready for annotations."""

VT_SEARCH_NAME = 'VTInjection'
"""Search name for events uploaded as a part of estimating the spacetime
sensitive volume. Events from this search do not form superevents, and
are not annotated."""
@igwn_alert.handler('cbc_gstlal',
                    'cbc_spiir',
                    'cbc_pycbc',
                    'cbc_mbta',
                    'burst_olib',
                    'burst_cwb',
                    'burst_mly',
                    shared=False)
def handle(payload):
    """Respond to IGWN alert topics from low-latency search pipelines and
    delegate to :meth:`process` for superevent management.

    An alert is dropped without further processing when the event involves
    KAGRA (``K1``) data, comes from the :data:`VT_SEARCH_NAME` search, lacks
    a FAR, or has a FAR above the ``superevent_far_threshold`` configuration
    value. Of the remaining alerts, only ``new`` and ``label_added`` alerts
    are forwarded. A ``label_added`` alert is forwarded only if the label is
    ``RAVEN_ALERT``, or if the label is one of the required labels for the
    event's group and search and the event is complete.

    Parameters
    ----------
    payload : dict
        IGWN alert payload

    """
    alert_type = payload['alert_type']
    gid = payload['object']['graceid']
    alert_search = payload['object']['search']

    ifos = get_instruments(payload['object'])
    # Ignore inclusion of events involving KAGRA; revert when policy is changed
    if "K1" in ifos:
        log.info('Skipping %s because it involves KAGRA data', gid)
        return
    # Ignore inclusion of events from VT search
    if alert_search == VT_SEARCH_NAME:
        # Lazy %-style arguments, consistent with every other log call here;
        # the rendered message is unchanged.
        log.info('Skipping %s event %s', VT_SEARCH_NAME, gid)
        return

    try:
        far = payload['object']['far']
    except KeyError:
        log.info('Skipping %s because it lacks a FAR', gid)
        return

    if far > app.conf['superevent_far_threshold']:
        log.info("Skipping processing of %s because of high FAR", gid)
        return

    # 'new' alerts are submitted with priority 1, 'label_added' alerts with
    # priority 0.
    priority = 1
    if alert_type == 'label_added':
        priority = 0
        label = payload['data']['name']
        group = payload['object']['group'].lower()
        search = payload['object']['search'].lower()
        pipeline = payload['object']['pipeline'].lower()
        # Special case of cWB-BBH that require the same label of CBC
        if search == 'bbh' and pipeline == 'cwb':
            group = 'cbc'
        if label == 'RAVEN_ALERT':
            log.info('Label %s added to %s', label, gid)
        elif label not in REQUIRED_LABELS_BY_GROUP_SEARCH[group][search]:
            # Labels that do not affect completeness are of no interest
            return
        elif not is_complete(payload['object']):
            log.info("Ignoring since %s has %s labels. "
                     "It is not complete", gid, payload['object']['labels'])
            return
    elif alert_type != 'new':
        return
    process.si(payload).apply_async(priority=priority)
@gracedb.task(queue='superevent', shared=False, time_limit=600)
@gracedb.catch_retryable_http_errors
def process(payload):
    """Respond to `payload`: update the superevent the event already belongs
    to, add the event to an existing superevent whose time window it
    intersects, or create a new superevent for it. Afterwards, apply the
    advocate-request and significance labels to the superevent if the event
    meets the publishing criteria.

    Parameters
    ----------
    payload : dict
        IGWN alert payload

    """
    event_info = payload['object']
    gid = event_info['graceid']
    category = get_category(event_info)
    t_0, t_start, t_end = get_ts(event_info)

    if event_info.get('superevent'):
        sid = event_info['superevent']
        log.info('Event %s already belongs to superevent %s', gid, sid)
        # superevent_neighbours has current and nearby superevents
        neighbour = event_info['superevent_neighbours'][sid]
        superevent = _SuperEvent(neighbour['t_start'],
                                 neighbour['t_end'],
                                 neighbour['t_0'],
                                 neighbour['superevent_id'])
        _update_superevent(superevent.superevent_id, event_info,
                           t_0=t_0, t_start=None, t_end=None)
    else:
        log.info('Event %s does not yet belong to a superevent', gid)
        gpstime = event_info['gpstime']
        query = 'category: {} {} .. {}'.format(
            category,
            gpstime - app.conf['superevent_query_d_t_start'],
            gpstime + app.conf['superevent_query_d_t_end'])
        superevents = gracedb.get_superevents(query)
        for candidate in superevents:
            if gid not in candidate['gw_events']:
                continue
            sid = candidate['superevent_id']
            log.info('Event %s found assigned to superevent %s', gid, sid)
            alert_type = payload['alert_type']
            if alert_type == 'label_added':
                log.info('Label %s added to %s',
                         payload['data']['name'], gid)
            elif alert_type == 'new':
                log.info('new alert type for %s with '
                         'existing superevent %s. '
                         'No action required', gid, sid)
                return
            superevent = _SuperEvent(candidate['t_start'],
                                     candidate['t_end'],
                                     candidate['t_0'],
                                     candidate['superevent_id'])
            _update_superevent(superevent.superevent_id, event_info,
                               t_0=t_0, t_start=None, t_end=None)
            break
        else:
            # No queried superevent lists this event among its members
            event_segment = _Event(t_start, t_end, t_0, gid)
            superevent = _partially_intersects(superevents, event_segment)

            if not superevent:
                log.info('New event %s with no superevent in GraceDB, '
                         'creating new superevent', gid)
                sid = gracedb.create_superevent(gid, t_0, t_start, t_end)
            else:
                sid = superevent.superevent_id
                log.info('Event %s in window of %s. '
                         'Adding event to superevent', gid, sid)
                gracedb.add_event_to_superevent(sid, event_segment.gid)
                # extend the time window of the superevent
                new_superevent = superevent | event_segment
                if new_superevent != superevent:
                    log.info('%s not completely contained in %s, '
                             'extending superevent window',
                             event_segment.gid, sid)
                    new_t_start, new_t_end = new_superevent
                else:
                    log.info('%s is completely contained in %s',
                             event_segment.gid, sid)
                    new_t_start = new_t_end = None
                _update_superevent(superevent.superevent_id, event_info,
                                   t_0=t_0,
                                   t_start=new_t_start,
                                   t_end=new_t_end)

    if should_publish(event_info, significant=True):
        gracedb.create_label.delay('ADVREQ', sid)
        if is_complete(event_info):
            if EARLY_WARNING_LABEL in event_info['labels']:
                locking_label = EARLY_WARNING_LABEL
            else:
                locking_label = SIGNIFICANT_LABEL
            gracedb.create_label(locking_label, sid)

    if (should_publish(event_info, significant=False)
            and is_complete(event_info)):
        gracedb.create_label(FROZEN_LABEL, sid)
def get_category(event):
    """Get the superevent category for an event.

    An event whose ``search`` is ``'MDC'`` is categorized as ``'mdc'``
    regardless of its group; otherwise an event in the ``'Test'`` group is
    ``'test'``, and everything else is ``'production'``.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    {'mdc', 'test', 'production'}

    """
    # The search is optional in the dictionary, the group is not.
    if event.get('search') == 'MDC':
        return 'mdc'
    return 'test' if event['group'] == 'Test' else 'production'
def get_ts(event):
    """Get time extent of an event, depending on pipeline-specific parameters.

    * For CWB and MLy, use the event's ``duration`` field.
    * For oLIB, use the ratio of the event's ``quality_mean`` and
      ``frequency_mean`` fields.
    * For all other pipelines, use the
      :obj:`~gwcelery.conf.superevent_d_t_start` and
      :obj:`~gwcelery.conf.superevent_d_t_end` configuration values, falling
      back to the ``superevent_default_d_t_start`` and
      ``superevent_default_d_t_end`` configuration values for pipelines
      that have no entry of their own.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event` or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    t_0: float
        Segment center time in GPS seconds.
    t_start : float
        Segment start time in GPS seconds.
    t_end : float
        Segment end time in GPS seconds.

    """
    pipeline = event['pipeline'].lower()
    if pipeline in ('cwb', 'mly'):
        # Both burst pipelines carry a ``duration``, in different tables
        table = 'MultiBurst' if pipeline == 'cwb' else 'MLyBurst'
        before = after = event['extra_attributes'][table]['duration']
    elif pipeline == 'olib':
        burst = event['extra_attributes']['LalInferenceBurst']
        before = after = burst['quality_mean'] / burst['frequency_mean']
    else:
        conf = app.conf
        before = conf['superevent_d_t_start'].get(
            pipeline, conf['superevent_default_d_t_start'])
        after = conf['superevent_d_t_end'].get(
            pipeline, conf['superevent_default_d_t_end'])

    t_0 = event['gpstime']
    return t_0, t_0 - before, t_0 + after
def get_snr(event):
    """Get the SNR of an event.

    Different groups and pipelines store the SNR in different fields of
    ``extra_attributes``. The group is checked first: every CBC event is
    read from ``CoincInspiral``, whatever its pipeline.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    snr : float
        The SNR.

    Raises
    ------
    NotImplementedError
        If the event is not a CBC event and its pipeline is not one of
        cWB, oLIB or MLy.

    """
    # (table, field) holding the SNR for each non-CBC pipeline
    snr_location_by_pipeline = {
        'cwb': ('MultiBurst', 'snr'),
        'olib': ('LalInferenceBurst', 'omicron_snr_network'),
        'mly': ('MLyBurst', 'SNR'),
    }
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()

    if group == 'cbc':
        location = ('CoincInspiral', 'snr')
    else:
        location = snr_location_by_pipeline.get(pipeline)
        if location is None:
            raise NotImplementedError('SNR attribute not found')

    table, field = location
    return event['extra_attributes'][table][field]
def get_instruments(event):
    """Get the instruments that contributed data to an event.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    set
        The set of instruments that contributed to the event.
        When the event provides a ``SingleInspiral`` table in its
        ``extra_attributes``, the detectors listed there are used.
        Otherwise the ``instruments`` field of the G-event, a comma
        separated list, is split into its entries.
    """
    extra_attributes = event.get('extra_attributes', {})
    if 'SingleInspiral' in extra_attributes:
        return {row['ifo'] for row in extra_attributes['SingleInspiral']}
    return set(event['instruments'].split(','))
def get_instruments_in_ranking_statistic(event):
    """Get the instruments that contribute to the false alarm rate.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    set
        The set of instruments that contributed to the ranking statistic for
        the event.

    Notes
    -----
    The number of instruments that contributed *data* to an event is given by
    the ``instruments`` key of the GraceDB event JSON structure. However, some
    pipelines (e.g. gstlal) have a distinction between which instruments
    contributed *data* and which were considered in the *ranking* of the
    candidate. All the pipelines should conform to this rule.

    Here, a detector in the ``SingleInspiral`` table counts as contributing
    to the ranking only if its row has a ``chisq`` value that is not None.

    Unmodeled searches do not provide a SingleInspiral table. In such case
    the event's ``instruments`` field is used.
    """
    extra_attributes = event.get('extra_attributes', {})
    if 'SingleInspiral' not in extra_attributes:
        return set(event['instruments'].split(','))
    return {row['ifo'] for row in extra_attributes['SingleInspiral']
            if row.get('chisq') is not None}
@app.task(shared=False)
def select_pipeline_preferred_event(events):
    """Group list of G events by pipeline, and apply
    :meth:`select_preferred_event` to each group to select the pipeline
    preferred events. Events in the ``External`` group are ignored.

    Parameters
    ----------
    events : list
        list of event dictionaries

    Returns
    -------
    dict
        pipeline, preferred event pairs
    """
    def pipeline_of(event):
        return event['pipeline']

    g_events = [event for event in events if event['group'] != "External"]
    # groupby only merges adjacent items, so sort by the same key first
    g_events.sort(key=pipeline_of)
    return {pipeline: select_preferred_event(members)
            for pipeline, members in groupby(g_events, key=pipeline_of)}
@app.task(shared=False)
def select_preferred_event(events):
    """Select the preferred event out of a list of G events, typically
    contents of a superevent, based on :meth:`keyfunc`. Events in the
    ``External`` group are never candidates.

    Parameters
    ----------
    events : list
        list of event dictionaries

    Returns
    -------
    dict
        The event dictionary with the largest :meth:`keyfunc` value.

    Raises
    ------
    ValueError
        If `events` contains no event outside the ``External`` group.

    """
    candidates = [event for event in events if event['group'] != "External"]
    return max(candidates, key=keyfunc)
def is_complete(event):
    """
    Determine if a G event is complete, in the sense that it carries every
    label listed for its group and search under
    :data:`REQUIRED_LABELS_BY_GROUP_SEARCH`. A ``bbh`` search event from the
    cWB pipeline is checked against the CBC requirements.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    bool
        True if all the required labels are present on the event.

    Raises
    ------
    KeyError
        If the event's group and search have no entry in
        :data:`REQUIRED_LABELS_BY_GROUP_SEARCH`.

    """
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()
    present_labels = set(event['labels'])
    # Define the special case burst-cwb-bbh that is a CBC
    if (pipeline, search) == ('cwb', 'bbh'):
        group = 'cbc'

    return REQUIRED_LABELS_BY_GROUP_SEARCH[group][search] <= present_labels
def should_publish(event, significant=True):
    """Determine whether an event should be published as a public alert.

    All of the following conditions must be true for a public alert:

    *   The event's ``offline`` flag is not set.
    *   The event's false alarm rate, weighted by the group-specific trials
        factor as specified by the
        :obj:`~gwcelery.conf.preliminary_alert_trials_factor`
        (:obj:`~gwcelery.conf.significant_alert_trials_factor`)
        configuration setting, is less than or equal to
        :obj:`~gwcelery.conf.preliminary_alert_far_threshold`
        (:obj:`~gwcelery.conf.significant_alert_far_threshold`)

    The individual criteria are evaluated by :meth:`_should_publish`; this
    function only reports whether all of them hold.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)
    significant : bool
        Flag to use significant
        (:obj:`~gwcelery.conf.significant_alert_far_threshold`),
        or less-significant
        (:obj:`~gwcelery.conf.preliminary_alert_far_threshold`)
        FAR threshold.

    Returns
    -------
    should_publish : bool
        :obj:`True` if the event meets the criteria for a public alert or
        :obj:`False` if it does not.

    """
    criteria = _should_publish(event, significant=significant)
    return all(criteria)
def _should_publish(event, significant=False):
    """Evaluate the publishability criteria behind :meth:`should_publish` and
    return them individually, as a tuple, for later use.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)
    significant : bool
        Evaluate the significant FAR criterion if True; otherwise the
        less-significant one, unless the event already passes the
        significant criterion.

    Returns
    -------
    tuple of bool
        Two flags: whether the event is neither offline nor labelled
        ``INJ``, and whether the trials-weighted FAR is within the
        threshold or the event carries the ``RAVEN_ALERT`` label.

    """
    group = event['group'].lower()
    search = event.get('search', '').lower()
    conf = app.conf
    if search in conf['significant_alert_far_threshold'][group]:
        low_signif_far_threshold = \
            conf['preliminary_alert_far_threshold'][group][search]
        low_signif_trials_factor = \
            conf['preliminary_alert_trials_factor'][group][search]
        signif_far_threshold = \
            conf['significant_alert_far_threshold'][group][search]
        signif_trials_factor = \
            conf['significant_alert_trials_factor'][group][search]

        low_signif_far = low_signif_trials_factor * event['far']
        signif_far = signif_trials_factor * event['far']
    else:
        # Fallback in case an event is uploaded to an unlisted search.
        # Both the thresholds and the FARs must be bound here, otherwise the
        # comparisons below fail with UnboundLocalError. An infinite FAR
        # against a threshold of minus infinity never passes, so such an
        # event cannot satisfy the FAR criterion.
        low_signif_far_threshold = signif_far_threshold = -float('inf')
        low_signif_far = signif_far = float('inf')

    raven_coincidence = ('RAVEN_ALERT' in event['labels'])

    # Ensure that anything that returns True for significant=True also returns
    # True for significant=False. For example, a significant EarlyWarning event
    # should return True for both significant=True and significant=False.
    if significant or signif_far < signif_far_threshold:
        far = signif_far
        far_threshold = signif_far_threshold
    else:
        far = low_signif_far
        far_threshold = low_signif_far_threshold

    return (not event['offline'] and 'INJ' not in event['labels'],
            far <= far_threshold or raven_coincidence)
def keyfunc(event):
    """Key function for selection of the preferred event.

    Return a value suitable for identifying the preferred event. Given events
    ``a`` and ``b``, ``a`` is preferred over ``b`` if
    ``keyfunc(a) > keyfunc(b)``, else ``b`` is preferred.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    key : tuple
        The comparison key.

    Notes
    -----
    Tuples are compared lexicographically in Python: they are compared
    element-wise until an unequal pair of elements is found.

    """
    group = event['group'].lower()
    search = event['search'].lower()

    # Rank of the (group, search) pair; -1 when it is not configured
    try:
        preference = app.conf['superevent_candidate_preference'][group]
        group_rank = preference.get(search, -1)
    except KeyError:
        group_rank = -1

    if group == 'cbc':
        n_ifos = len(get_instruments(event))
        snr_or_far_ranking = get_snr(event)
    else:
        # We don't care about the number of detectors for burst events.
        # Smaller FAR -> higher IFAR -> more significant.
        # Use -FAR instead of IFAR=1/FAR so that rank for FAR=0 is defined.
        n_ifos, snr_or_far_ranking = -1, -event['far']

    # Conditions that determine choice of the preferred event
    # event completeness comes first
    # then, publishability criteria for significant events
    # then, publishability criteria for less-significant events
    # then, the configured rank of the event's group and search
    # then, prioritize more number of detectors
    # finally, use SNR (FAR) between two CBC (Burst) events
    # See https://rtd.igwn.org/projects/gwcelery/en/latest/gwcelery.tasks.superevents.html#selection-of-the-preferred-event  # noqa: E501
    complete = is_complete(event)
    significant_criteria = _should_publish(event, significant=True)
    less_significant_criteria = _should_publish(event, significant=False)
    return (
        complete,
        *significant_criteria,
        *less_significant_criteria,
        group_rank,
        n_ifos,
        snr_or_far_ranking,
    )
def _update_superevent(superevent_id, new_event_dict,
                       t_0, t_start, t_end):
    """Update preferred event and/or change time window.

    The new event replaces the current preferred event only when the
    superevent does not carry :data:`FROZEN_LABEL` and the new event ranks
    strictly higher according to :meth:`keyfunc`. Independently of that,
    :data:`READY_LABEL` is applied to the superevent if the new event is
    complete.

    Parameters
    ----------
    superevent_id : str
        the superevent_id
    new_event_dict : dict
        event info of the new trigger as a dictionary
    t_0 : float
        center time to set on `superevent_id`; only used when the new event
        becomes the preferred event
    t_start : float
        start time of `superevent_id`, None for no change
    t_end : float
        end time of `superevent_id`, None for no change

    """
    # labels and preferred event in the IGWN alert are not the latest
    latest = gracedb.get_superevent(superevent_id)
    current_labels = latest['labels']
    current_preferred = latest['preferred_event_data']

    # Only the window edges that were actually given are sent
    updates = {name: value
               for name, value in (('t_start', t_start), ('t_end', t_end))
               if value is not None}

    # update preferred event when LOW_SIGNIF_LOCKED is not applied
    if (FROZEN_LABEL not in current_labels
            and keyfunc(new_event_dict) > keyfunc(current_preferred)):
        updates['t_0'] = t_0
        updates['preferred_event'] = new_event_dict['graceid']

    if updates:
        gracedb.update_superevent(superevent_id, **updates)

    # completeness takes first precedence in deciding preferred event
    # necessary and sufficient condition to mark the superevent as ready
    if is_complete(new_event_dict):
        gracedb.create_label.delay(READY_LABEL, superevent_id)
def _superevent_segment_list(superevents):
    """Convert a list of superevent dictionaries into a segmentlist whose
    segments span the start and end times of each superevent.

    Parameters
    ----------
    superevents : list
        List of superevent dictionaries (e.g., the values
        of field ``superevent_neighbours`` in igwn-alert packet).

    Returns
    -------
    superevent_list : segmentlist
        superevents as a segmentlist object, in the order given

    """
    superevent_list = segmentlist()
    for entry in superevents:
        superevent_list.append(
            _SuperEvent(entry['t_start'], entry['t_end'],
                        entry['t_0'], entry['superevent_id']))
    return superevent_list
def _partially_intersects(superevents, event_segment):
    """Similar to :meth:`segmentlist.find` except it also returns the segment
    of `superevents` which partially intersects argument. If there is more
    than one intersection, the first occurrence is returned.

    Parameters
    ----------
    superevents : list
        list of superevents. Typical value of
        ``superevent_neighbours.values()``.
        Each entry is converted to a segment before the comparison.
    event_segment : segment
        segment object to look for an intersection with

    Returns
    -------
    match_segment : segment
        segment built from `superevents` which intersects. `None` if not
        found

    """
    # create a segmentlist using start and end times
    candidates = _superevent_segment_list(superevents)
    return next(
        (candidate for candidate in candidates
         if candidate.intersects(event_segment)),
        None)
class _Event(segment):
    """An event implemented as an extension of :class:`segment`.

    The segment itself spans ``t_start`` to ``t_end``; the center time and
    the GraceDB ID are kept as the attributes ``t_0`` and ``gid``.
    """

    def __new__(cls, t_start, t_end, *args, **kwargs):
        # Only the two end points go into the segment; the remaining
        # arguments are picked up by __init__.
        return super().__new__(cls, t_start, t_end)

    def __init__(self, t_start, t_end, t_0, gid):
        self.t_0, self.gid = t_0, gid
class _SuperEvent(segment):
    """A superevent implemented as an extension of :class:`segment`.

    The segment itself spans ``t_start`` to ``t_end``. Both values, the
    center time ``t_0`` and the superevent ID (``superevent_id``) are also
    kept as attributes.
    """

    def __new__(cls, t_start, t_end, *args, **kwargs):
        # Only the two end points go into the segment; the remaining
        # arguments are picked up by __init__.
        return super().__new__(cls, t_start, t_end)

    def __init__(self, t_start, t_end, t_0, sid):
        self.t_start, self.t_end = t_start, t_end
        self.t_0 = t_0
        self.superevent_id = sid