Coverage for gwcelery/tasks/orchestrator.py: 95%
404 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""Tasks that comprise the alert orchestrator.
3The orchestrator is responsible for the vetting and annotation workflow to
4produce preliminary, initial, and update alerts for gravitational-wave event
5candidates.
6"""
7import json
8import re
10from astropy.time import Time
11from celery import chain, group
12from ligo.rrt_chat import channel_creation
13from rapidpe_rift_pipe import pastro as rpe_pastro
15from .. import app
16from . import (alerts, bayestar, circulars, detchar, em_bright,
17 external_skymaps, gcn, gracedb, igwn_alert, inference, p_astro,
18 rrt_utils, skymaps, superevents)
19from .core import get_first, identity
@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_superevent(alert):
    """Dispatch superevent IGWN alerts to the matching annotation workflow.

    The action taken depends on ``alert['alert_type']``:

    * ``'new'``: schedule RapidPE-RIFT and Bilby parameter estimation (each
      preceded by a delayed superevent lookup) and launch omegascans together
      with :meth:`gwcelery.tasks.detchar.check_vectors`.
    * ``'label_added'``: start the workflow that corresponds to the label
      named in ``alert['data']['name']`` (less-significant, significant,
      second preliminary, early-warning, initial, retraction, or Mattermost
      channel creation).
    * ``'event_added'``: if the superevent carries the ``DQV`` label, rerun
      :meth:`gwcelery.tasks.detchar.check_vectors` and pass its result to
      :func:`_update_if_dqok`.

    Any other alert type is ignored.

    Parameters
    ----------
    alert : dict
        IGWN alert payload. This handler reads ``uid``, ``alert_type``,
        ``object`` and, for some alert types, ``data``.
    """
    superevent_id = alert['uid']
    # launch PE and detchar based on new type superevents
    if alert['alert_type'] == 'new':
        # Launch rapidpe ``rapidpe_timeout`` seconds after the merger time
        # ``t_0``; the countdown is clamped at zero if that moment has
        # already passed.
        timeout = max(
            alert['object']['t_0'] - Time.now().gps +
            app.conf['rapidpe_timeout'], 0
        )
        (
            gracedb.get_superevent.si(superevent_id).set(
                countdown=timeout
            )
            |
            _get_pe_far_and_event.s()
            |
            parameter_estimation.s(superevent_id, 'rapidpe')
        ).apply_async()

        # Same workflow for bilby, delayed by the fixed ``pe_timeout``.
        (
            gracedb.get_superevent.si(superevent_id).set(
                countdown=app.conf['pe_timeout']
            )
            |
            _get_pe_far_and_event.s()
            |
            parameter_estimation.s(superevent_id, 'bilby')
        ).apply_async()

        # run check_vectors. Create and upload omegascans
        group(
            detchar.omegascan.si(alert['object']['t_0'], superevent_id),

            detchar.check_vectors.si(
                alert['object']['preferred_event_data'],
                superevent_id,
                alert['object']['t_start'],
                alert['object']['t_end']
            )
        ).delay()

    elif alert['alert_type'] == 'label_added':
        label_name = alert['data']['name']
        # GraceDB query used below to fetch the superevent's CBC and Burst
        # events; it is narrowed to MDC or Test uploads to match the
        # superevent category.
        query = f'superevent: {superevent_id} group: CBC Burst'
        if alert['object']['category'] == 'MDC':
            query += ' MDC'
        elif alert['object']['category'] == 'Test':
            query += ' Test'

        # launch less-significant preliminary alerts on LOW_SIGNIF_LOCKED
        if label_name == superevents.FROZEN_LABEL:
            # don't launch if EARLY_WARNING or SIGNIF_LOCKED is present
            skipping_labels = {
                superevents.SIGNIFICANT_LABEL,
                superevents.EARLY_WARNING_LABEL
            }.intersection(alert['object']['labels'])
            if skipping_labels:
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "The superevent already has a significant/EW event, "
                    "skipping launching less-significant alert"
                )
                return
            (
                gracedb.upload.s(
                    None,
                    None,
                    superevent_id,
                    "Automated DQ check before sending less-significant "
                    "preliminary alert. New results supersede old results.",
                    tags=['data_quality']
                )
                |
                detchar.check_vectors.si(
                    alert['object']['preferred_event_data'],
                    superevent_id,
                    alert['object']['t_start'],
                    alert['object']['t_end']
                )
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='less-significant')
            ).apply_async()

        # launch significant alert on SIGNIF_LOCKED
        elif label_name == superevents.SIGNIFICANT_LABEL:
            # ensure superevent is locked before starting alert workflow
            r = chain()
            if superevents.FROZEN_LABEL not in alert['object']['labels']:
                r |= gracedb.create_label.si(superevents.FROZEN_LABEL,
                                             superevent_id)

            # Re-select the preferred event, record it on the superevent,
            # leave log messages, run the DQ check and then send the alert.
            r |= (
                gracedb.get_events.si(query)
                |
                superevents.select_preferred_event.s()
                |
                _update_superevent_and_return_event_dict.s(superevent_id)
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up after significant event. "
                )
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Automated DQ check before sending significant alert. "
                    "New results supersede old results.",
                    tags=['data_quality']
                )
                |
                detchar.check_vectors.s(
                    superevent_id,
                    alert['object']['t_start'],
                    alert['object']['t_end']
                )
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            )
            r.apply_async()

        # launch second preliminary on GCN_PRELIM_SENT
        elif label_name == 'GCN_PRELIM_SENT':
            (
                identity.si().set(
                    # https://git.ligo.org/emfollow/gwcelery/-/issues/478
                    # FIXME: remove this task once https://github.com/celery/celery/issues/7851 is resolved  # noqa: E501
                    countdown=app.conf['superevent_clean_up_timeout']
                )
                |
                gracedb.get_events.si(query)
                |
                superevents.select_preferred_event.s()
                |
                _update_superevent_and_return_event_dict.s(superevent_id)
                |
                group(
                    _leave_log_message_and_return_event_dict.s(
                        superevent_id,
                        "Superevent cleaned up after first preliminary alert"
                    ),

                    gracedb.create_label.si('DQR_REQUEST', superevent_id)
                )
                |
                # Keep only the first group result (the event dictionary).
                get_first.s()
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            ).apply_async()

            # set pipeline preferred events
            # FIXME: Ideally this should be combined with the previous
            # canvas. However, incorporating that group prevents the canvas
            # from executing; maybe related to
            # https://github.com/celery/celery/issues/7851
            (
                gracedb.get_events.si(query)
                |
                superevents.select_pipeline_preferred_event.s()
                |
                _set_pipeline_preferred_events.s(superevent_id)
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == 'LOW_SIGNIF_PRELIM_SENT':
            # similar workflow as the GCN_PRELIM_SENT
            # except block by condition evaluated at the end of timeout
            _revise_and_send_second_less_significant_alert.si(
                alert, query, superevent_id,
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == superevents.EARLY_WARNING_LABEL:
            if superevents.SIGNIFICANT_LABEL in alert['object']['labels']:
                # stop if full BW significant event already present
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "Superevent superseded by full BW event, skipping EW."
                )
                return
            # start the EW alert pipeline; is blocked by SIGNIF_LOCKED
            # ensure superevent is locked before starting pipeline
            r = chain()
            if superevents.FROZEN_LABEL not in alert['object']['labels']:
                r |= gracedb.create_label.si(superevents.FROZEN_LABEL,
                                             superevent_id)

            r |= (
                gracedb.get_events.si(query)
                |
                superevents.select_preferred_event.s()
                |
                _update_superevent_and_return_event_dict.s(superevent_id)
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up before sending EW alert."
                )
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='earlywarning')
            )
            r.apply_async()

        # launch initial/retraction alert on ADVOK/ADVNO
        elif label_name == 'ADVOK':
            initial_alert((None, None, None), alert)
        elif label_name == 'ADVNO':
            retraction_alert(alert)
        elif label_name == 'ADVREQ':
            # Channel creation is optional and controlled by configuration.
            if app.conf['create_mattermost_channel']:
                _create_mattermost_channel.si(superevent_id).delay()

    # check DQV label on superevent, run check_vectors if required
    elif alert['alert_type'] == 'event_added':
        # FIXME Check if this should be changed to 'update' alert_types instead
        # of 'event_added'. 'event_added' seems like it's just a new event in
        # the superevent window, not necessarily an event that should be
        # promoted to preferred event
        start = alert['data']['t_start']
        end = alert['data']['t_end']

        if 'DQV' in gracedb.get_labels(superevent_id):
            (
                detchar.check_vectors.s(
                    alert['object']['preferred_event_data'],
                    superevent_id,
                    start,
                    end
                )
                |
                _update_if_dqok.s(superevent_id)
            ).apply_async()
@igwn_alert.handler('cbc_gstlal',
                    'cbc_spiir',
                    'cbc_pycbc',
                    'cbc_mbta',
                    shared=False)
def handle_cbc_event(alert):
    """Perform annotations for CBC events that depend on pipeline-specific
    matched-filter parameter estimates.

    Notes
    -----
    This IGWN alert message handler acts on new uploads. It also generates
    ``pipeline.p_astro.json`` for the (pipeline, search) combinations that do
    not provide that information themselves. Events belonging to the VT
    search are not annotated at all.

    The table below lists which files are created as a result of a new upload,
    and which tasks generate them.

    ============================== ==================================================
    File                           Task
    ============================== ==================================================
    ``bayestar.multiorder.fits``   :meth:`gwcelery.tasks.bayestar.localize`
    ``em_bright.json``             :meth:`gwcelery.tasks.em_bright.source_properties`
    ``pipeline.p_astro.json``      :meth:`gwcelery.tasks.p_astro.compute_p_astro`
    ============================== ==================================================

    """  # noqa: E501
    graceid = alert['uid']
    event = alert['object']
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()

    # no annotations for events used in VT analysis
    if search == superevents.VT_SEARCH_NAME.lower():
        return

    priority = 0 if superevents.should_publish(event) else 1

    # Pipelines that use the GWCelery p-astro method
    # - spiir (all searches)
    # - pycbc for EarlyWarning search
    # - periodic MDC generated by first-two-years (based on gstlal)
    # FIXME: Remove this once all pipelines compute their own p-astro
    needs_gwcelery_p_astro = (pipeline, search) in {
        ('spiir', 'earlywarning'),
        ('pycbc', 'earlywarning'),
        ('gstlal', 'mdc'),
    }

    # Everything below only applies to freshly uploaded events.
    if alert['alert_type'] != 'new':
        return

    instruments = superevents.get_instruments_in_ranking_statistic(event)
    extra_attributes = event['extra_attributes']
    snr = superevents.get_snr(event)
    far = event['far']
    # Template parameters are taken from the first SingleInspiral row.
    sngl = extra_attributes['SingleInspiral'][0]
    mass1, mass2 = sngl['mass1'], sngl['mass2']
    chi1, chi2 = sngl['spin1z'], sngl['spin2z']

    # em_bright calculation
    em_bright_canvas = (
        em_bright.source_properties.si(mass1, mass2, chi1, chi2, snr,
                                       pipeline=pipeline, search=search)
        |
        gracedb.upload.s(
            'em_bright.json', graceid,
            'em bright complete', ['em_bright', 'public']
        )
        |
        gracedb.create_label.si('EMBRIGHT_READY', graceid)
    )
    em_bright_canvas.apply_async(priority=priority)

    # p_astro calculation for pipelines that do not provide their own
    # p_astro (upload pipeline.p_astro.json)
    if needs_gwcelery_p_astro:
        p_astro_canvas = (
            p_astro.compute_p_astro.s(snr,
                                      far,
                                      mass1,
                                      mass2,
                                      pipeline,
                                      instruments)
            |
            gracedb.upload.s(
                f'{pipeline}.p_astro.json', graceid,
                'p_astro computation complete', ['p_astro', 'public']
            )
            |
            gracedb.create_label.si('PASTRO_READY', graceid)
        )
        p_astro_canvas.apply_async(priority=priority)

    # Start BAYESTAR for all CBC pipelines.
    bayestar_canvas = (
        gracedb.download.s('coinc.xml', graceid)
        |
        bayestar.localize.s(graceid)
        |
        gracedb.upload.s(
            'bayestar.multiorder.fits', graceid,
            'sky localization complete', ['sky_loc', 'public']
        )
        |
        gracedb.create_label.si('SKYMAP_READY', graceid)
    )
    bayestar_canvas.apply_async(priority=priority)
@igwn_alert.handler('burst_olib',
                    'burst_cwb',
                    'burst_mly',
                    shared=False)
def handle_burst_event(alert):
    """Perform annotations for burst events that depend on
    pipeline-specific information.

    Two kinds of alerts are acted upon:

    * A new cWB-BBH upload triggers an ``em_bright.json`` computation for an
      equal-mass, zero-spin system whose component mass is derived from the
      event's ``mchirp`` (or set to 30.0 when ``mchirp`` is absent or zero).
    * A log alert for a file ending in ``.fits.gz`` triggers conversion of
      that sky map into a ``.multiorder.fits`` file, which is uploaded back
      to the event and followed by the ``SKYMAP_READY`` label.
    """  # noqa: E501
    graceid = alert['uid']
    pipeline = alert['object']['pipeline'].lower()
    search = alert['object']['search'].lower()
    priority = 0 if superevents.should_publish(alert['object']) else 1
    alert_type = alert['alert_type']

    # em_bright calculation for Burst-cWB-BBH
    if alert_type == 'new' and (pipeline, search) == ('cwb', 'bbh'):
        multiburst = alert['object']['extra_attributes']['MultiBurst']
        snr = multiburst.get('snr')
        mchirp = multiburst.get('mchirp', 0.0)
        # FIXME once ingestion of mchirp is finalised.
        # In case mchirp is missing or zero, fall back to a fixed
        # component mass for the em_bright computation.
        m12 = 30.0 if mchirp == 0.0 else 2**(0.2) * mchirp
        em_bright_canvas = (
            em_bright.source_properties.si(m12, m12, 0.0, 0.0, snr)
            |
            gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            |
            gracedb.create_label.si('EMBRIGHT_READY', graceid)
        )
        em_bright_canvas.apply_async(priority=priority)

    # Only file uploads (log alerts) are of interest from here on.
    if alert_type != 'log':
        return

    filename = alert['data']['filename']

    # Pipeline is uploading a flat resolution skymap file.
    # Convert it to a multiorder one with a proper name.
    # FIXME: Remove block when CWB starts to upload skymaps
    # in multiorder format
    if not filename.endswith('.fits.gz'):
        return

    new_filename = filename.replace('.fits.gz', '.multiorder.fits')
    flatten_msg = (
        'Multi-resolution FITS file created from '
        f'<a href="/api/events/{graceid}/files/'
        f'{filename}">{filename}</a>')
    tags = ['sky_loc', 'lvem', 'public']
    unflatten_canvas = (
        gracedb.download.si(filename, graceid)
        |
        skymaps.unflatten.s(new_filename)
        |
        gracedb.upload.s(
            new_filename, graceid, flatten_msg, tags)
        |
        gracedb.create_label.si('SKYMAP_READY', graceid)
    )
    unflatten_canvas.apply_async(priority=priority)
@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_posterior_samples(alert):
    """Generate multi-resolution FITS files, sky map annotations and
    em-bright information from an uploaded HDF5 file containing posterior
    samples.

    The handler only reacts to log alerts whose file name ends with
    ``.posterior_samples.hdf5``; every other alert is ignored.

    Parameters
    ----------
    alert : dict
        IGWN alert payload. This handler reads ``uid``, ``alert_type``,
        ``data['filename']``, ``data['comment']`` and
        ``object['preferred_event_data']``.
    """
    if alert['alert_type'] != 'log' or \
            not alert['data']['filename'].endswith('.posterior_samples.hdf5'):
        return
    superevent_id = alert['uid']
    filename = alert['data']['filename']
    info = '{} {}'.format(alert['data']['comment'], filename)
    # Split only on the last occurrence of the marker so that a prefix which
    # itself contains '.posterior_samples.' still unpacks into two parts
    # instead of raising ValueError.
    prefix, _ = filename.rsplit('.posterior_samples.', 1)
    skymap_filename = f'{prefix}.multiorder.fits'
    labels = ['pe', 'sky_loc']
    ifos = superevents.get_instruments(alert['object']['preferred_event_data'])

    # Sky map from the posterior samples: annotate it and upload it.
    (
        gracedb.download.si(filename, superevent_id)
        |
        skymaps.skymap_from_samples.s(superevent_id, ifos)
        |
        group(
            skymaps.annotate_fits.s(
                skymap_filename, superevent_id, labels
            ),

            gracedb.upload.s(
                skymap_filename, superevent_id,
                'Multiresolution FITS file generated from "{}"'.format(info),
                labels
            )
        )
    ).delay()

    # em_bright from LALInference posterior samples
    (
        gracedb.download.si(filename, superevent_id)
        |
        em_bright.em_bright_posterior_samples.s()
        |
        gracedb.upload.s(
            '{}.em_bright.json'.format(prefix), superevent_id,
            'em-bright computed from "{}"'.format(info),
            'pe'
        )
    ).delay()
@app.task(bind=True, shared=False)
def _create_mattermost_channel(self, superevent_id):
    """
    Create a Mattermost channel for a superevent.

    Used when the ADVREQ label is applied. The GraceDB host from the
    application configuration is handed to the channel-creation helper
    together with the superevent id.

    Channel name : O4 RRT {superevent_id}

    Parameters:
    ------------
    superevent_id: str
        The superevent id
    """
    channel_creation.rrt_channel_creation(
        superevent_id, self.app.conf['gracedb_host'])
@app.task(shared=False)
def _set_pipeline_preferred_events(pipeline_event, superevent_id):
    """Return group for setting pipeline preferred event using
    :meth:`gracedb.add_pipeline_preferred_event`.

    Parameters
    ----------
    pipeline_event: dict
        {pipeline: event_dict} key value pairs, returned by
        :meth:`superevents.select_pipeline_preferred_event`.

    superevent_id: str
        The superevent id

    Returns
    -------
    celery.group
        Group built from one :meth:`gracedb.add_pipeline_preferred_event`
        call per value of ``pipeline_event``.
    """
    # NOTE(review): add_pipeline_preferred_event is called directly here
    # (no ``.s()``/``.si()``), so the group is constructed from the results of
    # those calls rather than from task signatures -- confirm this is
    # intended.
    return group(
        gracedb.add_pipeline_preferred_event(superevent_id,
                                             event['graceid'])
        for event in pipeline_event.values()
    )
@app.task(shared=False, ignore_result=True)
def _update_if_dqok(event, superevent_id):
    """Promote `event` to preferred event of `superevent_id` when the
    superevent carries the `DQOK` label.

    If the label is absent nothing happens. Otherwise the superevent's
    preferred event and `t_0` are updated from `event` and a log message
    naming the event is uploaded.
    """
    if 'DQOK' not in gracedb.get_labels(superevent_id):
        return

    event_id = event['graceid']
    gracedb.update_superevent(superevent_id,
                              preferred_event=event_id,
                              t_0=event["gpstime"])
    gracedb.upload.delay(
        None, None, superevent_id,
        comment=f'DQOK applied based on new event {event_id}')
@gracedb.task(shared=False)
def _create_voevent(classification, *args, **kwargs):
    r"""Create a VOEvent record from an EM bright JSON file.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.source_properties` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro` or
        content of ``{gstlal,mbta}.p_astro.json`` uploaded
        by {gstlal,mbta} respectively; or None. Entries that are None
        are skipped.
    \*args
        Additional positional arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.
    \*\*kwargs
        Additional keyword arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.

    Returns
    -------
    str
        The filename of the newly created VOEvent.

    """
    kwargs = dict(kwargs)

    if classification is not None:
        # Merge source classification and source properties into kwargs.
        # Ignore filenames (None entries), only load dict in bytes form.
        for text in (item for item in classification if item is not None):
            kwargs.update(json.loads(text))

    # FIXME: These keys differ between em_bright.json
    # and the GraceDB REST API.
    for json_key, voevent_key in (('HasNS', 'ProbHasNS'),
                                  ('HasRemnant', 'ProbHasRemnant')):
        if json_key in kwargs:
            kwargs[voevent_key] = kwargs.pop(json_key)

    skymap_filename = kwargs.get('skymap_filename')
    if skymap_filename is not None:
        # Derive the sky map type by stripping the FITS extension (and any
        # trailing compression suffix or version number) from the file name,
        # unless the caller already supplied a type.
        kwargs.setdefault(
            'skymap_type',
            re.sub(r'(\.multiorder)?\.fits(\..+)?(,[0-9]+)?$', '',
                   skymap_filename))

    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.create_voevent._orig_run(*args, **kwargs)
@app.task(shared=False)
def _create_label_and_return_filename(filename, label, graceid):
    """Queue creation of `label` on `graceid` and pass `filename` through.

    The label is applied asynchronously; the unchanged `filename` is
    returned so that it can be consumed by the next task of a chain.
    """
    gracedb.create_label.delay(label, graceid)
    return filename
@app.task(shared=False)
def _leave_log_message_and_return_event_dict(event, superevent_id,
                                             message, **kwargs):
    """Queue a log message on the superevent and pass `event` through.

    Thin wrapper around :meth:`gracedb.upload` (called without file
    contents or file name) whose extra keyword arguments are forwarded
    unchanged. The event dictionary is returned untouched so that the task
    can sit in the middle of a chain.
    """
    gracedb.upload.delay(None, None, superevent_id, message, **kwargs)
    return event
@gracedb.task(shared=False)
def _update_superevent_and_return_event_dict(event, superevent_id):
    """Make `event` the preferred event of the superevent and return it.

    Wrapper around :meth:`gracedb.update_superevent` that sets both the
    preferred event and ``t_0`` from the event dictionary, then hands the
    same dictionary back for the next task of a chain.
    """
    preferred_event_id = event['graceid']
    merger_time = event['gpstime']
    gracedb.update_superevent(superevent_id,
                              preferred_event=preferred_event_id,
                              t_0=merger_time)
    return event
@gracedb.task(shared=False)
def _proceed_if_not_blocked_by(files, superevent_id, block_by):
    """Pass `files` through unless the superevent has a blocking label.

    Parameters
    ----------
    files : tuple
        List of files
    superevent_id : str
        The superevent id corresponding to files
    block_by : set
        Set of blocking labels. E.g. `{'ADVOK', 'ADVNO'}`

    Returns
    -------
    tuple or None
        `files` when none of the labels in `block_by` is present on the
        superevent; otherwise None, after a log message naming the blocking
        labels has been queued for upload.
    """
    blocking_labels = block_by.intersection(
        gracedb.get_labels(superevent_id))
    if not blocking_labels:
        return files

    gracedb.upload.delay(
        None, None, superevent_id,
        f"Blocking automated notice due to labels {blocking_labels}"
    )
    return None
@gracedb.task(shared=False)
def _revise_and_send_second_less_significant_alert(alert, query,
                                                   superevent_id):
    """Send the second less-significant alert unless it has been superseded.

    The superevent's current labels are fetched first. If any advocate
    label (``ADVREQ``, ``ADVOK``, ``ADVNO``), the significant label or the
    early-warning label is present, nothing is done. Otherwise the preferred
    event is re-selected from the events matching `query`, the superevent is
    updated, and a less-significant alert is launched. A second canvas sets
    the pipeline preferred events.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary
    query : str
        GraceDB query passed to :meth:`gracedb.get_events`
    superevent_id : str
        The superevent id
    """
    blocking_labels = {
        'ADVREQ', 'ADVOK', 'ADVNO',
        superevents.SIGNIFICANT_LABEL,
        superevents.EARLY_WARNING_LABEL,
    }
    current_labels = gracedb.get_labels(superevent_id)
    if not blocking_labels.isdisjoint(current_labels):
        return

    alert_canvas = (
        gracedb.get_events.si(query)
        |
        superevents.select_preferred_event.s()
        |
        _update_superevent_and_return_event_dict.s(superevent_id)
        |
        _leave_log_message_and_return_event_dict.s(
            superevent_id,
            "Superevent cleaned up before second less-significant alert"
        )
        |
        earlywarning_preliminary_alert.s(
            alert, alert_type='less-significant')
    )
    alert_canvas.delay()

    # set pipeline preferred events
    # FIXME: Ideally this should be combined with the previous canvas.
    # However, incorporating that group prevents the canvas from executing;
    # maybe related to https://github.com/celery/celery/issues/7851
    pipeline_preferred_canvas = (
        gracedb.get_events.si(query)
        |
        superevents.select_pipeline_preferred_event.s()
        |
        _set_pipeline_preferred_events.s(superevent_id)
    )
    pipeline_preferred_canvas.delay()
@app.task(shared=False)
def _annotate_fits_and_return_input(input_list, superevent_id):
    """Unpack the output of the skymap, embright, p-astro download group in the
    beginning of the
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert` canvas
    and call :meth:`~gwcelery.tasks.skymaps.annotate_fits_tuple` on the sky
    map entry.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones
    superevent_id : str
        The superevent id that the sky map annotations are made for.

    Returns
    -------
    list
        `input_list`, unchanged.
    """
    # Only the first entry (the sky map and its file name) is annotated.
    skymap_entry = input_list[0]
    skymaps.annotate_fits_tuple(
        skymap_entry,
        superevent_id,
        ['sky_loc', 'public']
    )

    return input_list
@gracedb.task(shared=False)
def _unpack_args_and_send_earlywarning_preliminary_alert(input_list, alert,
                                                         alert_type):
    """Unpack the output of the skymap, embright, p-astro download group in the
    beginning of the
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert` canvas
    and call
    :meth:`gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.

    Nothing is sent when `input_list` is None, which is how an upstream task
    signals that the alert is blocked.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones
    alert : dict
        IGWN-Alert dictionary
    alert_type : str
        alert_type passed to
        :meth:`earlywarning_preliminary_initial_update_alert`
    """
    if input_list is None:  # alert is blocked by blocking labels
        return

    # Each entry is a (content, filename) pair.
    (
        (skymap_content, skymap_filename),
        (em_bright_content, em_bright_filename),
        (p_astro_content, p_astro_filename),
    ) = input_list

    # Update to latest state after downloading files
    superevent = gracedb.get_superevent(alert['object']['superevent_id'])

    earlywarning_preliminary_initial_update_alert.delay(
        [skymap_filename, em_bright_filename, p_astro_filename],
        superevent, alert_type,
        filecontents=[skymap_content, em_bright_content, p_astro_content]
    )
@app.task(ignore_result=True, shared=False)
def earlywarning_preliminary_alert(event, alert, alert_type='preliminary',
                                   initiate_voevent=True):
    """Produce a preliminary alert by copying any sky maps.

    This task builds and launches a canvas that:

    1. Copies the sky map and, where applicable, the source properties and
       source classification files from the preferred event to the
       superevent, applying the ``SKYMAP_READY``, ``EMBRIGHT_READY`` and
       ``PASTRO_READY`` labels along the way.
    2. Creates standard annotations for the sky map by calling
       :func:`_annotate_fits_and_return_input`.
    3. If the event is publishable and `initiate_voevent` is true, checks for
       blocking labels and hands the downloaded files over to
       :func:`_unpack_args_and_send_earlywarning_preliminary_alert`, which
       starts the actual notice workflow.

    When the ``only_alert_for_mdc`` configuration switch is set and the
    event is not an MDC event, only steps 1-2 are run, followed by a log
    message explaining that the alert was skipped.

    Parameters
    ----------
    event : dict
        Dictionary of the preferred event. The keys ``graceid``, ``group``,
        ``pipeline``, ``search`` and ``labels`` are read here.
    alert : dict
        IGWN-Alert dictionary of the superevent; ``alert['uid']`` is used as
        the superevent id.
    alert_type : str
        Alert type. The values ``'preliminary'``, ``'earlywarning'`` and
        ``'less-significant'`` select different sets of blocking labels.
    initiate_voevent : bool
        If false, files are copied and annotated but no notice workflow is
        started.

    Raises
    ------
    NotImplementedError
        If the event is neither a cWB-BBH, CBC, nor Burst event, so that no
        sky map file name can be determined.
    """
    priority = 0 if superevents.should_publish(event) else 1
    preferred_event_id = event['graceid']
    superevent_id = alert['uid']

    # Define alert payloads depending on group-pipeline-search
    # cbc-*-*         :  p_astro/em_bright
    # burst.cwb-bbh   :  p_astro/em_bright
    # burst-*-*       :  NO p_astro/em_bright
    alert_group = event['group'].lower()
    alert_pipeline = event['pipeline'].lower()
    alert_search = event['search'].lower()
    if alert_pipeline == 'cwb' and alert_search == 'bbh':
        skymap_filename = alert_pipeline + '.multiorder.fits'
        p_astro_filename = alert_pipeline + '.p_astro.json'
        em_bright_filename = 'em_bright.json'
    elif alert_group == 'cbc':
        skymap_filename = 'bayestar.multiorder.fits'
        # No p_astro file is copied for the 'ssm' search.
        p_astro_filename = alert_pipeline + '.p_astro.json' \
            if alert_search != 'ssm' else None
        em_bright_filename = 'em_bright.json'
    elif alert_group == 'burst':
        skymap_filename = event['pipeline'].lower() + '.multiorder.fits'
        p_astro_filename = None
        em_bright_filename = None
    else:
        raise NotImplementedError(
            'Valid skymap required for preliminary alert'
        )

    # Determine if the event should be made public: it must pass
    # should_publish and carry neither the DQV nor the INJ label.
    is_publishable = (superevents.should_publish(
                      event, significant=alert_type != 'less-significant') and
                      {'DQV', 'INJ'}.isdisjoint(
                          event['labels']))

    # Download files from events and upload to superevent with relevant
    # annotations. Pass file contents down the chain so alerts task doesn't
    # need to download them again.
    # Note: this is explicitly made a chain to fix an issue described in #464.
    canvas = chain(
        group(
            # Each branch yields a [content, filename] pair: identity passes
            # the downloaded content through while the upload branch ends by
            # returning the file name.
            gracedb.download.si(skymap_filename, preferred_event_id)
            |
            group(
                identity.s(),

                gracedb.upload.s(
                    skymap_filename,
                    superevent_id,
                    message='Localization copied from {}'.format(
                        preferred_event_id),
                    tags=['sky_loc', 'public']
                )
                |
                _create_label_and_return_filename.s('SKYMAP_READY',
                                                    superevent_id)
            ),

            (
                gracedb.download.si(em_bright_filename, preferred_event_id)
                |
                group(
                    identity.s(),

                    gracedb.upload.s(
                        em_bright_filename,
                        superevent_id,
                        message='Source properties copied from {}'.format(
                            preferred_event_id),
                        tags=['em_bright', 'public']
                    )
                    |
                    _create_label_and_return_filename.s('EMBRIGHT_READY',
                                                        superevent_id)
                )
            ) if em_bright_filename is not None else
            # Placeholder pair when there is no em_bright file to copy.
            identity.s([None, None]),

            (
                gracedb.download.si(p_astro_filename, preferred_event_id)
                |
                group(
                    identity.s(),

                    gracedb.upload.s(
                        p_astro_filename,
                        superevent_id,
                        message='Source classification copied from {}'.format(
                            preferred_event_id),
                        tags=['p_astro', 'public']
                    )
                    |
                    _create_label_and_return_filename.s('PASTRO_READY',
                                                        superevent_id)
                )
            ) if p_astro_filename is not None else
            # Placeholder pair when there is no p_astro file to copy.
            identity.s([None, None])
        )
        |
        # Need annotate skymap task in body of chord instead of header because
        # this task simply calls another task, which is to be avoided in chord
        # headers. Note that any group that chains to a task is automatically
        # upgraded to a chord.
        _annotate_fits_and_return_input.s(superevent_id)
    )

    # Switch for disabling all but MDC alerts.
    if app.conf['only_alert_for_mdc']:
        if event.get('search') != 'MDC':
            canvas |= gracedb.upload.si(
                None, None, superevent_id,
                ("Skipping alert because gwcelery has been configured to only"
                 " send alerts for MDC events."))
            canvas.apply_async(priority=priority)
            return

    # Send notice and upload GCN circular draft for online events.
    if is_publishable and initiate_voevent:
        # presence of advocate action blocks significant prelim alert
        # presence of adv action or significant event blocks EW alert
        # presence of adv action or significant event or EW event blocks
        # less significant alert
        if alert_type == 'earlywarning':
            # Launch DQR for significant early warning events after a timeout.
            # If a full BW significant trigger arrives before the end of the
            # timeout, the latter will apply the label instead, and this call
            # is a noop.
            gracedb.create_label.si(
                'DQR_REQUEST',
                superevent_id
            ).apply_async(countdown=600)
        # Chained conditional expression: the first matching alert type
        # selects the label set; unknown alert types are never blocked.
        blocking_labels = (
            {'ADVOK', 'ADVNO'} if alert_type == 'preliminary'
            else
            {superevents.SIGNIFICANT_LABEL, 'ADVOK', 'ADVNO'}
            if alert_type == 'earlywarning'
            else
            {superevents.EARLY_WARNING_LABEL, superevents.SIGNIFICANT_LABEL,
             'ADVOK', 'ADVNO'}
            if alert_type == 'less-significant'
            else
            set()
        )
        canvas |= (
            _proceed_if_not_blocked_by.s(superevent_id, blocking_labels)
            |
            _unpack_args_and_send_earlywarning_preliminary_alert.s(
                alert, alert_type
            )
        )

    canvas.apply_async(priority=priority)
@gracedb.task(shared=False)
def _get_pe_far_and_event(superevent):
    """Return FAR and event input to PE workflow.

    The input FAR is the lowest FAR among CBC and Burst-BBH triggers.
    The input event is the preferred event if it is a CBC trigger, otherwise
    the CBC trigger with the lowest FAR is returned.

    Returns
    -------
    tuple or None
        ``(far, event)``, or None when the preferred event is not a CBC
        trigger and the superevent contains no CBC trigger at all.
    """
    def _is_cbc(candidate):
        return candidate['group'].lower() == 'cbc'

    def _is_burst_bbh(candidate):
        return (candidate['group'].lower() == 'burst' and
                candidate.get('search', 'None').lower() == 'bbh')

    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    all_events = [
        gracedb.get_event._orig_run(gid) for gid in superevent['gw_events']
    ]
    # Candidates ordered by increasing FAR; the first one has the lowest.
    ranked = sorted(
        (e for e in all_events if _is_cbc(e) or _is_burst_bbh(e)),
        key=lambda e: e['far']
    )
    preferred_event = superevent['preferred_event_data']

    if _is_cbc(preferred_event):
        pe_event = preferred_event
    else:
        pe_event = next((e for e in ranked if _is_cbc(e)), None)
        if pe_event is None:
            return None
    return ranked[0]['far'], pe_event
@app.task(ignore_result=True, shared=False)
def parameter_estimation(far_event, superevent_id, pe_pipeline):
    """Parameter Estimation with Bilby and RapidPE-RIFT. Parameter estimation
    runs are triggered for CBC triggers which pass the FAR threshold and are
    not mock uploads. For those which do not pass these criteria, this task
    uploads messages explaining why parameter estimation is not started.

    Parameters
    ----------
    far_event : tuple or None
        ``(far, event)`` pair, or None if no CBC trigger was found.
    superevent_id : str
        The superevent id that log messages are attached to.
    pe_pipeline : str
        Name of the parameter estimation pipeline to start.
    """
    def _log_skip(message):
        # All explanations are uploaded the same way, tagged with 'pe'.
        gracedb.upload.delay(
            filecontents=None, filename=None,
            graceid=superevent_id,
            message=message,
            tags='pe'
        )

    if far_event is None:
        _log_skip('Parameter estimation will not start since no CBC triggers'
                  ' are found.')
        return

    far, event = far_event
    search = event['search'].lower()
    cbc_far_thresholds = app.conf['significant_alert_far_threshold']['cbc']
    if search in cbc_far_thresholds:
        threshold = (
            cbc_far_thresholds[search] /
            app.conf['significant_alert_trials_factor']['cbc'][search]
        )
    else:
        # Fallback in case an event is uploaded to an unlisted search
        threshold = float('-inf')

    # Work out why PE must not run; None means that it may start.
    if far > threshold:
        reason = ('Parameter estimation will not start since FAR is larger '
                  f'than the PE threshold, {threshold} Hz.')
    elif search == 'mdc':
        reason = ('Parameter estimation will not start since parameter '
                  'estimation is disabled for mock uploads.')
    elif event.get('offline', False):
        reason = ('Parameter estimation will not start since parameter '
                  'estimation is disabled for OFFLINE events.')
    elif (
        app.conf['gracedb_host'] == 'gracedb-playground.ligo.org'
        and event['pipeline'] == 'MBTA'
    ):
        # FIXME: Remove this block once multiple channels can be handled on
        # one gracedb instance
        reason = ('Parameter estimation is disabled for MBTA triggers '
                  'on playground as MBTA analyses live data + online '
                  'injections not O3 replay data + MDC injections')
    elif pe_pipeline == 'rapidpe' and search == 'earlywarning':
        # Remove this if rapidpe can ingest early warning events
        reason = ('Parameter estimation by RapidPE-RIFT is disabled for '
                  'earlywarning triggers.')
    else:
        reason = None

    if reason is None:
        inference.start_pe.delay(event, superevent_id, pe_pipeline)
    else:
        _log_skip(reason)
@gracedb.task(shared=False)
def earlywarning_preliminary_initial_update_alert(
    filenames,
    superevent,
    alert_type,
    filecontents=None
):
    """
    Create a canvas that sends an earlywarning, preliminary, initial, or update
    notice.

    Parameters
    ----------
    filenames : tuple
        The sky map, em_bright, and p_astro filenames, in that order. An entry
        that is None is looked up in the superevent's GraceDB log (the p_astro
        file is not looked up when the preferred event's search is ``ssm``).
    superevent : dict
        The superevent dictionary, typically obtained from an IGWN Alert or
        from querying GraceDB.
    alert_type : {'less-significant', 'earlywarning', 'preliminary', 'initial', 'update'}  # noqa: E501
        The alert type.
    filecontents : tuple, optional
        The sky map, em_bright, and p_astro file contents, in that order. Only
        accepted for less-significant, earlywarning, and preliminary alerts.
        When given, and no combined sky map is attached, these contents are
        used directly rather than downloading the files from GraceDB.

    Notes
    -----
    Tasks that call this function should be decorated with
    :obj:`gwcelery.tasks.gracedb.task` rather than :obj:`gwcelery.app.task` so
    that a synchronous call to :func:`gwcelery.tasks.gracedb.get_log` is
    retried in the event of GraceDB API failures. If the superevent carries
    the `RAVEN_ALERT` label and has a non-empty ``em_type``, the circular
    template is the RAVEN (EM coincidence) circular.

    Nothing is sent for superevents labelled `INJ`.

    """
    labels = superevent['labels']
    superevent_id = superevent['superevent_id']
    search = superevent['preferred_event_data']['search'].lower()

    if 'INJ' in labels:
        return

    if filecontents:
        assert alert_type in {
            'less-significant', 'earlywarning', 'preliminary'}

    skymap_filename, em_bright_filename, p_astro_filename = filenames
    rapidpe_pastro_filename = None
    combined_skymap_filename = None
    combined_skymap_needed = False
    skymap_needed = (skymap_filename is None)
    em_bright_needed = (em_bright_filename is None)
    p_astro_needed = False if search == 'ssm' else (p_astro_filename is None)
    raven_coinc = ('RAVEN_ALERT' in labels and bool(superevent['em_type']))
    if raven_coinc:
        ext_labels = gracedb.get_labels(superevent['em_type'])
        combined_skymap_needed = \
            {"RAVEN_ALERT", "COMBINEDSKYMAP_READY"}.issubset(set(ext_labels))

    # The log is always scanned: the RapidPE-RIFT p_astro file is looked up
    # for every alert, regardless of which other files are still missing.
    # Later log entries overwrite earlier matches, so the last matching
    # 'filename,version' wins.
    for message in gracedb.get_log(superevent_id):
        t = message['tag_names']
        f = message['filename']
        v = message['file_version']
        fv = '{},{}'.format(f, v)
        if not f:
            continue
        if skymap_needed \
                and {'sky_loc', 'public'}.issubset(t) \
                and f.endswith('.multiorder.fits') \
                and 'combined' not in f:
            skymap_filename = fv
        if em_bright_needed \
                and 'em_bright' in t \
                and f.endswith('.json'):
            em_bright_filename = fv
        if p_astro_needed \
                and {'p_astro', 'public'}.issubset(t) \
                and f.endswith('.json'):
            p_astro_filename = fv
        if combined_skymap_needed \
                and {'sky_loc', 'ext_coinc'}.issubset(t) \
                and f.startswith('combined-ext.') \
                and 'fit' in f:
            combined_skymap_filename = fv
        if {'p_astro', 'public'}.issubset(t) \
                and f == 'RapidPE_RIFT.p_astro.json':
            rapidpe_pastro_filename = fv

    if combined_skymap_needed:
        # for every alert, copy combined sky map over if applicable
        # FIXME: use file inheritance once available
        ext_id = superevent['em_type']
        if combined_skymap_filename:
            # If previous sky map, increase version by 1
            combined_skymap_filename_base, v = \
                combined_skymap_filename.split(',')
            v = str(int(v) + 1)
            combined_skymap_filename = \
                combined_skymap_filename_base + ',' + v
        else:
            combined_skymap_filename_base = \
                (external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER
                 if '.multiorder.fits' in skymap_filename else
                 external_skymaps.COMBINED_SKYMAP_FILENAME_FLAT)
            combined_skymap_filename = combined_skymap_filename_base + ',0'
        message = 'Combined LVK-external sky map copied from {0}'.format(
            ext_id)
        message_png = (
            'Mollweide projection of <a href="/api/superevents/{se_id}/files/'
            '{filename}">{filename}</a>, copied from {ext_id}').format(
                se_id=superevent_id,
                ext_id=ext_id,
                filename=combined_skymap_filename)

        combined_skymap_canvas = group(
            gracedb.download.si(combined_skymap_filename_base, ext_id)
            |
            gracedb.upload.s(
                combined_skymap_filename_base, superevent_id,
                message, ['sky_loc', 'ext_coinc', 'public'])
            |
            gracedb.create_label.si('COMBINEDSKYMAP_READY', superevent_id),

            gracedb.download.si(external_skymaps.COMBINED_SKYMAP_FILENAME_PNG,
                                ext_id)
            |
            gracedb.upload.s(
                external_skymaps.COMBINED_SKYMAP_FILENAME_PNG, superevent_id,
                message_png, ['sky_loc', 'ext_coinc', 'public']
            )
            |
            # Pass None to download_andor_expose group
            identity.si()
        )

    # circular template not needed for less-significant alerts
    if alert_type in {'earlywarning', 'preliminary', 'initial'}:
        if raven_coinc:
            circular_task = circulars.create_emcoinc_circular.si(superevent_id)
            circular_filename = '{}-emcoinc-circular.txt'.format(alert_type)
            tags = ['em_follow', 'ext_coinc']

        else:
            circular_task = circulars.create_initial_circular.si(superevent_id)
            circular_filename = '{}-circular.txt'.format(alert_type)
            tags = ['em_follow']

        circular_canvas = (
            circular_task
            |
            gracedb.upload.s(
                circular_filename,
                superevent_id,
                'Template for {} GCN Circular'.format(alert_type),
                tags=tags)
        )

    else:
        circular_canvas = identity.si()

    # less-significant alerts have "preliminary" voevent notice type
    alert_type_voevent = 'preliminary' if alert_type == 'less-significant' \
        else alert_type
    # set the significant field in the VOEvent based on
    # less-significant/significant alert.
    # For kafka alerts the analogous field is set in alerts.py.
    # (see comment before defining kafka_alert_canvas)
    voevent_significance = 0 if alert_type == 'less-significant' else 1

    if filecontents and not combined_skymap_filename:
        skymap, em_bright, p_astro_dict = filecontents

        # check high profile and apply label if true
        if alert_type == 'preliminary':
            high_profile_canvas = rrt_utils.check_high_profile.si(
                skymap, em_bright, p_astro_dict, superevent
            )
        else:
            high_profile_canvas = identity.si()

        download_andor_expose_group = []
        if rapidpe_pastro_filename is None:
            voevent_canvas = _create_voevent.si(
                (em_bright, p_astro_dict),
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            # Placeholder only: rapidpe_canvas is chained into the final
            # canvas solely when rapidpe_pastro_filename is set.
            rapidpe_canvas = _update_rapidpe_pastro_shouldnt_run.s()

            # kafka alerts have a field called "significant" based on
            # https://dcc.ligo.org/LIGO-G2300151/public
            # The alert_type value passed to alerts.send is used to
            # set this field in the alert dictionary
            kafka_alert_canvas = alerts.send.si(
                (skymap, em_bright, p_astro_dict),
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )
        else:
            voevent_canvas = _create_voevent.s(
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

            kafka_alert_canvas = _check_pastro_and_send_alert.s(
                skymap,
                em_bright,
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )

            rapidpe_canvas = (
                _update_rapidpe_pastro.s(
                    em_bright=em_bright,
                    pipeline_pastro=p_astro_dict)
                |
                _upload_rapidpe_pastro_json.s(
                    superevent_id,
                    rapidpe_pastro_filename
                )
            )
    else:
        # Download em_bright and p_astro files here for voevent
        download_andor_expose_group = [
            gracedb.download.si(em_bright_filename, superevent_id) if
            em_bright_filename is not None else identity.s(None),
        ]
        if search != 'ssm':
            download_andor_expose_group += [
                gracedb.download.si(p_astro_filename, superevent_id) if
                p_astro_filename is not None else identity.s(None)
            ]
        else:
            # for SSM events skip downloading p-astro file
            download_andor_expose_group += [identity.s(None)]
        high_profile_canvas = identity.si()

        voevent_canvas = _create_voevent.s(
            superevent_id,
            alert_type_voevent,
            Significant=voevent_significance,
            skymap_filename=skymap_filename,
            internal=False,
            open_alert=True,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

        if rapidpe_pastro_filename:
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

            rapidpe_canvas = (
                _update_rapidpe_pastro.s()
                |
                _upload_rapidpe_pastro_json.s(
                    superevent_id,
                    rapidpe_pastro_filename
                )
            )

        # The skymap has not been downloaded at this point, so we need to
        # download it before we can assemble the kafka alerts and send them
        kafka_alert_canvas = alerts.download_skymap_and_send_alert.s(
            superevent,
            alert_type,
            skymap_filename=skymap_filename,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

    to_expose = [skymap_filename, em_bright_filename, p_astro_filename]
    # Since PE skymap images, HTML, and gzip FITS are not made public when they
    # are uploaded, we need to expose them here.
    if (
        skymap_filename is not None and 'bilby' in skymap_filename.lower()
    ):
        prefix, _, _ = skymap_filename.partition('.multiorder.fits')
        to_expose += [f'{prefix}.html', f'{prefix}.png',
                      f'{prefix}.volume.png', f'{prefix}.fits.gz']
    download_andor_expose_group += [
        gracedb.expose.si(superevent_id),
        *(
            gracedb.create_tag.si(filename, 'public', superevent_id)
            for filename in to_expose if filename is not None
        )
    ]

    voevent_canvas |= group(
        gracedb.download.s(superevent_id)
        |
        gcn.send.s(),

        gracedb.create_tag.s('public', superevent_id)
    )

    if combined_skymap_needed:
        download_andor_expose_group += [combined_skymap_canvas]

    sent_label_canvas = identity.si()
    if alert_type == 'less-significant':
        sent_label_canvas = gracedb.create_label.si(
            'LOW_SIGNIF_PRELIM_SENT',
            superevent_id
        )
    elif alert_type == 'preliminary':
        sent_label_canvas = gracedb.create_label.si(
            'GCN_PRELIM_SENT',
            superevent_id
        )

    # NOTE: The following canvas structure was used to fix #480
    canvas = (
        group(download_andor_expose_group)
    )
    if rapidpe_pastro_filename:
        canvas |= rapidpe_canvas

    canvas |= (
        group(
            voevent_canvas
            |
            group(
                circular_canvas,

                sent_label_canvas
            ),

            kafka_alert_canvas,

            high_profile_canvas
        )
    )

    canvas.apply_async()
@app.task(shared=False)
def _update_rapidpe_pastro(input_list, em_bright=None, pipeline_pastro=None):
    """Reconcile the RapidPE-RIFT p_astro with the pipeline's terrestrial
    probability.

    When the ``"Terrestrial"`` entries of the two p_astro JSON documents
    differ, the RapidPE-RIFT p_astro is replaced by the output of
    ``renormalize_pastro_with_pipeline_pterr``; otherwise it is passed
    through unchanged.

    Parameters
    ----------
    input_list : list
        Results of ``download_andor_expose_group`` in
        :func:`earlywarning_preliminary_initial_update_alert`. If
        `pipeline_pastro` is None, the first three items are taken as
        em_bright, the pipeline p_astro, and the RapidPE-RIFT p_astro;
        otherwise only the first item is read, as the RapidPE-RIFT p_astro.
    em_bright : optional
        The em_bright contents, returned as-is when `pipeline_pastro` is
        given.
    pipeline_pastro : str, optional
        JSON-encoded pipeline p_astro.

    Returns
    -------
    tuple
        ``(em_bright, rapidpe_pastro, rapidpe_pastro_updated)``, where the
        last item is True only if the RapidPE-RIFT p_astro was rewritten.
    """
    if pipeline_pastro is not None:
        rapidpe_pastro, *_ = input_list
    else:
        em_bright, pipeline_pastro, rapidpe_pastro, *_ = input_list

    pipeline_probs = json.loads(pipeline_pastro)
    rapidpe_probs = json.loads(rapidpe_pastro)

    # Nothing to do when both agree on the terrestrial probability.
    if rapidpe_probs["Terrestrial"] == pipeline_probs["Terrestrial"]:
        return em_bright, rapidpe_pastro, False

    renormalized = rpe_pastro.renormalize_pastro_with_pipeline_pterr(
        rapidpe_probs, pipeline_probs
    )
    return em_bright, json.dumps(renormalized), True
@app.task(shared=False)
def _update_rapidpe_pastro_shouldnt_run(*args, **kwargs):
    """Placeholder task that must never execute.

    It stands in for ``rapidpe_canvas`` when there is no RapidPE-RIFT p_astro
    file to process. Any arguments are accepted and ignored, so that if the
    placeholder is ever chained after another task by mistake the failure is
    the explicit error below rather than a signature mismatch.

    Raises
    ------
    RuntimeError
        Always.
    """
    raise RuntimeError(
        "The `rapidpe_canvas' was executed where it should "
        "not have been. A bug must have been introduced."
    )
