Coverage for gwcelery/tasks/external_triggers.py: 100%
282 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1from pathlib import Path
2from urllib.parse import urlparse
4from astropy.time import Time
5from celery import group
6from celery.utils.log import get_logger
7from lxml import etree
9from .. import app
10from . import (alerts, detchar, external_skymaps, gcn, gracedb, igwn_alert,
11 raven)
log = get_logger(__name__)


REQUIRED_LABELS_BY_TASK = {
    # EM_READY implies preferred event sky map is available
    'compare': {'EM_READY', 'EXT_SKYMAP_READY', 'EM_COINC'},
    'SoG': {'SKYMAP_READY', 'RAVEN_ALERT', 'ADVOK'}
}
"""These labels should be present on an external event to consider it to
be ready for sky map comparison or for post-alert analysis, such as a
measurement of the speed of gravity (SoG).
"""

FERMI_GRB_CLASS_VALUE = 4
"""This is the index that denotes GRBs within Fermi's Flight Position
classification."""

FERMI_GRB_CLASS_THRESH = 50
"""This value denotes the threshold of the most likely Fermi source
classification, above which we will consider a Fermi Flight Position
notice."""
@alerts.handler('snews',
                queue='exttrig',
                shared=False)
def handle_snews_gcn(payload):
    """Handle the GCN notice payload from a SNEWS alert.

    Parses the notice, works out the GraceDB group and search it belongs to,
    and queues a workflow that looks up any existing external event with the
    same trigger ID before creating or replacing the event.

    Parameters
    ----------
    payload : str
        XML GCN notice alert packet in string format

    """
    voevent = etree.fromstring(payload)
    attributes = voevent.attrib

    # Trigger ID, and whether this notice is flagged as a test
    trig_id = voevent.find("./What/Param[@name='TrigID']").attrib['value']
    if attributes['role'] == 'test':
        ext_group = 'Test'
    else:
        ext_group = 'External'

    event_observatory = 'SNEWS'
    is_mdc = 'mdc-test_event' in attributes['ivorn'].lower()
    search = 'MDC' if is_mdc else 'Supernova'
    query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
        event_observatory, trig_id)

    # First look for matching events, then create/replace with that result
    lookup = gracedb.get_events.si(query=query)
    create_or_replace = _create_replace_external_event_and_skymap.s(
        payload, search, event_observatory, ext_group=ext_group)
    (lookup | create_or_replace).delay()
@alerts.handler('fermi_gbm_alert',
                'fermi_gbm_flt_pos',
                'fermi_gbm_gnd_pos',
                'fermi_gbm_fin_pos',
                'fermi_gbm_subthresh',
                'swift_bat_grb_pos_ack',
                'integral_wakeup',
                'integral_refined',
                'integral_offline',
                queue='exttrig',
                shared=False)
