Coverage for gwcelery/tasks/orchestrator.py: 94%
400 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Tasks that comprise the alert orchestrator.
3The orchestrator is responsible for the vetting and annotation workflow to
4produce preliminary, initial, and update alerts for gravitational-wave event
5candidates.
6"""
7import json
8import re
10from astropy.time import Time
11from celery import chain, group
12from ligo.rrt_chat import channel_creation
13from rapidpe_rift_pipe import pastro as rpe_pastro
15from .. import app
16from . import (alerts, bayestar, circulars, detchar, em_bright,
17 external_skymaps, gcn, gracedb, igwn_alert, inference, p_astro,
18 rrt_utils, skymaps, superevents)
19from .core import get_first, identity
22@igwn_alert.handler('superevent',
23 'mdc_superevent',
24 shared=False)
25def handle_superevent(alert):
26 """Schedule annotations for new superevents.
28 After waiting for a time specified by the
29 :obj:`~gwcelery.conf.orchestrator_timeout` configuration variable for the
30 choice of preferred event to settle down, this task performs data quality
31 checks with :meth:`gwcelery.tasks.detchar.check_vectors` and calls
32 :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert` to send
33 a preliminary notice.
34 """
35 superevent_id = alert['uid']
36 # launch PE and detchar based on new type superevents
37 if alert['alert_type'] == 'new':
38 # launching rapidpe 30s after merger.
39 timeout = max(
40 alert['object']['t_0'] - Time.now().gps +
41 app.conf['rapidpe_timeout'], 0
42 )
43 (
44 gracedb.get_superevent.si(superevent_id).set(
45 countdown=timeout
46 )
47 |
48 _get_pe_far_and_event.s()
49 |
50 parameter_estimation.s(superevent_id, 'rapidpe')
51 ).apply_async()
53 (
54 gracedb.get_superevent.si(superevent_id).set(
55 countdown=app.conf['pe_timeout']
56 )
57 |
58 _get_pe_far_and_event.s()
59 |
60 parameter_estimation.s(superevent_id, 'bilby')
61 ).apply_async()
63 # run check_vectors. Create and upload omegascans
64 group(
65 detchar.omegascan.si(alert['object']['t_0'], superevent_id),
67 detchar.check_vectors.si(
68 alert['object']['preferred_event_data'],
69 superevent_id,
70 alert['object']['t_start'],
71 alert['object']['t_end']
72 )
73 ).delay()
75 elif alert['alert_type'] == 'label_added':
76 label_name = alert['data']['name']
77 query = f'superevent: {superevent_id} group: CBC Burst'
78 if alert['object']['category'] == 'MDC':
79 query += ' MDC'
80 elif alert['object']['category'] == 'Test':
81 query += ' Test'
83 # launch less-significant preliminary alerts on LOW_SIGNIF_LOCKED
84 if label_name == superevents.FROZEN_LABEL:
85 # don't launch if EARLY_WARNING or SIGNIF_LOCKED is present
86 skipping_labels = {
87 superevents.SIGNIFICANT_LABEL,
88 superevents.EARLY_WARNING_LABEL
89 }.intersection(alert['object']['labels'])
90 if skipping_labels:
91 gracedb.upload.delay(
92 None, None, superevent_id,
93 "The superevent already has a significant/EW event, "
94 "skipping launching less-significant alert"
95 )
96 return
97 (
98 gracedb.upload.s(
99 None,
100 None,
101 superevent_id,
102 "Automated DQ check before sending less-significant "
103 "preliminary alert. New results supersede old results.",
104 tags=['data_quality']
105 )
106 |
107 detchar.check_vectors.si(
108 alert['object']['preferred_event_data'],
109 superevent_id,
110 alert['object']['t_start'],
111 alert['object']['t_end']
112 )
113 |
114 earlywarning_preliminary_alert.s(
115 alert, alert_type='less-significant')
116 ).apply_async()
118 # launch significant alert on SIGNIF_LOCKED
119 elif label_name == superevents.SIGNIFICANT_LABEL:
120 # ensure superevent is locked before starting alert workflow
121 r = chain()
122 if superevents.FROZEN_LABEL not in alert['object']['labels']:
123 r |= gracedb.create_label.si(superevents.FROZEN_LABEL,
124 superevent_id)
126 r |= (
127 gracedb.get_events.si(query)
128 |
129 superevents.select_preferred_event.s()
130 |
131 _update_superevent_and_return_event_dict.s(superevent_id)
132 |
133 _leave_log_message_and_return_event_dict.s(
134 superevent_id,
135 "Superevent cleaned up after significant event. "
136 )
137 |
138 _leave_log_message_and_return_event_dict.s(
139 superevent_id,
140 "Automated DQ check before sending significant alert. "
141 "New results supersede old results.",
142 tags=['data_quality']
143 )
144 |
145 detchar.check_vectors.s(
146 superevent_id,
147 alert['object']['t_start'],
148 alert['object']['t_end']
149 )
150 |
151 earlywarning_preliminary_alert.s(
152 alert, alert_type='preliminary')
153 )
154 r.apply_async()
156 # launch second preliminary on GCN_PRELIM_SENT
157 elif label_name == 'GCN_PRELIM_SENT':
158 (
159 identity.si().set(
160 # https://git.ligo.org/emfollow/gwcelery/-/issues/478
161 # FIXME: remove this task once https://github.com/celery/celery/issues/7851 is resolved # noqa: E501
162 countdown=app.conf['superevent_clean_up_timeout']
163 )
164 |
165 gracedb.get_events.si(query)
166 |
167 superevents.select_preferred_event.s()
168 |
169 _update_superevent_and_return_event_dict.s(superevent_id)
170 |
171 group(
172 _leave_log_message_and_return_event_dict.s(
173 superevent_id,
174 "Superevent cleaned up after first preliminary alert"
175 ),
177 gracedb.create_label.si('DQR_REQUEST', superevent_id)
178 )
179 |
180 get_first.s()
181 |
182 earlywarning_preliminary_alert.s(
183 alert, alert_type='preliminary')
184 ).apply_async()
186 # set pipeline preferred events
187 # FIXME: Ideally this should combined with the previous canvas.
188 # However, incorporating that group prevents canvas from executing
189 # maybe related to https://github.com/celery/celery/issues/7851
190 (
191 gracedb.get_events.si(query)
192 |
193 superevents.select_pipeline_preferred_event.s()
194 |
195 _set_pipeline_preferred_events.s(superevent_id)
196 ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])
198 elif label_name == 'LOW_SIGNIF_PRELIM_SENT':
199 # similar workflow as the GCN_PRELIM_SENT
200 # except block by condition evaluated at the end of timeout
201 _revise_and_send_second_less_significant_alert.si(
202 alert, query, superevent_id,
203 ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])
205 elif label_name == superevents.EARLY_WARNING_LABEL:
206 if superevents.SIGNIFICANT_LABEL in alert['object']['labels']:
207 # stop if full BW significant event already present
208 gracedb.upload.delay(
209 None, None, superevent_id,
210 "Superevent superseded by full BW event, skipping EW."
211 )
212 return
213 # start the EW alert pipeline; is blocked by SIGNIF_LOCKED
214 # ensure superevent is locked before starting pipeline
215 r = chain()
216 if superevents.FROZEN_LABEL not in alert['object']['labels']:
217 r |= gracedb.create_label.si(superevents.FROZEN_LABEL,
218 superevent_id)
220 r |= (
221 gracedb.get_events.si(query)
222 |
223 superevents.select_preferred_event.s()
224 |
225 _update_superevent_and_return_event_dict.s(superevent_id)
226 |
227 _leave_log_message_and_return_event_dict.s(
228 superevent_id,
229 "Superevent cleaned up before sending EW alert."
230 )
231 |
232 earlywarning_preliminary_alert.s(
233 alert, alert_type='earlywarning')
234 )
235 r.apply_async()
237 # launch initial/retraction alert on ADVOK/ADVNO
238 elif label_name == 'ADVOK':
239 initial_alert((None, None, None), alert)
240 elif label_name == 'ADVNO':
241 retraction_alert(alert)
242 elif label_name == 'ADVREQ':
243 if app.conf['create_mattermost_channel']:
244 _create_mattermost_channel.si(superevent_id).delay()
246 # check DQV label on superevent, run check_vectors if required
247 elif alert['alert_type'] == 'event_added':
248 # FIXME Check if this should be changed to 'update' alert_types instead
249 # of 'event_added'. 'event_added' seems like it's just a new event in
250 # the superevent window, not necessarily an event that should be
251 # promoted to preferred event
252 start = alert['data']['t_start']
253 end = alert['data']['t_end']
255 if 'DQV' in gracedb.get_labels(superevent_id):
256 (
257 detchar.check_vectors.s(
258 alert['object']['preferred_event_data'],
259 superevent_id,
260 start,
261 end
262 )
263 |
264 _update_if_dqok.s(superevent_id)
265 ).apply_async()
268@igwn_alert.handler('cbc_gstlal',
269 'cbc_spiir',
270 'cbc_pycbc',
271 'cbc_mbta',
272 shared=False)
273def handle_cbc_event(alert):
274 """Perform annotations for CBC events that depend on pipeline-specific
275 matched-filter parameter estimates.
277 Notes
278 -----
279 This IGWN alert message handler is triggered by a new upload or by updates
280 that include the file ``pipeline.p_astro.json``. If also generates
281 pipeline.p_astro.json information for pipelines that do not provide
282 such information.
284 The table below lists which files are created as a result of a new upload,
285 and which tasks generate them.
287 ============================== ==================================================
288 File Task
289 ============================== ==================================================
290 ``bayestar.multiorder.fits`` :meth:`gwcelery.tasks.bayestar.localize`
291 ``em_bright.json`` :meth:`gwcelery.tasks.em_bright.source_properties`
292 ``pipeline.p_astro.json`` :meth:`gwcelery.tasks.p_astro.compute_p_astro`
293 ============================== ==================================================
295 """ # noqa: E501
296 graceid = alert['uid']
297 pipeline = alert['object']['pipeline'].lower()
298 search = alert['object']['search'].lower()
300 # no annotations for events used in VT analysis
301 if search == superevents.VT_SEARCH_NAME.lower():
302 return
304 priority = 0 if superevents.should_publish(alert['object']) else 1
306 # Pipelines that use the GWCelery p-astro method
307 # - spiir (all searches)
308 # - pycbc for EarlyWarning search
309 # - periodic MDC generated by first-two-years (based on gstlal)
310 # FIXME: Remove this once all pipelines compute their own p-astro
311 pipelines_stock_p_astro = {('spiir', 'earlywarning'),
312 ('pycbc', 'earlywarning'),
313 ('gstlal', 'mdc')}
315 # em_bright and p_astro calculation
316 if alert['alert_type'] == 'new':
317 instruments = superevents.get_instruments_in_ranking_statistic(
318 alert['object'])
319 extra_attributes = alert['object']['extra_attributes']
320 snr = superevents.get_snr(alert['object'])
321 far = alert['object']['far']
322 mass1 = extra_attributes['SingleInspiral'][0]['mass1']
323 mass2 = extra_attributes['SingleInspiral'][0]['mass2']
324 chi1 = extra_attributes['SingleInspiral'][0]['spin1z']
325 chi2 = extra_attributes['SingleInspiral'][0]['spin2z']
327 (
328 em_bright.source_properties.si(mass1, mass2, chi1, chi2, snr,
329 pipeline=pipeline, search=search)
330 |
331 gracedb.upload.s(
332 'em_bright.json', graceid,
333 'em bright complete', ['em_bright', 'public']
334 )
335 |
336 gracedb.create_label.si('EMBRIGHT_READY', graceid)
337 ).apply_async(priority=priority)
339 # p_astro calculation for pipelines that does not provide a
340 # stock p_astro (upload pipeline.p_astro.json)
341 if (pipeline, search) in pipelines_stock_p_astro:
342 (
343 p_astro.compute_p_astro.s(snr,
344 far,
345 mass1,
346 mass2,
347 pipeline,
348 instruments)
349 |
350 gracedb.upload.s(
351 f'{pipeline}.p_astro.json', graceid,
352 'p_astro computation complete', ['p_astro', 'public']
353 )
354 |
355 gracedb.create_label.si('PASTRO_READY', graceid)
356 ).apply_async(priority=priority)
358 # Start BAYESTAR for all CBC pipelines.
359 (
360 gracedb.download.s('coinc.xml', graceid)
361 |
362 bayestar.localize.s(graceid)
363 |
364 gracedb.upload.s(
365 'bayestar.multiorder.fits', graceid,
366 'sky localization complete', ['sky_loc', 'public']
367 )
368 |
369 gracedb.create_label.si('SKYMAP_READY', graceid)
370 ).apply_async(priority=priority)
373@igwn_alert.handler('burst_olib',
374 'burst_cwb',
375 'burst_mly',
376 shared=False)
377def handle_burst_event(alert):
378 """Perform annotations for burst events that depend on pipeline-specific
379 """ # noqa: E501
380 graceid = alert['uid']
381 pipeline = alert['object']['pipeline'].lower()
382 search = alert['object']['search'].lower()
383 priority = 0 if superevents.should_publish(alert['object']) else 1
385 # em_bright calculation for Burst-cWB-BBH
386 if alert['alert_type'] == 'new':
387 if (pipeline, search) in [('cwb', 'bbh')]:
388 extra_attributes = alert['object']['extra_attributes']
389 multiburst = extra_attributes['MultiBurst']
390 snr = multiburst.get('snr')
391 mchirp = multiburst.get('mchirp', 0.0)
392 # FIXME once ingestion of mchip is finalised
393 # In case mchirp is not there or zero
394 # produce em_brigth with all zero
395 if mchirp == 0.0:
396 m12 = 30.0
397 else:
398 m12 = 2**(0.2) * mchirp
399 (
400 em_bright.source_properties.si(m12, m12, 0.0, 0.0, snr)
401 |
402 gracedb.upload.s(
403 'em_bright.json', graceid,
404 'em bright complete', ['em_bright', 'public']
405 )
406 |
407 gracedb.create_label.si('EMBRIGHT_READY', graceid)
408 ).apply_async(priority=priority)
410 if alert['alert_type'] != 'log':
411 return
413 filename = alert['data']['filename']
415 # Pipeline is uploading a flat resultion skymap file
416 # Converting to a multiorder ones with proper name.
417 # FIXME: Remove block when CWB starts to upload skymaps
418 # in multiorder format
419 if filename.endswith('.fits.gz'):
420 new_filename = filename.replace('.fits.gz', '.multiorder.fits')
421 flatten_msg = (
422 'Multi-resolution FITS file created from '
423 '<a href="/api/events/{graceid}/files/'
424 '{filename}">{filename}</a>').format(
425 graceid=graceid, filename=filename)
426 tags = ['sky_loc', 'lvem', 'public']
427 (
428 gracedb.download.si(filename, graceid)
429 |
430 skymaps.unflatten.s(new_filename)
431 |
432 gracedb.upload.s(
433 new_filename, graceid, flatten_msg, tags)
434 |
435 gracedb.create_label.si('SKYMAP_READY', graceid)
436 ).apply_async(priority=priority)
439@igwn_alert.handler('superevent',
440 'mdc_superevent',
441 shared=False)
442def handle_posterior_samples(alert):
443 """Generate multi-resolution and flat-resolution FITS files and skymaps
444 from an uploaded HDF5 file containing posterior samples.
445 """
446 if alert['alert_type'] != 'log' or \
447 not alert['data']['filename'].endswith('.posterior_samples.hdf5'):
448 return
449 superevent_id = alert['uid']
450 filename = alert['data']['filename']
451 info = '{} {}'.format(alert['data']['comment'], filename)
452 prefix, _ = filename.rsplit('.posterior_samples.')
453 skymap_filename = f'{prefix}.multiorder.fits'
454 labels = ['pe', 'sky_loc']
455 ifos = superevents.get_instruments(alert['object']['preferred_event_data'])
457 (
458 gracedb.download.si(filename, superevent_id)
459 |
460 skymaps.skymap_from_samples.s(superevent_id, ifos)
461 |
462 group(
463 skymaps.annotate_fits.s(
464 skymap_filename, superevent_id, labels
465 ),
467 gracedb.upload.s(
468 skymap_filename, superevent_id,
469 'Multiresolution FITS file generated from "{}"'.format(info),
470 labels
471 )
472 )
473 ).delay()
475 # em_bright from LALInference posterior samples
476 (
477 gracedb.download.si(filename, superevent_id)
478 |
479 em_bright.em_bright_posterior_samples.s()
480 |
481 gracedb.upload.s(
482 '{}.em_bright.json'.format(prefix), superevent_id,
483 'em-bright computed from "{}"'.format(info),
484 'pe'
485 )
486 ).delay()
489@app.task(bind=True, shared=False)
490def _create_mattermost_channel(self, superevent_id):
491 """
492 Creates a mattermost channel when ADVREQ label is applied and
493 posts a cooresponding gracedb link of that event in the channel
495 Channel name : O4 RRT {superevent_id}
497 Parameters:
498 ------------
499 superevent_id: str
500 The superevent id
501 """
502 gracedb_url = self.app.conf['gracedb_host']
503 channel_creation.rrt_channel_creation(
504 superevent_id, gracedb_url)
507@app.task(shared=False)
508def _set_pipeline_preferred_events(pipeline_event, superevent_id):
509 """Return group for setting pipeline preferred event using
510 :meth:`gracedb.add_pipeline_preferred_event`.
512 Parameters
513 ----------
514 pipeline_event: dict
515 {pipeline: event_dict} key value pairs, returned by
516 :meth:`superevents.select_pipeline_preferred_event`.
518 superevent_id: str
519 The superevent id
520 """
521 return group(
522 gracedb.add_pipeline_preferred_event(superevent_id,
523 event['graceid'])
524 for event in pipeline_event.values()
525 )
528@app.task(shared=False, ignore_result=True)
529def _update_if_dqok(event, superevent_id):
530 """Update `preferred_event` of `superevent_id` to `event_id` if `DQOK`
531 label has been applied.
532 """
533 if 'DQOK' in gracedb.get_labels(superevent_id):
534 event_id = event['graceid']
535 gracedb.update_superevent(superevent_id,
536 preferred_event=event_id,
537 t_0=event["gpstime"])
538 gracedb.upload.delay(
539 None, None, superevent_id,
540 comment=f'DQOK applied based on new event {event_id}')
543@gracedb.task(shared=False)
544def _create_voevent(classification, *args, **kwargs):
545 r"""Create a VOEvent record from an EM bright JSON file.
547 Parameters
548 ----------
549 classification : tuple, None
550 A collection of JSON strings, generated by
551 :meth:`gwcelery.tasks.em_bright.source_properties` and
552 :meth:`gwcelery.tasks.p_astro.compute_p_astro` or
553 content of ``{gstlal,mbta}.p_astro.json`` uploaded
554 by {gstlal,mbta} respectively; or None
555 \*args
556 Additional positional arguments passed to
557 :meth:`gwcelery.tasks.gracedb.create_voevent`.
558 \*\*kwargs
559 Additional keyword arguments passed to
560 :meth:`gwcelery.tasks.gracedb.create_voevent`.
562 Returns
563 -------
564 str
565 The filename of the newly created VOEvent.
567 """
568 kwargs = dict(kwargs)
570 if classification is not None:
571 # Merge source classification and source properties into kwargs.
572 for text in classification:
573 # Ignore filenames, only load dict in bytes form
574 if text is not None:
575 kwargs.update(json.loads(text))
577 # FIXME: These keys have differ between em_bright.json
578 # and the GraceDB REST API.
579 try:
580 kwargs['ProbHasNS'] = kwargs.pop('HasNS')
581 except KeyError:
582 pass
584 try:
585 kwargs['ProbHasRemnant'] = kwargs.pop('HasRemnant')
586 except KeyError:
587 pass
589 skymap_filename = kwargs.get('skymap_filename')
590 if skymap_filename is not None:
591 skymap_type = re.sub(
592 r'(\.multiorder)?\.fits(\..+)?(,[0-9]+)?$', '', skymap_filename)
593 kwargs.setdefault('skymap_type', skymap_type)
595 # FIXME: remove ._orig_run when this bug is fixed:
596 # https://github.com/getsentry/sentry-python/issues/370
597 return gracedb.create_voevent._orig_run(*args, **kwargs)
600@app.task(shared=False)
601def _create_label_and_return_filename(filename, label, graceid):
602 gracedb.create_label.delay(label, graceid)
603 return filename
606@app.task(shared=False)
607def _leave_log_message_and_return_event_dict(event, superevent_id,
608 message, **kwargs):
609 """Wrapper around :meth:`gracedb.upload`
610 that returns the event dictionary.
611 """
612 gracedb.upload.delay(None, None, superevent_id, message, **kwargs)
613 return event
616@gracedb.task(shared=False)
617def _update_superevent_and_return_event_dict(event, superevent_id):
618 """Wrapper around :meth:`gracedb.update_superevent`
619 that returns the event dictionary.
620 """
621 gracedb.update_superevent(superevent_id,
622 preferred_event=event['graceid'],
623 t_0=event['gpstime'])
624 return event
627@gracedb.task(shared=False)
628def _proceed_if_not_blocked_by(files, superevent_id, block_by):
629 """Return files in case the superevent does not have labels `block_by`
631 Parameters
632 ----------
633 files : tuple
634 List of files
635 superevent_id : str
636 The superevent id corresponding to files
637 block_by : set
638 Set of blocking labels. E.g. `{'ADVOK', 'ADVNO'}`
639 """
640 superevent_labels = gracedb.get_labels(superevent_id)
641 blocking_labels = block_by.intersection(superevent_labels)
642 if blocking_labels:
643 gracedb.upload.delay(
644 None, None, superevent_id,
645 f"Blocking automated notice due to labels {blocking_labels}"
646 )
647 return None
648 else:
649 return files
652@gracedb.task(shared=False)
653def _revise_and_send_second_less_significant_alert(alert, query,
654 superevent_id):
655 superevent_labels = gracedb.get_labels(superevent_id)
656 blocking_labels = {
657 'ADVREQ', 'ADVOK', 'ADVNO',
658 superevents.SIGNIFICANT_LABEL,
659 superevents.EARLY_WARNING_LABEL,
660 }
661 if blocking_labels.intersection(superevent_labels):
662 return
664 (
665 gracedb.get_events.si(query)
666 |
667 superevents.select_preferred_event.s()
668 |
669 _update_superevent_and_return_event_dict.s(superevent_id)
670 |
671 _leave_log_message_and_return_event_dict.s(
672 superevent_id,
673 "Superevent cleaned up before second less-significant alert"
674 )
675 |
676 earlywarning_preliminary_alert.s(
677 alert, alert_type='less-significant')
678 ).delay()
680 # set pipeline preferred events
681 # FIXME: Ideally this should combined with the previous canvas.
682 # However, incorporating that group prevents canvas from executing
683 # maybe related to https://github.com/celery/celery/issues/7851
684 (
685 gracedb.get_events.si(query)
686 |
687 superevents.select_pipeline_preferred_event.s()
688 |
689 _set_pipeline_preferred_events.s(superevent_id)
690 ).delay()
693@app.task(shared=False)
694def _annotate_fits_and_return_input(input_list, superevent_id):
695 """Unpack the output of the skymap, embright, p-astro download group in the
696 beginning of the
697 :meth:`~gwcelery.tasks.orchestartor.earlywarning_preliminary_alert` canvas
698 and call :meth:`~gwcelery.tasks.skymaps.annotate_fits`.
701 Parameters
702 ----------
703 input_list : list
704 The output of the group that downloads the skymap, embright, and
705 p-astro files. This list is in the form [skymap, skymap_filename],
706 [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
707 though the em-bright and p-astro lists can be populated by Nones
708 superevent_id : str
709 A list of the sky map, em_bright, and p_astro filenames.
710 """
712 skymaps.annotate_fits_tuple(
713 input_list[0],
714 superevent_id,
715 ['sky_loc', 'public']
716 )
718 return input_list
721@gracedb.task(shared=False)
722def _unpack_args_and_send_earlywarning_preliminary_alert(input_list, alert,
723 alert_type):
724 """Unpack the output of the skymap, embright, p-astro download group in the
725 beginning of the
726 :meth:`~gwcelery.tasks.orchestartor.earlywarning_preliminary_alert` canvas
727 and call
728 :meth:`gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.
731 Parameters
732 ----------
733 input_list : list
734 The output of the group that downloads the skymap, embright, and
735 p-astro files. This list is in the form [skymap, skymap_filename],
736 [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
737 though the em-bright and p-astro lists can be populated by Nones
738 alert : dict
739 IGWN-Alert dictionary
740 alert_type : str
741 alert_type passed to
742 :meth:`earlywarning_preliminary_initial_update_alert`
743 """
744 if input_list is None: # alert is blocked by blocking labels
745 return
747 [skymap, skymap_filename], [em_bright, em_bright_filename], \
748 [p_astro_dict, p_astro_filename] = input_list
750 # Update to latest state after downloading files
751 superevent = gracedb.get_superevent(alert['object']['superevent_id'])
753 earlywarning_preliminary_initial_update_alert.delay(
754 [skymap_filename, em_bright_filename, p_astro_filename],
755 superevent, alert_type,
756 filecontents=[skymap, em_bright, p_astro_dict]
757 )
760@app.task(ignore_result=True, shared=False)
761def earlywarning_preliminary_alert(event, alert, alert_type='preliminary',
762 initiate_voevent=True):
763 """Produce a preliminary alert by copying any sky maps.
765 This consists of the following steps:
767 1. Copy any sky maps and source classification from the preferred event
768 to the superevent.
769 2. Create standard annotations for sky maps including all-sky plots by
770 calling :meth:`gwcelery.tasks.skymaps.annotate_fits`.
771 3. Create a preliminary VOEvent.
772 4. Send the VOEvent to GCN and notices to SCiMMA and GCN.
773 5. Apply the GCN_PRELIM_SENT or LOW_SIGNIF_PRELIM_SENT
774 depending on the significant or less-significant alert
775 respectively.
776 6. Create and upload a GCN Circular draft.
777 """
778 priority = 0 if superevents.should_publish(event) else 1
779 preferred_event_id = event['graceid']
780 superevent_id = alert['uid']
782 # Define alert payloads depending on group-pipeline-search
783 # cbc-*-* : p_astro/em_bright
784 # burst.cwb-bbh : p_astro/em_bright
785 # burst-*-* : NO p_astro/em_bright
786 alert_group = event['group'].lower()
787 alert_pipeline = event['pipeline'].lower()
788 alert_search = event['search'].lower()
789 if alert_pipeline == 'cwb' and alert_search == 'bbh':
790 skymap_filename = alert_pipeline + '.multiorder.fits'
791 p_astro_filename = alert_pipeline + '.p_astro.json'
792 em_bright_filename = 'em_bright.json'
793 elif alert_group == 'cbc':
794 skymap_filename = 'bayestar.multiorder.fits'
795 p_astro_filename = alert_pipeline + '.p_astro.json'
796 em_bright_filename = 'em_bright.json'
797 elif alert_group == 'burst':
798 skymap_filename = event['pipeline'].lower() + '.multiorder.fits'
799 p_astro_filename = None
800 em_bright_filename = None
801 else:
802 raise NotImplementedError(
803 'Valid skymap required for preliminary alert'
804 )
806 # Determine if the event should be made public.
807 is_publishable = (superevents.should_publish(
808 event, significant=alert_type != 'less-significant') and
809 {'DQV', 'INJ'}.isdisjoint(
810 event['labels']))
812 # Download files from events and upload to superevent with relevant
813 # annotations. Pass file contents down the chain so alerts task doesn't
814 # need to download them again.
815 # Note: this is explicitly made a chain to fix an issue described in #464.
816 canvas = chain(
817 group(
818 gracedb.download.si(skymap_filename, preferred_event_id)
819 |
820 group(
821 identity.s(),
823 gracedb.upload.s(
824 skymap_filename,
825 superevent_id,
826 message='Localization copied from {}'.format(
827 preferred_event_id),
828 tags=['sky_loc', 'public']
829 )
830 |
831 _create_label_and_return_filename.s('SKYMAP_READY',
832 superevent_id)
833 ),
835 (
836 gracedb.download.si(em_bright_filename, preferred_event_id)
837 |
838 group(
839 identity.s(),
841 gracedb.upload.s(
842 em_bright_filename,
843 superevent_id,
844 message='Source properties copied from {}'.format(
845 preferred_event_id),
846 tags=['em_bright', 'public']
847 )
848 |
849 _create_label_and_return_filename.s('EMBRIGHT_READY',
850 superevent_id)
851 )
852 ) if em_bright_filename is not None else
853 identity.s([None, None]),
855 (
856 gracedb.download.si(p_astro_filename, preferred_event_id)
857 |
858 group(
859 identity.s(),
861 gracedb.upload.s(
862 p_astro_filename,
863 superevent_id,
864 message='Source classification copied from {}'.format(
865 preferred_event_id),
866 tags=['p_astro', 'public']
867 )
868 |
869 _create_label_and_return_filename.s('PASTRO_READY',
870 superevent_id)
871 )
872 ) if p_astro_filename is not None else
873 identity.s([None, None])
874 )
875 |
876 # Need annotate skymap task in body of chord instead of header because
877 # this task simply calls another task, which is to be avoided in chord
878 # headers. Note that any group that chains to a task is automatically
879 # upgraded to a chord.
880 _annotate_fits_and_return_input.s(superevent_id)
881 )
883 # Switch for disabling all but MDC alerts.
884 if app.conf['only_alert_for_mdc']:
885 if event.get('search') != 'MDC':
886 canvas |= gracedb.upload.si(
887 None, None, superevent_id,
888 ("Skipping alert because gwcelery has been configured to only"
889 " send alerts for MDC events."))
890 canvas.apply_async(priority=priority)
891 return
893 # Send notice and upload GCN circular draft for online events.
894 if is_publishable and initiate_voevent:
895 # presence of advocate action blocks significant prelim alert
896 # presence of adv action or significant event blocks EW alert
897 # presence of adv action or significant event or EW event blocks
898 # less significant alert
899 if alert_type == 'earlywarning':
900 # Launch DQR for significant early warning events after a timeout.
901 # If a full BW significant trigger arrives before the end of the
902 # timeout, the latter will apply the label instead, and this call
903 # is a noop.
904 gracedb.create_label.si(
905 'DQR_REQUEST',
906 superevent_id
907 ).apply_async(countdown=600)
908 blocking_labels = (
909 {'ADVOK', 'ADVNO'} if alert_type == 'preliminary'
910 else
911 {superevents.SIGNIFICANT_LABEL, 'ADVOK', 'ADVNO'}
912 if alert_type == 'earlywarning'
913 else
914 {superevents.EARLY_WARNING_LABEL, superevents.SIGNIFICANT_LABEL,
915 'ADVOK', 'ADVNO'}
916 if alert_type == 'less-significant'
917 else
918 set()
919 )
920 canvas |= (
921 _proceed_if_not_blocked_by.s(superevent_id, blocking_labels)
922 |
923 _unpack_args_and_send_earlywarning_preliminary_alert.s(
924 alert, alert_type
925 )
926 )
928 canvas.apply_async(priority=priority)
931@gracedb.task(shared=False)
932def _get_pe_far_and_event(superevent):
933 """Return FAR and event input to PE workflow.
935 The input FAR is the lowest FAR among CBC and Burst-BBH triggers.
936 The input event is the preferred event if it is a CBC trigger, otherwise
937 the CBC trigger with the lowest FAR is returned.
938 """
939 # FIXME: remove ._orig_run when this bug is fixed:
940 # https://github.com/getsentry/sentry-python/issues/370
941 events = [
942 gracedb.get_event._orig_run(gid) for gid in superevent['gw_events']
943 ]
944 events = [
945 e for e in events if e['group'].lower() == 'cbc' or (
946 e['group'].lower() == 'burst' and
947 e.get('search', 'None').lower() == 'bbh'
948 )
949 ]
950 events.sort(key=lambda e: e['far'])
951 preferred_event = superevent['preferred_event_data']
953 if preferred_event['group'].lower() == 'cbc':
954 return events[0]['far'], preferred_event
955 for e in events:
956 if e['group'].lower() == 'cbc':
957 return events[0]['far'], e
958 return None
961@app.task(ignore_result=True, shared=False)
962def parameter_estimation(far_event, superevent_id, pe_pipeline):
963 """Parameter Estimation with Bilby and RapidPE-RIFT. Parameter estimation
964 runs are triggered for CBC triggers which pass the FAR threshold and are
965 not mock uploads. For those which do not pass these criteria, this task
966 uploads messages explaining why parameter estimation is not started.
967 """
968 if far_event is None:
969 gracedb.upload.delay(
970 filecontents=None, filename=None,
971 graceid=superevent_id,
972 message='Parameter estimation will not start since no CBC triggers'
973 ' are found.',
974 tags='pe'
975 )
976 return
977 far, event = far_event
978 search = event['search'].lower()
979 if search in app.conf['significant_alert_far_threshold']['cbc']:
980 threshold = (
981 app.conf['significant_alert_far_threshold']['cbc'][search] /
982 app.conf['significant_alert_trials_factor']['cbc'][search]
983 )
984 else:
985 # Fallback in case an event is uploaded to an unlisted search
986 threshold = -1 * float('inf')
987 if far > threshold:
988 gracedb.upload.delay(
989 filecontents=None, filename=None,
990 graceid=superevent_id,
991 message='Parameter estimation will not start since FAR is larger '
992 'than the PE threshold, {} Hz.'.format(threshold),
993 tags='pe'
994 )
995 elif search == 'mdc':
996 gracedb.upload.delay(
997 filecontents=None, filename=None,
998 graceid=superevent_id,
999 message='Parameter estimation will not start since parameter '
1000 'estimation is disabled for mock uploads.',
1001 tags='pe'
1002 )
1003 elif event.get('offline', False):
1004 gracedb.upload.delay(
1005 filecontents=None, filename=None,
1006 graceid=superevent_id,
1007 message='Parameter estimation will not start since parameter '
1008 'estimation is disabled for OFFLINE events.',
1009 tags='pe'
1010 )
1011 elif (
1012 app.conf['gracedb_host'] == 'gracedb-playground.ligo.org'
1013 and event['pipeline'] == 'MBTA'
1014 ):
1015 # FIXME: Remove this block once multiple channels can be handled on
1016 # one gracedb instance
1017 gracedb.upload.delay(
1018 filecontents=None, filename=None,
1019 graceid=superevent_id,
1020 message='Parameter estimation is disabled for MBTA triggers '
1021 'on playground as MBTA analyses live data + online '
1022 'injections not O3 replay data + MDC injections',
1023 tags='pe'
1024 )
1025 elif pe_pipeline == 'rapidpe' and search == 'earlywarning':
1026 # Remove this if rapidpe can ingest early warning events
1027 gracedb.upload.delay(
1028 filecontents=None, filename=None,
1029 graceid=superevent_id,
1030 message='Parameter estimation by RapidPE-RIFT is disabled for '
1031 'earlywarning triggers.',
1032 tags='pe'
1033 )
1034 else:
1035 inference.start_pe.delay(event, superevent_id, pe_pipeline)
1038@gracedb.task(shared=False)
1039def earlywarning_preliminary_initial_update_alert(
1040 filenames,
1041 superevent,
1042 alert_type,
1043 filecontents=None
1044):
1045 """
1046 Create a canvas that sends an earlywarning, preliminary, initial, or update
1047 notice.
1049 Parameters
1050 ----------
1051 filenames : tuple
1052 A list of the sky map, em_bright, and p_astro filenames.
1053 superevent : dict
1054 The superevent dictionary, typically obtained from an IGWN Alert or
1055 from querying GraceDB.
1056 alert_type : {'less-significant', 'earlywarning', 'preliminary', 'initial', 'update'} # noqa: E501
1057 The alert type.
1059 Notes
1060 -----
1061 Tasks that call this function should be decorated with
1062 :obj:`gwcelery.tasks.gracedb.task` rather than :obj:`gwcelery.app.task` so
1063 that a synchronous call to :func:`gwcelery.tasks.gracedb.get_log` is
1064 retried in the event of GraceDB API failures. If `EM_COINC` is in labels
1065 will create a RAVEN circular.
1067 """
1068 labels = superevent['labels']
1069 superevent_id = superevent['superevent_id']
1071 if 'INJ' in labels:
1072 return
1074 if filecontents:
1075 assert alert_type in {
1076 'less-significant', 'earlywarning', 'preliminary'}
1078 skymap_filename, em_bright_filename, p_astro_filename = filenames
1079 rapidpe_pastro_filename = None
1080 rapidpe_pastro_needed = True
1081 combined_skymap_filename = None
1082 combined_skymap_needed = False
1083 skymap_needed = (skymap_filename is None)
1084 em_bright_needed = (em_bright_filename is None)
1085 p_astro_needed = (p_astro_filename is None)
1086 raven_coinc = ('RAVEN_ALERT' in labels and bool(superevent['em_type']))
1087 if raven_coinc:
1088 ext_labels = gracedb.get_labels(superevent['em_type'])
1089 combined_skymap_needed = \
1090 {"RAVEN_ALERT", "COMBINEDSKYMAP_READY"}.issubset(set(ext_labels))
1092 # FIXME: This if statement is always True, we should get rid of it.
1093 if skymap_needed or em_bright_needed or p_astro_needed or \
1094 combined_skymap_needed or rapidpe_pastro_needed:
1095 for message in gracedb.get_log(superevent_id):
1096 t = message['tag_names']
1097 f = message['filename']
1098 v = message['file_version']
1099 fv = '{},{}'.format(f, v)
1100 if not f:
1101 continue
1102 if skymap_needed \
1103 and {'sky_loc', 'public'}.issubset(t) \
1104 and f.endswith('.multiorder.fits') \
1105 and 'combined' not in f:
1106 skymap_filename = fv
1107 if em_bright_needed \
1108 and 'em_bright' in t \
1109 and f.endswith('.json'):
1110 em_bright_filename = fv
1111 if p_astro_needed \
1112 and {'p_astro', 'public'}.issubset(t) \
1113 and f.endswith('.json'):
1114 p_astro_filename = fv
1115 if combined_skymap_needed \
1116 and {'sky_loc', 'ext_coinc'}.issubset(t) \
1117 and f.startswith('combined-ext.') \
1118 and 'fit' in f:
1119 combined_skymap_filename = fv
1120 if rapidpe_pastro_needed \
1121 and {'p_astro', 'public'}.issubset(t) \
1122 and f == 'RapidPE_RIFT.p_astro.json':
1123 rapidpe_pastro_filename = fv
1125 if combined_skymap_needed:
1126 # for every alert, copy combined sky map over if applicable
1127 # FIXME: use file inheritance once available
1128 ext_id = superevent['em_type']
1129 if combined_skymap_filename:
1130 # If previous sky map, increase version by 1
1131 combined_skymap_filename_base, v = \
1132 combined_skymap_filename.split(',')
1133 v = str(int(v) + 1)
1134 combined_skymap_filename = \
1135 combined_skymap_filename_base + ',' + v
1136 else:
1137 combined_skymap_filename_base = \
1138 (external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER
1139 if '.multiorder.fits' in skymap_filename else
1140 external_skymaps.COMBINED_SKYMAP_FILENAME_FLAT)
1141 combined_skymap_filename = combined_skymap_filename_base + ',0'
1142 message = 'Combined LVK-external sky map copied from {0}'.format(
1143 ext_id)
1144 message_png = (
1145 'Mollweide projection of <a href="/api/events/{se_id}/files/'
1146 '{filename}">{filename}</a>, copied from {ext_id}').format(
1147 se_id=superevent_id,
1148 ext_id=ext_id,
1149 filename=combined_skymap_filename)
1151 combined_skymap_canvas = group(
1152 gracedb.download.si(combined_skymap_filename_base, ext_id)
1153 |
1154 gracedb.upload.s(
1155 combined_skymap_filename_base, superevent_id,
1156 message, ['sky_loc', 'ext_coinc', 'public'])
1157 |
1158 gracedb.create_label.si('COMBINEDSKYMAP_READY', superevent_id),
1160 gracedb.download.si(external_skymaps.COMBINED_SKYMAP_FILENAME_PNG,
1161 ext_id)
1162 |
1163 gracedb.upload.s(
1164 external_skymaps.COMBINED_SKYMAP_FILENAME_PNG, superevent_id,
1165 message_png, ['sky_loc', 'ext_coinc', 'public']
1166 )
1167 |
1168 # Pass None to download_anor_expose group
1169 identity.si()
1170 )
1172 # circular template not needed for less-significant alerts
1173 if alert_type in {'earlywarning', 'preliminary', 'initial'}:
1174 if raven_coinc:
1175 circular_task = circulars.create_emcoinc_circular.si(superevent_id)
1176 circular_filename = '{}-emcoinc-circular.txt'.format(alert_type)
1177 tags = ['em_follow', 'ext_coinc']
1179 else:
1180 circular_task = circulars.create_initial_circular.si(superevent_id)
1181 circular_filename = '{}-circular.txt'.format(alert_type)
1182 tags = ['em_follow']
1184 circular_canvas = (
1185 circular_task
1186 |
1187 gracedb.upload.s(
1188 circular_filename,
1189 superevent_id,
1190 'Template for {} GCN Circular'.format(alert_type),
1191 tags=tags)
1192 )
1194 else:
1195 circular_canvas = identity.si()
1197 # less-significant alerts have "preliminary" voevent notice type
1198 alert_type_voevent = 'preliminary' if alert_type == 'less-significant' \
1199 else alert_type
1200 # set the significant field in the VOEvent based on
1201 # less-significant/significant alert.
1202 # For kafka alerts the analogous field is set in alerts.py.
1203 # (see comment before defining kafka_alert_canvas)
1204 voevent_significance = 0 if alert_type == 'less-significant' else 1
1206 if filecontents and not combined_skymap_filename:
1207 skymap, em_bright, p_astro_dict = filecontents
1209 # check high profile and apply label if true
1210 if alert_type == 'preliminary':
1211 high_profile_canvas = rrt_utils.check_high_profile.si(
1212 skymap, em_bright, p_astro_dict, superevent
1213 )
1214 else:
1215 high_profile_canvas = identity.si()
1217 download_andor_expose_group = []
1218 if rapidpe_pastro_filename is None:
1219 voevent_canvas = _create_voevent.si(
1220 (em_bright, p_astro_dict),
1221 superevent_id,
1222 alert_type_voevent,
1223 Significant=voevent_significance,
1224 skymap_filename=skymap_filename,
1225 internal=False,
1226 open_alert=True,
1227 raven_coinc=raven_coinc,
1228 combined_skymap_filename=combined_skymap_filename
1229 )
1230 rapidpe_canvas = _update_rapidpe_pastro_shouldnt_run.s()
1232 # kafka alerts have a field called "significant" based on
1233 # https://dcc.ligo.org/LIGO-G2300151/public
1234 # The alert_type value passed to alerts.send is used to
1235 # set this field in the alert dictionary
1236 kafka_alert_canvas = alerts.send.si(
1237 (skymap, em_bright, p_astro_dict),
1238 superevent,
1239 alert_type,
1240 raven_coinc=raven_coinc
1241 )
1242 else:
1243 voevent_canvas = _create_voevent.s(
1244 superevent_id,
1245 alert_type_voevent,
1246 Significant=voevent_significance,
1247 skymap_filename=skymap_filename,
1248 internal=False,
1249 open_alert=True,
1250 raven_coinc=raven_coinc,
1251 combined_skymap_filename=combined_skymap_filename
1252 )
1253 download_andor_expose_group += [
1254 gracedb.download.si(rapidpe_pastro_filename, superevent_id)
1255 ]
1257 kafka_alert_canvas = _check_pastro_and_send_alert.s(
1258 skymap,
1259 em_bright,
1260 superevent,
1261 alert_type,
1262 raven_coinc=raven_coinc
1263 )
1265 rapidpe_canvas = (
1266 _update_rapidpe_pastro.s(
1267 em_bright=em_bright,
1268 pipeline_pastro=p_astro_dict)
1269 |
1270 _upload_rapidpe_pastro_json.s(
1271 superevent_id,
1272 rapidpe_pastro_filename
1273 )
1274 )
1275 else:
1276 # Download em_bright and p_astro files here for voevent
1277 download_andor_expose_group = [
1278 gracedb.download.si(em_bright_filename, superevent_id) if
1279 em_bright_filename is not None else identity.s(None),
1280 gracedb.download.si(p_astro_filename, superevent_id) if
1281 p_astro_filename is not None else identity.s(None),
1282 ]
1283 high_profile_canvas = identity.si()
1285 voevent_canvas = _create_voevent.s(
1286 superevent_id,
1287 alert_type_voevent,
1288 Significant=voevent_significance,
1289 skymap_filename=skymap_filename,
1290 internal=False,
1291 open_alert=True,
1292 raven_coinc=raven_coinc,
1293 combined_skymap_filename=combined_skymap_filename
1294 )
1296 if rapidpe_pastro_filename:
1297 download_andor_expose_group += [
1298 gracedb.download.si(rapidpe_pastro_filename, superevent_id)
1299 ]
1301 rapidpe_canvas = (
1302 _update_rapidpe_pastro.s()
1303 |
1304 _upload_rapidpe_pastro_json.s(
1305 superevent_id,
1306 rapidpe_pastro_filename
1307 )
1308 )
1310 # The skymap has not been downloaded at this point, so we need to
1311 # download it before we can assemble the kafka alerts and send them
1312 kafka_alert_canvas = alerts.download_skymap_and_send_alert.s(
1313 superevent,
1314 alert_type,
1315 skymap_filename=skymap_filename,
1316 raven_coinc=raven_coinc,
1317 combined_skymap_filename=combined_skymap_filename
1318 )
1320 to_expose = [skymap_filename, em_bright_filename, p_astro_filename]
1321 # Since PE skymap images, HTML, and gzip FITS are not made public when they
1322 # are uploaded, we need to expose them here.
1323 if (
1324 skymap_filename is not None and 'bilby' in skymap_filename.lower()
1325 ):
1326 prefix, _, _ = skymap_filename.partition('.multiorder.fits')
1327 to_expose += [f'{prefix}.html', f'{prefix}.png',
1328 f'{prefix}.volume.png', f'{prefix}.fits.gz']
1329 download_andor_expose_group += [
1330 gracedb.expose.si(superevent_id),
1331 *(
1332 gracedb.create_tag.si(filename, 'public', superevent_id)
1333 for filename in to_expose if filename is not None
1334 )
1335 ]
1337 voevent_canvas |= group(
1338 gracedb.download.s(superevent_id)
1339 |
1340 gcn.send.s(),
1342 gracedb.create_tag.s('public', superevent_id)
1343 )
1345 if combined_skymap_needed:
1346 download_andor_expose_group += [combined_skymap_canvas]
1348 sent_label_canvas = identity.si()
1349 if alert_type == 'less-significant':
1350 sent_label_canvas = gracedb.create_label.si(
1351 'LOW_SIGNIF_PRELIM_SENT',
1352 superevent_id
1353 )
1354 elif alert_type == 'preliminary':
1355 sent_label_canvas = gracedb.create_label.si(
1356 'GCN_PRELIM_SENT',
1357 superevent_id
1358 )
1360 # NOTE: The following canvas structure was used to fix #480
1361 canvas = (
1362 group(download_andor_expose_group)
1363 )
1364 if rapidpe_pastro_filename:
1365 canvas |= rapidpe_canvas
1367 canvas |= (
1368 group(
1369 voevent_canvas
1370 |
1371 group(
1372 circular_canvas,
1374 sent_label_canvas
1375 ),
1377 kafka_alert_canvas,
1379 high_profile_canvas
1380 )
1381 )
1383 canvas.apply_async()
1386@app.task(shared=False)
1387def _update_rapidpe_pastro(input_list, em_bright=None, pipeline_pastro=None):
1388 """
1389 If p_terr from rapidpe is different from the p_terr from the most
1390 recent preferred event, replaces rapidpe's p_terr with pipeline p_terr.
1391 Returns a tuple of em_bright, rapidpe pastro and a
1392 boolean(rapidpe_pastro_updated) indicating if rapidpe pastro has been
1393 updated. If p_terr in rapidpe has been updated, the return list contains
1394 the updated pastro and the rapidpe_pastro_updated is True. Else, the
1395 return list contains the rapidpe pastro from the input_list and
1396 rapidpe_pastro_updated is False.
1397 """
1398 # input_list is download_andor_expose_group in
1399 # function earlywarning_preliminary_initial_update_alert
1400 if pipeline_pastro is None:
1401 em_bright, pipeline_pastro, rapidpe_pastro, *_ = input_list
1402 else:
1403 rapidpe_pastro, *_ = input_list
1404 pipeline_pastro_contents = json.loads(pipeline_pastro)
1405 rapidpe_pastro_contents = json.loads(rapidpe_pastro)
1407 if (rapidpe_pastro_contents["Terrestrial"]
1408 == pipeline_pastro_contents["Terrestrial"]):
1409 rapidpe_pastro_updated = False
1410 else:
1411 rapidpe_pastro = json.dumps(
1412 rpe_pastro.renormalize_pastro_with_pipeline_pterr(
1413 rapidpe_pastro_contents, pipeline_pastro_contents
1414 )
1415 )
1416 rapidpe_pastro_updated = True
1418 return em_bright, rapidpe_pastro, rapidpe_pastro_updated
1421@app.task(shared=False)
1422def _update_rapidpe_pastro_shouldnt_run():
1423 raise RuntimeError(
1424 "The `rapidpe_canvas' was executed where it should"
1425 "not have been. A bug must have been introduced."
1426 )
1429@gracedb.task(shared=False)
1430def _upload_rapidpe_pastro_json(
1431 input_list,
1432 superevent_id,
1433 rapidpe_pastro_filename
1434):
1435 """
1436 Add public tag to RapidPE_RIFT.p_astro.json if p_terr from the
1437 preferred event is same as the p_terr in RapidPE_RIFT.p_astro.json.
1438 Else, uploads an updated version of RapidPE_RIFT.p_astro.json
1439 with file content from the task update_rapidpe_pastro.
1440 """
1441 # input_list is output from update_rapidpe_pastro
1442 *return_list, rapidpe_pastro_updated = input_list
1443 if rapidpe_pastro_updated is True:
1444 tags = ("pe", "p_astro", "public")
1446 upload_filename = "RapidPE_RIFT.p_astro.json"
1447 description = "RapidPE-RIFT Pastro results"
1448 content = input_list[1]
1449 gracedb.upload(
1450 content,
1451 upload_filename,
1452 superevent_id,
1453 description,
1454 tags
1455 )
1456 return return_list
1459@app.task(shared=False)
1460def _check_pastro_and_send_alert(input_classification, skymap, em_bright,
1461 superevent, alert_type, raven_coinc=False):
1462 """Wrapper for :meth:`~gwcelery.tasks.alerts.send` meant to take a
1463 potentially new p-astro as input from the preceding task.
1464 """
1465 _, p_astro = input_classification
1466 alerts.send.delay(
1467 (skymap, em_bright, p_astro),
1468 superevent,
1469 alert_type,
1470 raven_coinc=raven_coinc
1471 )
1474@gracedb.task(ignore_result=True, shared=False)
1475def initial_alert(filenames, alert):
1476 """Produce an initial alert.
1478 This does nothing more than call
1479 :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
1480 with ``alert_type='initial'``.
1482 Parameters
1483 ----------
1484 filenames : tuple
1485 A list of the sky map, em_bright, and p_astro filenames.
1486 alert : dict
1487 IGWN-Alert dictionary
1489 Notes
1490 -----
1491 This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
1492 than :obj:`gwcelery.app.task` so that a synchronous call to
1493 :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
1494 API failures.
1496 """
1497 earlywarning_preliminary_initial_update_alert(
1498 filenames,
1499 alert['object'],
1500 'initial'
1501 )
1504@gracedb.task(ignore_result=True, shared=False)
1505def update_alert(filenames, superevent_id):
1506 """Produce an update alert.
1508 This does nothing more than call
1509 :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
1510 with ``alert_type='update'``.
1512 Parameters
1513 ----------
1514 filenames : tuple
1515 A list of the sky map, em_bright, and p_astro filenames.
1516 superevent_id : str
1517 The superevent ID.
1519 Notes
1520 -----
1521 This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
1522 than :obj:`gwcelery.app.task` so that a synchronous call to
1523 :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
1524 API failures.
1526 """
1527 superevent = gracedb.get_superevent._orig_run(superevent_id)
1528 earlywarning_preliminary_initial_update_alert(
1529 filenames,
1530 superevent,
1531 'update'
1532 )
1535@app.task(ignore_result=True, shared=False)
1536def retraction_alert(alert):
1537 """Produce a retraction alert."""
1538 superevent_id = alert['uid']
1539 group(
1540 gracedb.expose.si(superevent_id)
1541 |
1542 group(
1543 _create_voevent.si(
1544 None, superevent_id, 'retraction',
1545 internal=False,
1546 open_alert=True
1547 )
1548 |
1549 group(
1550 gracedb.download.s(superevent_id)
1551 |
1552 gcn.send.s(),
1554 gracedb.create_tag.s('public', superevent_id)
1555 ),
1557 alerts.send.si(
1558 None,
1559 alert['object'],
1560 'retraction'
1561 )
1562 ),
1564 circulars.create_retraction_circular.si(superevent_id)
1565 |
1566 gracedb.upload.s(
1567 'retraction-circular.txt',
1568 superevent_id,
1569 'Template for retraction GCN Circular',
1570 tags=['em_follow']
1571 )
1572 ).apply_async()