@gracedb.task(shared=False)
def _upload_rapidpe_pastro_json(
    input_list,
    superevent_id,
    rapidpe_pastro_filename
):
    """Upload a rewritten RapidPE-RIFT p_astro file, if there is one.

    Parameters
    ----------
    input_list : list
        Output of ``_update_rapidpe_pastro``. The last item is the
        ``rapidpe_pastro_updated`` flag and the item at index 1 is the
        RapidPE-RIFT p_astro content.
    superevent_id : str
        The superevent the file is uploaded to.
    rapidpe_pastro_filename : str
        Not used by this task.

    Returns
    -------
    list
        `input_list` without its trailing flag.

    Notes
    -----
    A new version of ``RapidPE_RIFT.p_astro.json`` is uploaded only when the
    flag is exactly True; otherwise nothing is uploaded.
    """
    *passthrough, was_updated = input_list
    if was_updated is not True:
        return passthrough

    gracedb.upload(
        input_list[1],
        "RapidPE_RIFT.p_astro.json",
        superevent_id,
        "RapidPE-RIFT Pastro results",
        ("pe", "p_astro", "public")
    )
    return passthrough
@app.task(shared=False)
def _check_pastro_and_send_alert(input_classification, skymap, em_bright,
                                 superevent, alert_type, raven_coinc=False):
    """Wrapper for :meth:`~gwcelery.tasks.alerts.send` meant to take a
    potentially new p-astro as input from the preceding task.

    `input_classification` must hold exactly two items. Only the second one,
    the p-astro, is used; the em_bright that is sent is the `em_bright`
    argument.
    """
    _ignored, latest_p_astro = input_classification
    classification = (skymap, em_bright, latest_p_astro)
    alerts.send.delay(
        classification,
        superevent,
        alert_type,
        raven_coinc=raven_coinc
    )