def handle_grb_gcn(payload):
    """Handles the payload from Fermi, Swift, and INTEGRAL GCN notices.

    Filters out candidates likely to be noise. Creates external events
    from the notice if new notice, otherwise updates existing event. Then
    creates and/or grabs external sky map to be uploaded to the external event.

    Notices are dropped outright when they are Test INTEGRAL events on the
    production server or carry a ``Reliability`` of 4 or less. Notices that
    are unlikely to be a GRB, are an initial Fermi GBM alert, or report a
    Swift star tracker loss of lock are still uploaded, but with the
    ``NOT_GRB`` label.

    More info for these notices can be found at:
    Fermi-GBM: https://gcn.gsfc.nasa.gov/fermi_grbs.html
    Fermi-GBM sub: https://gcn.gsfc.nasa.gov/fermi_gbm_subthresh_archive.html
    Swift: https://gcn.gsfc.nasa.gov/swift.html
    INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html

    Parameters
    ----------
    payload : str
        XML GCN notice alert packet in string format

    Raises
    ------
    KeyError
        If the path of the notice's ivorn is not one of the known streams

    """
    root = etree.fromstring(payload)
    u = urlparse(root.attrib['ivorn'])
    stream_path = u.path

    stream_obsv_dict = {'/SWIFT': 'Swift',
                        '/Fermi': 'Fermi',
                        '/INTEGRAL': 'INTEGRAL'}
    event_observatory = stream_obsv_dict[stream_path]

    ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'

    # Block Test INTEGRAL events on the production server to prevent
    # unneeded queries of old GW data during detchar check
    if event_observatory == 'INTEGRAL' and ext_group == 'Test' and \
            app.conf['gracedb_host'] == 'gracedb.ligo.org':
        return
    # Get TrigID
    elif event_observatory == 'INTEGRAL' and \
            not any(x in u.fragment for x in ('O3-replay', 'MDC-test')):
        # FIXME: revert all this if INTEGRAL fixes their GCN notices
        # If INTEGRAL, get trigger ID from ivorn rather than the TrigID field
        # unless O3 replay or MDC event
        trig_id = u.fragment.split('_')[-1].split('-')[0]
        # Modify the TrigID field so GraceDB has the correct value
        root.find("./What/Param[@name='TrigID']").attrib['value'] = \
            str(trig_id).encode()
        # Apply changes to payload delivered to GraceDB
        payload = etree.tostring(root, xml_declaration=True, encoding="UTF-8")
    else:
        try:
            trig_id = \
                root.find("./What/Param[@name='TrigID']").attrib['value']
        except AttributeError:
            # No TrigID parameter in this notice; fall back to Trans_Num
            trig_id = \
                root.find("./What/Param[@name='Trans_Num']").attrib['value']

    notice_type = \
        int(root.find("./What/Param[@name='Packet_Type']").attrib['value'])

    reliability = root.find("./What/Param[@name='Reliability']")
    if reliability is not None and int(reliability.attrib['value']) <= 4:
        return

    # Check if Fermi trigger is likely noise by checking classification
    # Most_Likely_Index of 4 is an astrophysical GRB
    # If not at least 50% chance of GRB we will not consider it for RAVEN
    likely_source = root.find("./What/Param[@name='Most_Likely_Index']")
    likely_prob = root.find("./What/Param[@name='Most_Likely_Prob']")
    if likely_source is None:
        # No classification in this notice, so nothing to veto on
        not_likely_grb = False
    else:
        # XML attribute values are strings; convert them before comparing
        # against the numeric constants, otherwise the class check is always
        # unequal and the threshold check is a str/int comparison
        is_grb_class = \
            int(likely_source.attrib['value']) == FERMI_GRB_CLASS_VALUE
        # A missing probability cannot pass the threshold
        passes_threshold = likely_prob is not None and \
            float(likely_prob.attrib['value']) >= FERMI_GRB_CLASS_THRESH
        not_likely_grb = not (is_grb_class and passes_threshold)

    # Check if initial Fermi alert. These are generally unreliable and should
    # never trigger a RAVEN alert, but will give us earlier warning of a
    # possible coincidence. Later notices could change this.
    initial_gbm_alert = notice_type == gcn.NoticeType.FERMI_GBM_ALERT

    # Check if Swift has lost lock. If so then veto
    lost_lock = \
        root.find("./What/Group[@name='Solution_Status']" +
                  "/Param[@name='StarTrack_Lost_Lock']")
    swift_veto = lost_lock is not None and lost_lock.attrib['value'] == 'true'

    # Only send alerts if likely a GRB, is not a low-confidence early Fermi
    # alert, and if not a Swift veto
    if not_likely_grb or initial_gbm_alert or swift_veto:
        label = 'NOT_GRB'
    else:
        label = None

    ivorn = root.attrib['ivorn']
    if 'subthresh' in ivorn.lower():
        search = 'SubGRB'
    elif 'mdc-test_event' in ivorn.lower():
        search = 'MDC'
    else:
        search = 'GRB'

    if search == 'SubGRB' and event_observatory == 'Fermi':
        skymap_link = \
            root.find("./What/Param[@name='HealPix_URL']").attrib['value']
    else:
        skymap_link = None

    query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
        event_observatory, trig_id)

    (
        gracedb.get_events.si(query=query)
        |
        _create_replace_external_event_and_skymap.s(
            payload, search, event_observatory,
            ext_group=ext_group, label=label,
            notice_date=root.find("./Who/Date").text,
            notice_type=notice_type,
            skymap_link=skymap_link,
            use_radec=search in {'GRB', 'MDC'}
        )
    ).delay()
