Coverage for gwcelery/tasks/raven.py: 98%
191 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""Search for GRB-GW coincidences with ligo-raven."""
2import ligo.raven.search
3from celery import group
4from celery.utils.log import get_task_logger
6from .. import app
7from . import external_skymaps, gracedb
8from .core import identity
# Module-level Celery task logger, named after this module.
log = get_task_logger(__name__)
@gracedb.task(shared=False)
def calculate_coincidence_far(superevent, exttrig, tl, th,
                              use_superevent_skymap=None):
    """Compute the joint FAR of a superevent / external trigger coincidence.

    The calculation is delegated to
    ``ligo.raven.search.calc_signif_gracedb``. Sky map information is
    included only when the labels of the external event indicate that both
    sky maps are available.

    Parameters
    ----------
    superevent : dict
        Superevent dictionary
    exttrig : dict
        External event dictionary
    tl : int
        Lower bound of search window in seconds
    th : int
        Upper bound of search window in seconds
    use_superevent_skymap : bool
        If True/False, use/don't use skymap info from superevent.
        Else if None, check SKYMAP_READY label in external event.

    Returns
    -------
    joint_far : dict
        Whatever ``calc_signif_gracedb`` returns, or an empty dictionary for
        SNEWS external events (no joint FAR is computed for those).

    """
    superevent_id = superevent['superevent_id']
    exttrig_id = exttrig['graceid']
    far_grb = exttrig['far']
    pipeline = exttrig['pipeline']

    # SNEWS coincidences never get a coincidence FAR
    if pipeline == 'SNEWS':
        return {}

    search_name = exttrig['search']

    # Maximum FAR thresholds only apply to the targeted subthreshold search
    far_gw_thresh = far_grb_thresh = None
    if search_name == 'SubGRBTargeted':
        thresholds = app.conf['raven_targeted_far_thresholds']
        far_gw_thresh = thresholds['GW'][pipeline]
        far_grb_thresh = thresholds['GRB'][pipeline]

    # Expected rate of astrophysical external triggers, where configured
    ext_rate = None
    if search_name in {'GRB', 'SubGRB', 'MDC'}:
        ext_rate = app.conf['raven_ext_rates'][search_name]

    labels = exttrig['labels']
    both_skymaps_ready = (
        {'EXT_SKYMAP_READY', 'SKYMAP_READY'}.issubset(labels) or
        {'EXT_SKYMAP_READY', 'EM_READY'}.issubset(labels))

    if both_skymaps_ready:
        # Explicit caller choice wins; otherwise fall back to the label
        if use_superevent_skymap is None:
            use_preferred_event_skymap = 'SKYMAP_READY' not in labels
        else:
            use_preferred_event_skymap = not use_superevent_skymap

        if use_preferred_event_skymap:
            gw_skymap_owner = superevent['preferred_event']
        else:
            gw_skymap_owner = superevent_id
        se_skymap = external_skymaps.get_skymap_filename(
            gw_skymap_owner, is_gw=True)
        ext_skymap = external_skymaps.get_skymap_filename(
            exttrig_id, is_gw=False)

        sky_kwargs = {
            'incl_sky': True,
            'se_fitsfile': se_skymap,
            'ext_fitsfile': ext_skymap,
            'se_moc': True,
            'ext_moc': '.multiorder.fits' in ext_skymap,
            'use_preferred_event_skymap': use_preferred_event_skymap,
        }
    else:
        # Temporal-only coincidence
        sky_kwargs = {'incl_sky': False}

    return ligo.raven.search.calc_signif_gracedb(
        superevent_id, exttrig_id, tl, th,
        se_dict=superevent, ext_dict=exttrig,
        grb_search=search_name,
        gracedb=gracedb.client,
        em_rate=ext_rate,
        far_grb=far_grb,
        far_gw_thresh=far_gw_thresh,
        far_grb_thresh=far_grb_thresh,
        **sky_kwargs)
@app.task(shared=False)
def coincidence_search(gracedb_id, alert_object, group=None, pipelines=None,
                       searches=None, se_searches=None):
    """Perform ligo-raven search for coincidences. Determines time window to
    use. If events found, launches RAVEN pipeline.

    Parameters
    ----------
    gracedb_id : str
        GraceDB ID of the trigger that launched RAVEN
    alert_object : dict
        Alert dictionary
    group : str
        Burst or CBC
    pipelines : list, optional
        List of external trigger pipeline names. ``None`` (the default) is
        treated as an empty list.
    searches : list, optional
        List of external trigger searches. ``None`` (the default) is treated
        as an empty list.
    se_searches : list, optional
        List of superevent searches. ``None`` (the default) is treated as an
        empty list.

    """
    # Use None sentinels instead of mutable list defaults so that no list
    # object is shared between separate invocations of this task.
    pipelines = [] if pipelines is None else pipelines
    searches = [] if searches is None else searches
    se_searches = [] if se_searches is None else se_searches

    tl, th = _time_window(gracedb_id, group, pipelines, searches, se_searches)

    # The search results are fed as the first argument of raven_pipeline.
    (
        search.si(gracedb_id, alert_object, tl, th, group, pipelines,
                  searches, se_searches)
        |
        raven_pipeline.s(gracedb_id, alert_object, tl, th, group)
    ).delay()
def _time_window(gracedb_id, group, pipelines, searches, se_searches):
    """Select the coincidence window matching the requested RAVEN search.

    Parameters
    ----------
    gracedb_id : str
        GraceDB ID of the trigger that launched RAVEN
    group : str
        Burst or CBC
    pipelines : list
        List of external trigger pipeline names
    searches : list
        List of external trigger searches
    se_searches : list
        List of superevent searches

    Returns
    -------
    tl, th : tuple
        Tuple of lower bound and upper bound of search window

    Raises
    ------
    ValueError
        If a subthreshold search is requested without Fermi or Swift among
        the pipelines, or if no window rule matches the request.

    """
    windows = app.conf['raven_coincidence_windows']
    # Read every configured window up front as a (lower, upper) pair
    cbc_lo, cbc_hi = windows['GRB_CBC']
    subfermi_lo, subfermi_hi = windows['GRB_CBC_SubFermi']
    subswift_lo, subswift_hi = windows['GRB_CBC_SubSwift']
    burst_lo, burst_hi = windows['GRB_Burst']
    snews_lo, snews_hi = windows['SNEWS']

    if 'SNEWS' in pipelines:
        lower, upper = snews_lo, snews_hi
    elif {'SubGRB', 'SubGRBTargeted'}.intersection(searches):
        # Subthreshold searches use a pipeline-specific targeted window
        if 'Fermi' in pipelines:
            lower, upper = subfermi_lo, subfermi_hi
        elif 'Swift' in pipelines:
            lower, upper = subswift_lo, subswift_hi
        else:
            raise ValueError('Specify Fermi or Swift as pipeline when '
                             'launching subthreshold search')
    elif group == 'CBC' or 'BBH' in se_searches:
        lower, upper = cbc_lo, cbc_hi
    elif group == 'Burst':
        lower, upper = burst_lo, burst_hi
    else:
        raise ValueError('Invalid RAVEN search request for {0}'.format(
            gracedb_id))

    # When triggering on a superevent the window has to be mirrored
    if 'S' in gracedb_id:
        return -upper, -lower
    return lower, upper
@gracedb.task(shared=False)
def search(gracedb_id, alert_object, tl=-5, th=5, group=None,
           pipelines=None, searches=None, se_searches=None):
    """Perform ligo-raven search to look for coincidences. This function
    does a query of GraceDB and uploads a log message of the result(s).

    Parameters
    ----------
    gracedb_id : str
        GraceDB ID of the trigger that launched RAVEN
    alert_object : dict
        Alert dictionary
    tl : int
        Lower bound of search window in seconds
    th : int
        Upper bound of search window in seconds
    group : str
        Burst or CBC
    pipelines : list, optional
        List of external trigger pipelines for performing coincidence search
        against. ``None`` (the default) is treated as an empty list.
    searches : list, optional
        List of external trigger searches. ``None`` (the default) is treated
        as an empty list.
    se_searches : list, optional
        List of superevent searches. ``None`` (the default) is treated as an
        empty list.

    Returns
    -------
    results : list
        List with the dictionaries of related GraceDB events

    """
    # None sentinels replace mutable list defaults: a fresh list is handed to
    # ligo-raven on every call instead of one shared default object.
    pipelines = [] if pipelines is None else pipelines
    searches = [] if searches is None else searches
    se_searches = [] if se_searches is None else se_searches

    return ligo.raven.search.search(gracedb_id, tl, th,
                                    event_dict=alert_object,
                                    gracedb=gracedb.client,
                                    group=group, pipelines=pipelines,
                                    searches=searches,
                                    se_searches=se_searches)
@app.task(shared=False)
def raven_pipeline(raven_search_results, gracedb_id, alert_object, tl, th,
                   gw_group, use_superevent_skymap=None):
    """Run the full RAVEN pipeline for every coincidence that was found.

    For each coincidence this adds the external trigger to the superevent,
    calculates the coincidence false alarm rate, applies 'EM_COINC' to the
    appropriate events, and checks whether the candidate passes all
    publishing conditions.

    Parameters
    ----------
    raven_search_results : list
        List of dictionaries of each related gracedb trigger
    gracedb_id : str
        GraceDB ID of the trigger that launched RAVEN
    alert_object : dict
        Alert dictionary, either a superevent or an external event
    tl : int
        Lower bound of search window in seconds
    th : int
        Upper bound of search window in seconds
    gw_group : str
        Burst or CBC
    use_superevent_skymap : bool
        If True/False, use/don't use skymap info from superevent.
        Else if None, checks SKYMAP_READY label in external event.

    """
    if not raven_search_results:
        return

    launched_by_superevent = 'S' in gracedb_id
    if not launched_by_superevent:
        # An external event may only be attached to a single superevent
        raven_search_results = preferred_superevent(raven_search_results)

    for match in raven_search_results:
        if launched_by_superevent:
            superevent, ext_event = alert_object, match
            superevent_id = gracedb_id
            exttrig_id = match['graceid']
        else:
            superevent, ext_event = match, alert_object
            superevent_id = match['superevent_id']
            exttrig_id = gracedb_id
            # Stop if the external event already belongs to a different
            # superevent than this one.
            linked_superevent = ext_event['superevent']
            if linked_superevent is not None \
                    and linked_superevent != superevent['superevent_id']:
                return

        # Publishing conditions are always checked, so the analysis can be
        # re-run if NOT_GRB is removed later.
        followups = (
            trigger_raven_alert.s(superevent, gracedb_id, ext_event,
                                  gw_group),
        )

        # A targeted-search external event that is likely non-astrophysical
        # gets no EM_COINC labels. This avoids large influxes of EM_COINC
        # alerts that will never really be considered now or in the future.
        vetoed_targeted_event = (
            'NOT_GRB' in ext_event['labels'] and
            ext_event['search'] == 'SubGRBTargeted')
        if not vetoed_targeted_event:
            followups += (
                gracedb.create_label.si('EM_COINC', superevent_id),
                gracedb.create_label.si('EM_COINC', exttrig_id),
            )

        (
            gracedb.add_event_to_superevent.si(superevent_id, exttrig_id)
            |
            calculate_coincidence_far.si(
                superevent, ext_event, tl, th,
                use_superevent_skymap=use_superevent_skymap
            )
            |
            group(followups)
        ).delay()
@app.task(shared=False)
def preferred_superevent(raven_search_results):
    """Pick the single superevent with the lowest FAR for an external event.

    Restricting the result to one superevent prevents errors from trying to
    add one external event to multiple superevents.

    Parameters
    ----------
    raven_search_results : list
        List of dictionaries of each related gracedb trigger

    Returns
    -------
    superevent : list
        List containing single chosen superevent

    """
    # Order by (FAR, position) so that ties go to the earliest entry
    _, chosen = min(
        enumerate(raven_search_results),
        key=lambda indexed: (indexed[1]['far'], indexed[0]))
    return [chosen]
@app.task(queue='exttrig', shared=False)
def update_coinc_far(coinc_far_dict, superevent, ext_event):
    """Update the joint info stored in the superevent if the given
    coincidence is the preferred one. In order of priority, the determining
    conditions are the following:

    * Likely astrophysical external candidates are preferred over likely
      non-astrophysical candidates.
    * Candidates that pass publishing thresholds are preferred over those
      that do not.
    * A SNEWS coincidence is preferred, then GRB/FRB/HEN, then subthreshold.
    * A lower spacetime joint FAR is preferred over a higher spacetime joint
      FAR.
    * A lower temporal joint FAR is preferred over a higher temporal joint
      FAR.

    Parameters
    ----------
    coinc_far_dict : dict
        Dictionary containing coincidence false alarm rate results from
        RAVEN
    superevent : dict
        Superevent dictionary
    ext_event: dict
        External event dictionary

    Returns
    -------
    coinc_far_dict : dict
        Dictionary containing joint false alarm rate passed to the function
        as an initial argument

    """
    superevent_id = superevent['superevent_id']
    candidate = (ext_event, coinc_far_dict)

    # Re-fetch the superevent to work on its latest state (race condition)
    latest = gracedb.get_superevent(superevent_id)
    current_em_type = latest['em_type']

    if current_em_type:
        # A preferred external event already exists: rebuild its
        # (event, joint FAR) pair and let keyfunc decide between the two.
        incumbent = (
            gracedb.get_event(current_em_type),
            {
                'temporal_coinc_far': latest['time_coinc_far'],
                'spatiotemporal_coinc_far': latest['space_coinc_far']
            },
        )
        winner = max([incumbent, candidate], key=keyfunc)
    else:
        winner = candidate

    # Only write to the superevent when the new coincidence is preferred
    if winner == candidate:
        gracedb.update_superevent(
            superevent_id,
            em_type=ext_event['graceid'],
            time_coinc_far=coinc_far_dict.get('temporal_coinc_far'),
            space_coinc_far=coinc_far_dict.get('spatiotemporal_coinc_far'))
    return coinc_far_dict
def keyfunc(event_far):
    """Key function for selection of the preferred event.

    Return a value suitable for identifying the preferred event. Given events
    ``a`` and ``b``, ``a`` is preferred over ``b`` if
    ``keyfunc(a) > keyfunc(b)``, else ``b`` is preferred.

    Parameters
    ----------
    event_far : tuple
        Tuple of event dictionary and coinc far dictionary from RAVEN.

    Returns
    -------
    key : tuple
        The comparison key.

    Notes
    -----
    Tuples are compared lexicographically in Python: they are compared
    element-wise until an unequal pair of elements is found.
    """
    event, coinc_far = event_far

    def far_rank(field):
        # Negate so that a more significant (smaller) FAR ranks higher;
        # a missing FAR ranks below every real value.
        far = coinc_far.get(field)
        return float('-inf') if far is None else -far

    return (
        # Not vetoed as likely non-astrophysical
        'NOT_GRB' not in event['labels'],
        # Has already passed the publishing threshold
        'RAVEN_ALERT' in event['labels'],
        # Configured search preference, -1 for unlisted searches
        app.conf['external_search_preference'].get(event['search'], -1),
        far_rank('spatiotemporal_coinc_far'),
        far_rank('temporal_coinc_far'),
    )
@app.task(shared=False)
def trigger_raven_alert(coinc_far_dict, superevent, gracedb_id,
                        ext_event, gw_group):
    """Determine whether an event should be published as a preliminary alert.
    If yes, then triggers an alert by applying `RAVEN_ALERT` to the preferred
    event.

    All of the following conditions must be true to either trigger an alert or
    include coincidence info into the next alert:

    * The external event must be a threshold GRB or SNEWS event.
    * If triggered on a SNEWS event, the GW false alarm rate must pass the
      ``snews_gw_far_threshold`` configuration setting.
    * The event's RAVEN coincidence false alarm rate, weighted by the
      trials factor read from the ``significant_alert_trials_factor``
      configuration setting, is less than or equal to the threshold read
      from ``significant_alert_far_threshold``. This FAR also must not be
      negative.
    * If the coincidence involves a GRB, then both sky maps must be present.
    * The external event is not labelled ``NOT_GRB``, has no previous
      ``RAVEN_ALERT`` label, and neither event belongs to the Test group.

    Whatever the outcome, explanatory log messages are uploaded to the
    superevent and the joint FAR stored in the superevent is refreshed via
    :func:`update_coinc_far`.

    Parameters
    ----------
    coinc_far_dict : dict
        Dictionary containing coincidence false alarm rate results from
        RAVEN
    superevent : dict
        Superevent dictionary. Its ``'labels'`` entry gains ``RAVEN_ALERT``
        locally when the publishing criteria are met.
    gracedb_id : str
        GraceDB ID of the trigger that launched RAVEN
    ext_event : dict
        External event dictionary
    gw_group : str
        Burst or CBC

    """
    preferred_gwevent_id = superevent['preferred_event']
    superevent_id = superevent['superevent_id']
    ext_id = ext_event['graceid']
    # Specify group is not given, currently missing for subthreshold searches
    gw_group = gw_group or superevent['preferred_event_data']['group']
    gw_group = gw_group.lower()
    gw_search = superevent['preferred_event_data']['search'].lower()
    pipeline = ext_event['pipeline']
    if gw_search in app.conf['significant_alert_trials_factor'][gw_group]:
        trials_factor = \
            app.conf['significant_alert_trials_factor'][gw_group][gw_search]
    else:
        trials_factor = 1
    missing_skymap = True
    comments = []
    messages = []

    # Since the significance of SNEWS triggers is so high, we will publish
    # any trigger coincident with a decently significant GW candidate
    if 'SNEWS' == pipeline:
        gw_far = superevent['far']
        far_type = 'gw'
        far_threshold = app.conf['snews_gw_far_threshold']
        pass_far_threshold = gw_far * trials_factor < far_threshold
        is_far_negative = gw_far < 0
        is_ext_subthreshold = False
        missing_skymap = False
        # Set coinc FAR to gw FAR only for the sake of a message below
        time_coinc_far = space_coinc_far = coinc_far = None
        coinc_far_f = gw_far

    # The GBM team requested we not send automatic alerts from subthreshold
    # GRBs. This checks that at least one threshold GRB present as well as
    # the coinc far
    else:
        # check whether the GRB is threshold or sub-thresholds
        is_ext_subthreshold = 'SubGRB' == ext_event['search']

        # Use spatial FAR if available, otherwise use temporal
        time_coinc_far = coinc_far_dict['temporal_coinc_far']
        space_coinc_far = coinc_far_dict['spatiotemporal_coinc_far']
        if space_coinc_far is not None:
            coinc_far = space_coinc_far
            missing_skymap = False
        else:
            coinc_far = time_coinc_far

        far_type = 'joint'
        if gw_search in app.conf['significant_alert_far_threshold'][gw_group]:
            far_threshold = (
                app.conf['significant_alert_far_threshold'][gw_group]
                [gw_search]
            )
        else:
            # Fallback in case an event is uploaded to an unlisted search
            far_threshold = -1 * float('inf')
        # Trials-weighted joint FAR.
        # NOTE(review): this evaluates to 0 when trials_factor == 1 --
        # confirm that this is the intended weighting.
        coinc_far_f = coinc_far * trials_factor * (trials_factor - 1.)
        pass_far_threshold = coinc_far_f <= far_threshold
        is_far_negative = coinc_far_f < 0

    # Get most recent labels to prevent race conditions
    ext_labels = gracedb.get_labels(ext_id)
    no_previous_alert = {'RAVEN_ALERT'}.isdisjoint(ext_labels)
    likely_real_ext_event = {'NOT_GRB'}.isdisjoint(ext_labels)
    is_test_event = (superevent['preferred_event_data']['group'] == 'Test' or
                     ext_event['group'] == 'Test')

    # If publishable, trigger an alert by applying `RAVEN_ALERT` label to
    # preferred event
    if pass_far_threshold and not is_ext_subthreshold and \
            likely_real_ext_event and not missing_skymap and \
            not is_test_event and no_previous_alert and \
            not is_far_negative:
        comments.append(('RAVEN: publishing criteria met for {0}-{1}. '
                         'Triggering RAVEN alert'.format(
                             preferred_gwevent_id, ext_id)))
        # Add label to local dictionary and to event on GraceDB server
        # NOTE: We may prefer to apply the superevent label first and the grab
        # labels to refresh in the future
        # The label is wrapped in a list on purpose: ``+=`` with a bare
        # string would extend the label list one character at a time.
        superevent['labels'] += ['RAVEN_ALERT']
        # Add RAVEN_ALERT to preferred event last to avoid race conditions
        # where superevent is expected to have it once alert is issued
        alert_canvas = (
            gracedb.create_label.si('RAVEN_ALERT', superevent_id)
            |
            gracedb.create_label.si('HIGH_PROFILE', superevent_id)
            |
            gracedb.create_label.si('RAVEN_ALERT', ext_id)
            |
            gracedb.create_label.si('RAVEN_ALERT', preferred_gwevent_id)
        )
    else:
        alert_canvas = identity.si()

    # Explain every unmet criterion in a log message on the superevent
    if not pass_far_threshold:
        comments.append(('RAVEN: publishing criteria not met for {0}-{1},'
                         ' {2} FAR (w/ trials) too large '
                         '({3:.4g} > {4:.4g})'.format(
                             preferred_gwevent_id, ext_id, far_type,
                             coinc_far_f, far_threshold)))
    if is_ext_subthreshold:
        comments.append(('RAVEN: publishing criteria not met for {0}-{1},'
                         ' {1} is subthreshold'.format(preferred_gwevent_id,
                                                       ext_id)))
    if not likely_real_ext_event:
        ext_far = ext_event['far']
        grb_far_threshold = \
            app.conf['raven_targeted_far_thresholds']['GRB'][pipeline]
        extra_sentence = ''
        if ext_far is not None and grb_far_threshold < ext_far:
            extra_sentence = (' This due to the GRB FAR being too high '
                              '({0:.4g} > {1:.4g})'.format(
                                  ext_far, grb_far_threshold))
        comments.append(('RAVEN: publishing criteria not met for {0}-{1},'
                         ' {1} is likely non-astrophysical.{2}'.format(
                             preferred_gwevent_id, ext_id, extra_sentence)))
    if is_test_event:
        comments.append('RAVEN: {0}-{1} is non-astrophysical, '
                        'at least one event is a Test event'.format(
                            preferred_gwevent_id, ext_id))
    if missing_skymap:
        comments.append('RAVEN: Will only publish GRB coincidence '
                        'if spatial-temporal FAR is present. '
                        'Waiting for both sky maps to be available '
                        'first.')
    if is_far_negative:
        comments.append(('RAVEN: publishing criteria not met for {0}-{1},'
                         ' {2} FAR is negative ({3:.4g})'.format(
                             preferred_gwevent_id, ext_id, far_type,
                             coinc_far_f)))
    for comment in comments:
        messages.append(gracedb.upload.si(None, None, superevent_id, comment,
                                          tags=['ext_coinc']))

    # Update coincidence FAR with latest info, including the application of
    # RAVEN_ALERT, then issue alert
    (
        update_coinc_far.si(coinc_far_dict, superevent, ext_event)
        |
        group(
            alert_canvas,
            external_skymaps.plot_overlap_integral.s(superevent, ext_event),
            *messages
        )
    ).delay()
@app.task(shared=False)
def sog_paper_pipeline(ext_event, superevent):
    """Determine whether a speed of gravity measurement manuscript should be
    created for a given coincidence. This is denoted by applying the
    ``SOG_READY`` label to a superevent.

    All of the following conditions must be true for a SoG paper:

    * The coincidence is significant: both the GW FAR and the joint
      (spatiotemporal) FAR are at or below the thresholds in the
      ``sog_paper_far_threshold`` configuration setting. A FAR that is
      missing (``None``) does not pass.
    * The external event is a high-significance GRB and from an MOU partner.
    * The GW event is a CBC candidate.

    Parameters
    ----------
    ext_event : dict
        External event dictionary
    superevent : dict
        Superevent dictionary

    """
    gw_far = superevent['far']
    coinc_far = superevent['space_coinc_far']
    gw_far_threshold = app.conf['sog_paper_far_threshold']['gw']
    joint_far_threshold = app.conf['sog_paper_far_threshold']['joint']

    # Check publishing conditions. The spatiotemporal joint FAR is stored as
    # None when no sky-map based FAR was computed; comparing None to a float
    # raises TypeError, so a missing FAR simply counts as not passing.
    pass_gw_far_threshold = \
        gw_far is not None and gw_far <= gw_far_threshold
    pass_joint_far_threshold = \
        coinc_far is not None and coinc_far <= joint_far_threshold
    is_grb = ext_event['search'] in ['GRB', 'MDC']
    is_mou_partner = ext_event['pipeline'] in ['Fermi', 'Swift']
    is_cbc = superevent['preferred_event_data']['group'] == 'CBC'

    if is_grb and is_cbc and is_mou_partner and \
            pass_gw_far_threshold and pass_joint_far_threshold:
        # Trigger SOG_READY label alert to alert SOG analysts
        gracedb.create_label.si('SOG_READY',
                                superevent['superevent_id']).delay()