@gracedb.task(ignore_result=True, shared=False)
def initial_alert(filenames, alert):
    """Produce an initial alert.

    Thin wrapper that calls
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
    with the superevent taken from ``alert['object']`` and
    ``alert_type='initial'``.

    Parameters
    ----------
    filenames : tuple
        The sky map, em_bright, and p_astro filenames.
    alert : dict
        IGWN-Alert dictionary

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    superevent = alert['object']
    earlywarning_preliminary_initial_update_alert(
        filenames, superevent, 'initial')
@gracedb.task(ignore_result=True, shared=False)
def update_alert(filenames, superevent_id):
    """Produce an update alert.

    Fetches the superevent synchronously and then calls
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
    with ``alert_type='update'``.

    Parameters
    ----------
    filenames : tuple
        The sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    current_superevent = gracedb.get_superevent._orig_run(superevent_id)
    earlywarning_preliminary_initial_update_alert(
        filenames, current_superevent, 'update')
@app.task(ignore_result=True, shared=False)
def retraction_alert(alert):
    """Produce a retraction alert.

    Builds and launches one canvas with two parallel branches: exposing the
    superevent followed by the retraction VOEvent and alert, and the
    retraction circular template upload.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary; ``alert['uid']`` is used as the superevent ID
        and ``alert['object']`` is passed to
        :meth:`~gwcelery.tasks.alerts.send`.
    """
    superevent_id = alert['uid']

    # Both members of this group receive the result of _create_voevent.
    voevent_followup = group(
        gracedb.download.s(superevent_id)
        |
        gcn.send.s(),

        gracedb.create_tag.s('public', superevent_id)
    )
    voevent_branch = (
        _create_voevent.si(
            None, superevent_id, 'retraction',
            internal=False,
            open_alert=True
        )
        |
        voevent_followup
    )
    alert_branch = alerts.send.si(
        None,
        alert['object'],
        'retraction'
    )

    # The notices are only built after the superevent has been exposed.
    notice_branch = (
        gracedb.expose.si(superevent_id)
        |
        group(voevent_branch, alert_branch)
    )
    circular_branch = (
        circulars.create_retraction_circular.si(superevent_id)
        |
        gracedb.upload.s(
            'retraction-circular.txt',
            superevent_id,
            'Template for retraction GCN Circular',
            tags=['em_follow']
        )
    )

    group(notice_branch, circular_branch).apply_async()