@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    'external_fermi',
                    'external_swift',
                    'external_integral',
                    shared=False)
def handle_grb_igwn_alert(alert):
    """Dispatch an IGWN alert about a superevent or GRB external trigger.

    Notes
    -----
    * A ``new`` alert for an external event or a superevent launches one or
      more coincidence searches with
      :meth:`gwcelery.tasks.raven.coincidence_search`; which searches are
      launched depends on the search (and, for superevents, the group) of
      the event.
    * Any other alert type is handed to ``_handle_skymaps``, which decides
      whether to re-run the RAVEN pipeline or apply labels.

    Parameters
    ----------
    alert : dict
        IGWN alert packet

    """
    graceid = alert['uid']

    # Everything except newly created events is label/sky map bookkeeping
    if alert['alert_type'] != 'new':
        _handle_skymaps(alert)
        return

    event = alert['object']

    if event.get('group') == 'External':
        ext_search = event['search']

        if ext_search == 'MDC':
            # MDC external events are only compared with MDC superevents
            for gw_group in ('CBC', 'Burst'):
                raven.coincidence_search(graceid, event,
                                         group=gw_group, se_searches=['MDC'])
        elif ext_search == 'SubGRB':
            # Standard CBC search
            raven.coincidence_search(
                graceid, event,
                searches=['SubGRB'],
                group='CBC',
                pipelines=[event['pipeline']])
            # CWB BBH search
            raven.coincidence_search(
                graceid, event,
                searches=['SubGRB'],
                group='Burst',
                se_searches=['BBH'],
                pipelines=[event['pipeline']])
        elif ext_search == 'SubGRBTargeted':
            # Targeted sub-threshold GRB: search with that pipeline only
            raven.coincidence_search(
                graceid, event,
                searches=['SubGRBTargeted'],
                se_searches=['AllSky', 'BBH'],
                pipelines=[event['pipeline']])
        elif ext_search == 'GRB':
            # Standard Burst-GRB search
            raven.coincidence_search(graceid, event,
                                     group='Burst', se_searches=['AllSky'])
            # Standard CBC-GRB search and the analogous BBH search
            raven.coincidence_search(graceid, event,
                                     group='CBC', searches=['GRB'])
            raven.coincidence_search(graceid, event,
                                     group='Burst', searches=['GRB'],
                                     se_searches=['BBH'])

    elif 'S' in graceid:
        preferred = event['preferred_event_data']
        gw_group = preferred['group']
        search = preferred['search']

        # MDC superevents are only compared with MDC external events
        if search == 'MDC':
            raven.coincidence_search(graceid, event,
                                     group=gw_group, searches=['MDC'])
            return

        # Don't run search for IMBH Burst search
        if gw_group == 'Burst' and search.lower() not in {'allsky', 'bbh'}:
            return

        cbc_like = gw_group == 'CBC' or search == 'BBH'
        # Keep empty if CBC (field not needed), otherwise use AllSky or BBH
        se_searches = [] if gw_group == 'CBC' else [search]
        if cbc_like:
            subthreshold_searches = ['SubGRB', 'SubGRBTargeted']
        else:
            subthreshold_searches = ['SubGRBTargeted']

        # Standard GRB search
        raven.coincidence_search(graceid, event,
                                 group=gw_group, searches=['GRB'],
                                 se_searches=se_searches)
        # Sub-threshold searches run per pipeline so that Fermi and Swift
        # can use different time windows
        for pipeline in ['Fermi', 'Swift']:
            raven.coincidence_search(
                graceid, event, group=gw_group,
                searches=subthreshold_searches, pipelines=[pipeline])
def _handle_skymaps(alert):
    """Dispatch tasks that re-run the RAVEN pipeline when sky maps or labels
    change on a superevent or GRB external event.

    Notes
    -----
    Behaviour by alert type:

    * ``label_added`` on an external event: if the labels required for the
      ``compare`` task are complete, re-run the RAVEN pipeline and combined
      sky map creation; otherwise, for a coincidence, copy ``SKYMAP_READY``
      and ``EM_READY`` from the superevent to the external event.
    * ``label_added`` on a superevent: propagate ``SKYMAP_READY`` and
      ``EM_READY`` to all associated external events, and launch
      :meth:`gwcelery.tasks.raven.sog_paper_pipeline` once the ``SoG``
      labels are complete and a spatial joint FAR is available.
    * ``log`` with a new (non-flat, non-combined) sky map file on a
      coincidence: re-run the sky map comparison for every eligible
      external event.
    * ``log`` announcing a preferred event change on a superevent (when
      ``PE_READY`` is absent): re-run the comparison without the superevent
      sky map.
    * ``label_removed`` of ``NOT_GRB`` on a coincident external event:
      re-check the RAVEN publishing conditions.

    Parameters
    ----------
    alert : dict
        IGWN alert packet

    """
    graceid = alert['uid']
    event = alert['object']

    # State of the object the alert refers to
    is_superevent = 'S' in graceid
    is_external_event = event.get('group') == 'External'
    labels = event['labels']
    is_coincidence = 'EM_COINC' in labels
    pe_ready = 'PE_READY' in labels

    alert_type = alert['alert_type']

    if alert_type == 'label_added' and is_external_event:
        if _skymaps_are_ready(event, alert['data']['name'], 'compare'):
            # Both sky maps present on a coincidence: re-run the RAVEN
            # pipeline and create the combined sky map
            superevent_id, _ext_id = _get_superevent_ext_ids(graceid, event)
            superevent = gracedb.get_superevent(superevent_id)
            _relaunch_raven_pipeline_with_skymaps(superevent, event, graceid)
        elif is_coincidence:
            # Not complete yet: mirror the superevent's sky map labels onto
            # the external event
            se_labels = gracedb.get_labels(event['superevent'])
            if 'SKYMAP_READY' in se_labels:
                gracedb.create_label.si('SKYMAP_READY', graceid).delay()
            if 'EM_READY' in se_labels:
                gracedb.create_label.si('EM_READY', graceid).delay()

    # Apply labels from superevent to external events to update their state
    # and trigger functionality requiring sky maps
    elif alert_type == 'label_added' and is_superevent:
        if 'SKYMAP_READY' in labels:
            # Sky map in superevent: label all current external events
            group(
                gracedb.create_label.si('SKYMAP_READY', ext_id)
                for ext_id in event['em_events']
            ).delay()
        if 'EM_READY' in labels:
            # Sky map in preferred event: label all current external events
            group(
                gracedb.create_label.si('EM_READY', ext_id)
                for ext_id in event['em_events']
            ).delay()
        sog_labels_ready = _skymaps_are_ready(
            event, alert['data']['name'], 'SoG')
        if sog_labels_ready and event['space_coinc_far'] is not None:
            # Superevent vetted and a spatial joint FAR is available: check
            # whether the SoG publishing conditions are met
            (
                gracedb.get_event.si(event['em_type'])
                |
                raven.sog_paper_pipeline.s(event)
            ).delay()

    # New GW or external sky map after one was first available: remake the
    # combined sky map and re-run the RAVEN pipeline
    elif alert_type == 'log' and \
            is_coincidence and \
            'fit' in alert['data']['filename'] and \
            'flat' not in alert['data']['comment'].lower() and \
            (alert['data']['filename'] !=
             external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER):
        # The returned IDs are not used below; the lookup is kept so the
        # same keys are still required on the event
        _se_id, _ext_id = _get_superevent_ext_ids(graceid, event)
        required = REQUIRED_LABELS_BY_TASK['compare']
        if is_superevent:
            # Re-run for all eligible external events
            for ext_id in event['em_events']:
                external_event = gracedb.get_event(ext_id)
                if required.issubset(set(external_event['labels'])):
                    _relaunch_raven_pipeline_with_skymaps(
                        event, external_event, graceid,
                        use_superevent_skymap=True)
        else:
            superevent = gracedb.get_superevent(event['superevent'])
            if required.issubset(set(event['labels'])):
                _relaunch_raven_pipeline_with_skymaps(
                    superevent, event, graceid)

    # Preferred event changed: re-run the coincidence FAR calculation.
    # Skipped when PE results are present.
    elif alert_type == 'log' and not pe_ready and is_coincidence:
        new_log_comment = alert['data'].get('comment', '')
        preferred_event_changed = is_superevent and \
            new_log_comment.startswith('Updated superevent parameters: '
                                       'preferred_event: ')
        if preferred_event_changed:
            required = REQUIRED_LABELS_BY_TASK['compare']
            # Re-run for all eligible external events
            for ext_id in event['em_events']:
                external_event = gracedb.get_event(ext_id)
                if required.issubset(set(external_event['labels'])):
                    _relaunch_raven_pipeline_with_skymaps(
                        event, external_event, graceid,
                        use_superevent_skymap=False)

    elif alert_type == 'label_removed' and is_external_event:
        if alert['data']['name'] == 'NOT_GRB' and is_coincidence:
            # NOT_GRB was removed: re-check publishing conditions
            superevent = gracedb.get_superevent(event['superevent'])
            gw_group = superevent['preferred_event_data']['group']
            coinc_far_dict = {
                'temporal_coinc_far': superevent['time_coinc_far'],
                'spatiotemporal_coinc_far': superevent['space_coinc_far']
            }
            raven.trigger_raven_alert(coinc_far_dict, superevent, graceid,
                                      event, gw_group)
@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    'external_snews',
                    shared=False)
def handle_snews_igwn_alert(alert):
    """Dispatch an IGWN alert about a superevent or supernova external
    trigger.

    Notes
    -----
    Only ``new`` alerts are acted on. Depending on whether the alert object
    is a superevent or a SNEWS external event, a coincidence search is
    launched with :meth:`gwcelery.tasks.raven.coincidence_search`.

    Parameters
    ----------
    alert : dict
        IGWN alert packet

    """
    graceid = alert['uid']

    if alert['alert_type'] != 'new':
        return

    event = alert['object']

    if event.get('superevent_id'):
        preferred = event['preferred_event_data']
        gw_group = preferred['group']
        search = preferred['search']
        is_mdc = search == 'MDC'
        searches = ['MDC'] if is_mdc else ['Supernova']
        se_searches = ['MDC'] if is_mdc else ['AllSky']
        # Run only on Test and Burst superevents
        if gw_group in {'Burst', 'Test'} and search in {'MDC', 'AllSky'}:
            raven.coincidence_search(graceid, event,
                                     group='Burst', searches=searches,
                                     se_searches=se_searches,
                                     pipelines=['SNEWS'])
    else:
        # SNEWS event, either real or test: pair each external search with
        # the superevent search it is compared against
        se_searches_by_search = {'MDC': ['MDC'], 'Supernova': ['AllSky']}
        search = event['search']
        if search in se_searches_by_search:
            raven.coincidence_search(
                graceid, event,
                group='Burst', searches=[search],
                se_searches=se_searches_by_search[search],
                pipelines=['SNEWS'])
@alerts.handler('fermi_targeted',
                'swift_targeted',
                queue='exttrig',
                shared=False)
def handle_targeted_kafka_alert(alert):
    """Handle a Kafka alert from a MOU partner in the joint subthreshold
    targeted search.

    The alert is converted to a VOEvent, labelled ``NOT_GRB`` if it is a
    retraction or its FAR exceeds the configured threshold for its pipeline,
    and then used to create or replace the matching external event.

    Parameters
    ----------
    alert : dict
        Kafka alert packet

    """
    search = 'SubGRBTargeted'

    # Convert alert to VOEvent format
    # FIXME: This is required until native ingesting of kafka events in GraceDB
    payload, pipeline, time, trig_id = _kafka_to_voevent(alert, search)

    # Veto events that don't pass GRB FAR threshold
    far_grb = alert['far']
    far_threshold = app.conf['raven_targeted_far_thresholds']['GRB'][pipeline]
    veto_event = far_threshold < far_grb
    if alert['alert_type'] == "retraction" or veto_event:
        label = 'NOT_GRB'
    else:
        label = None

    # Look whether a previous event with the same ID exists
    query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
        pipeline, trig_id)

    lookup = gracedb.get_events.si(query=query)
    create_or_replace = _create_replace_external_event_and_skymap.s(
        payload, search, pipeline,
        label=label, notice_date=time,
        skymap=alert.get('healpix_file'),
        use_radec=('ra' in alert and 'dec' in alert))
    (lookup | create_or_replace).delay()
def _skymaps_are_ready(event, label, task):
    """Decide whether a newly added label completes the label set of a task.

    Parameters
    ----------
    event : dict
        Either Superevent or external event dictionary
    label : str
        Label that was just added
    task : str
        Key of ``REQUIRED_LABELS_BY_TASK`` selecting which label schema to
        check for completeness

    Returns
    -------
    labels_pass : bool
        True if all the required labels are present on the event and the
        given label is one of them

    """
    present = set(event['labels'])
    required = REQUIRED_LABELS_BY_TASK[task]
    return required <= present and label in required
576def _get_superevent_ext_ids(graceid, event):
577 """Grab superevent and external event IDs from a given event.
579 Parameters
580 ----------
581 graceid : str
582 GraceDB ID
583 event : dict
584 Either Superevent or external event dictionary
586 Returns
587 -------
588 se_id, ext_id : tuple
589 Tuple of superevent and external event GraceDB IDs
591 """
592 if 'S' in graceid:
593 se_id = event['superevent_id']
594 ext_id = event['em_type']
595 else:
596 se_id = event['superevent']
597 ext_id = event['graceid']
598 return se_id, ext_id
@app.task(shared=False)
def _launch_external_detchar(event):
    """Launch detchar tasks for an external event.

    The checked interval starts at the event's ``gpstime``. For SNEWS events
    it ends at the same time; otherwise it is extended by the GRB trigger
    duration, or by 4.0 if that duration is missing or zero.

    Parameters
    ----------
    event : dict
        External event dictionary

    Returns
    -------
    event : dict
        External event dictionary, unchanged

    """
    start = event['gpstime']
    if event['pipeline'] == 'SNEWS':
        end = start
    else:
        grb_attributes = event['extra_attributes']['GRB']
        integration_time = grb_attributes['trigger_duration'] or 4.0
        end = start + integration_time

    check = detchar.check_vectors.si(event, event['graceid'], start, end)
    check.delay()

    return event
def _relaunch_raven_pipeline_with_skymaps(superevent, ext_event, graceid,
                                          use_superevent_skymap=None):
    """Relaunch the RAVEN sky map comparison workflow: re-run the RAVEN
    pipeline with sky map info and then create a new combined sky map.

    Parameters
    ----------
    superevent : dict
        Superevent dictionary
    ext_event : dict
        External event dictionary
    graceid : str
        GraceDB ID of event
    use_superevent_skymap : bool
        If True/False, use/don't use skymap info from superevent.
        Else if None, check SKYMAP_READY label in external event.

    """
    preferred = superevent['preferred_event_data']
    gw_group = preferred['group']
    tl, th = raven._time_window(
        graceid, gw_group, [ext_event['pipeline']], [ext_event['search']],
        [preferred['search']])

    # FIXME: both overlap integral and combined sky map could be
    # done by the same function since they are so similar
    if use_superevent_skymap is None:
        use_superevent = 'SKYMAP_READY' in ext_event['labels']
    else:
        use_superevent = use_superevent_skymap

    # The argument order depends on which of the two events the ID refers to
    if 'S' in graceid:
        coinc_events, alert_object = [ext_event], superevent
    else:
        coinc_events, alert_object = [superevent], ext_event

    canvas = raven.raven_pipeline.si(
        coinc_events, graceid, alert_object,
        tl, th, gw_group, use_superevent_skymap=use_superevent)

    # Create new updated combined sky map
    if use_superevent:
        preferred_event = None
    else:
        preferred_event = superevent['preferred_event']
    canvas |= external_skymaps.create_combined_skymap.si(
        superevent['superevent_id'], ext_event['graceid'],
        preferred_event=preferred_event)

    canvas.delay()
@app.task(shared=False,
          queue='exttrig')
def _create_replace_external_event_and_skymap(
        events, payload, search, pipeline,
        label=None, ext_group='External', notice_date=None, notice_type=None,
        skymap=None, skymap_link=None, use_radec=False):
    """Create a new external event or replace an existing one, then upload a
    given sky map, try to download one from a link, or create one from
    coordinates.

    Parameters
    ----------
    events : list
        List of external events sharing the same trigger ID
    payload : str
        VOEvent of event being considered
    search : str
        Search of external event
    pipeline : str
        Pipeline of external event
    label : str
        Label to be uploaded along with external event. If None, removes
        'NOT_GRB' label from an existing event
    ext_group : str
        Group of external event, 'External' or 'Test'
    notice_date : str
        External event trigger time in ISO format
    notice_type : int
        GCN notice type integer
    skymap : str
        Base64 encoded sky map
    skymap_link : str
        Link to external sky map to be downloaded
    use_radec : bool
        If True, try to create sky map using given coordinates

    """
    followup_tasks = ()

    if events and ext_group == 'External':
        # A previous event exists: try to update it in place
        assert len(events) == 1, 'Found more than one matching GraceDB entry'
        event, = events
        graceid = event['graceid']
        if label:
            create_replace_canvas = gracedb.create_label.si(label, graceid)
        else:
            create_replace_canvas = gracedb.remove_label.si('NOT_GRB', graceid)

        # Prevent SubGRBs from appending GRBs; only replace for a GRB notice
        # or when the search is unchanged, otherwise just exit
        if search != 'GRB' and search != event['search']:
            return

        # Replace event, then pass on the refreshed event dictionary
        create_replace_canvas |= gracedb.replace_event.si(graceid, payload)
        create_replace_canvas |= gracedb.get_event.si(graceid)
    else:
        # New event: create a new entry in GraceDB and launch detchar
        create_replace_canvas = gracedb.create_event.si(
            filecontents=payload,
            search=search,
            group=ext_group,
            pipeline=pipeline,
            labels=[label] if label else None)
        followup_tasks = followup_tasks + (_launch_external_detchar.s(),)

    if skymap:
        # Use sky map if provided
        followup_tasks = followup_tasks + (
            external_skymaps.read_upload_skymap_from_base64.s(skymap),)
    else:
        # Otherwise upload one based on given info
        if skymap_link:
            # Grab sky map from provided link
            followup_tasks = followup_tasks + (
                external_skymaps.get_upload_external_skymap.s(skymap_link),)
        elif notice_type == gcn.NoticeType.FERMI_GBM_FIN_POS:
            # FINAL Fermi notice: wait 10 min and then look for the
            # official Fermi sky map
            followup_tasks = followup_tasks + (
                external_skymaps.get_upload_external_skymap.s(None).set(
                    countdown=600),)
        if use_radec:
            # Also create a sky map from the given coordinates
            followup_tasks = followup_tasks + (
                external_skymaps.create_upload_external_skymap.s(
                    notice_type, notice_date),)

    (
        create_replace_canvas
        |
        group(followup_tasks)
    ).delay()
def _kafka_to_voevent(alert, search):
    """Parse an alert in JSON format sent via Kafka from public GCN Notices or
    from a MOU partner in our joint subthreshold targeted search, converting
    to an equivalent XML string VOEvent.

    The VOEvent is built by filling in a per-pipeline, per-search XML
    template with the values found in the alert.

    Parameters
    ----------
    alert : dict
        Kafka alert packet
    search : str
        External trigger search

    Returns
    -------
    payload : bytes
        XML GCN notice alert packet, as serialized by ``etree.tostring``
    pipeline : str
        Pipeline (mission) the alert came from
    trigger_time : str
        Trigger time written to the VOEvent, without the trailing ``'Z'``
    trig_id : str
        Trigger ID, formed by joining the entries of ``alert['id']`` with
        underscores

    Raises
    ------
    ValueError
        If the pipeline cannot be determined from the alert

    """
    # Define basic values
    pipeline = alert.get('mission')
    if pipeline is None:
        if 'instrument' not in alert:
            raise ValueError("Alert does not contain pipeline information.")
        elif alert['instrument'].lower() == 'wxt':
            # FIXME: Replace with official GraceDB name once added
            pipeline = 'EinsteinProbe'
        else:
            # Fail here with a clear message rather than later with an
            # AttributeError when the template name is built
            raise ValueError(
                "Unable to determine pipeline from instrument "
                f"{alert['instrument']!r}.")

    # Get times, adding missing
    start_time = alert['trigger_time']
    alert_time = alert.get('alert_datetime', start_time)
    # If missing, add character onto end
    if not start_time.endswith('Z'):
        start_time += 'Z'
    if not alert_time.endswith('Z'):
        alert_time += 'Z'

    # Try to get FAR if there
    far = alert.get('far')

    # First matching duration key wins, 0. if no keys match
    duration_params = ('rate_duration', 'image_duration')
    duration = next(
        (alert[param] for param in duration_params if param in alert), 0.)

    # Form ID out of given list
    trig_id = '_'.join(str(x) for x in alert['id'])
    if search == 'SubGRBTargeted':
        # Use central time since starting time is not well measured
        central_time = \
            Time(start_time, format='isot', scale='utc').to_value('gps') + \
            .5 * duration
        trigger_time = \
            str(Time(central_time, format='gps', scale='utc').isot) + 'Z'
    else:
        trigger_time = start_time

    # sky localization may not be available
    ra = alert.get('ra')
    dec = alert.get('dec')

    # First matching error key wins, None if no keys match
    error_params = ('dec_uncertainty', 'ra_uncertainty', 'ra_dec_error')
    error = next(
        (alert[param] for param in error_params if param in alert), None)

    # If argument is list, get value
    if isinstance(error, list):
        error = error[0]

    # if any missing sky map info, set to zeros so will be ignored later
    if ra is None or dec is None or error is None:
        ra, dec, error = 0., 0., 0.

    # Load template
    fname = \
        str(Path(__file__).parent /
            f'../tests/data/{pipeline.lower()}_{search.lower()}_template.xml')
    root = etree.parse(fname)

    # Update template values
    # Change ivorn to indicate this is a subthreshold targeted event
    root.xpath('.')[0].attrib['ivorn'] = \
        'ivo://lvk.internal/{0}#{1}-{2}'.format(
            pipeline.lower(),
            ('targeted_subthreshold' if search == 'SubGRBTargeted'
             else search.lower()),
            trigger_time).encode()

    # Change times to chosen time
    root.find("./Who/Date").text = str(alert_time).encode()
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Time/TimeInstant/"
               "ISOTime")).text = str(trigger_time).encode()

    # Update ID and duration
    # SVOM has separate template, use different paths
    if pipeline == 'SVOM':
        root.find(("./What/Group[@name='Svom_Identifiers']"
                   "/Param[@name='Burst_Id']")).attrib['value'] = \
            trig_id.encode()

        root.find(("./What/Group[@name='Detection_Info']"
                   "/Param[@name='Timescale']")).attrib['value'] = \
            str(duration).encode()
    # Every other type uses similar format
    else:
        root.find("./What/Param[@name='TrigID']").attrib['value'] = \
            trig_id.encode()

        root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \
            str(duration).encode()

    if far is not None:
        root.find("./What/Param[@name='FAR']").attrib['value'] = \
            str(far).encode()

    # Sky position
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Position2D/Value2/"
               "C1")).text = str(ra).encode()
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Position2D/Value2/"
               "C2")).text = str(dec).encode()
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Position2D/"
               "Error2Radius")).text = str(error).encode()

    return (etree.tostring(root, xml_declaration=True, encoding="UTF-8",
                           pretty_print=True),
            pipeline, trigger_time.replace('Z', ''), trig_id)