Coverage for gwcelery/tasks/external_triggers.py: 100%
249 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1from pathlib import Path
2from urllib.parse import urlparse
4from astropy.time import Time
5from celery import group
6from celery.utils.log import get_logger
7from lxml import etree
9from .. import app
10from . import (alerts, detchar, external_skymaps, gcn, gracedb, igwn_alert,
11 raven)
13log = get_logger(__name__)
16REQUIRED_LABELS_BY_TASK = {
17 # EM_READY implies preferred event sky map is available
18 'compare': {'EM_READY', 'EXT_SKYMAP_READY', 'EM_COINC'},
19 'SoG': {'SKYMAP_READY', 'RAVEN_ALERT', 'ADVOK'}
20}
21"""These labels should be present on an external event to consider it to
22be ready for sky map comparison or for post-alert analysis, such as a
23measurment of the speed of gravity (SoG).
24"""
26FERMI_GRB_CLASS_VALUE = 4
27"""This is the index that denote GRBs within Fermi's Flight Position
28classification."""
30FERMI_GRB_CLASS_THRESH = 50
31"""This values denotes the threshold of the most likely Fermi source
32classification, above which we will consider a Fermi Flight Position
33notice."""
36@alerts.handler('snews',
37 queue='exttrig',
38 shared=False)
39def handle_snews_gcn(payload):
40 """Handles the GCN notice payload from SNEWS alerts.
42 Prepares the alert to be sent to graceDB as external events, updating the
43 info if it already exists.
45 Parameters
46 ----------
47 payload : str
48 XML GCN notice alert packet in string format
50 """
51 root = etree.fromstring(payload)
53 # Get TrigID and Test Event Boolean
54 trig_id = root.find("./What/Param[@name='TrigID']").attrib['value']
55 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'
57 event_observatory = 'SNEWS'
58 if 'mdc-test_event' in root.attrib['ivorn'].lower():
59 search = 'MDC'
60 else:
61 search = 'Supernova'
62 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
63 event_observatory, trig_id)
65 (
66 gracedb.get_events.si(query=query)
67 |
68 _create_replace_external_event_and_skymap.s(
69 payload, search, event_observatory, ext_group=ext_group
70 )
71 ).delay()
74@alerts.handler('fermi_gbm_alert',
75 'fermi_gbm_flt_pos',
76 'fermi_gbm_gnd_pos',
77 'fermi_gbm_fin_pos',
78 'fermi_gbm_subthresh',
79 'swift_bat_grb_pos_ack',
80 'integral_wakeup',
81 'integral_refined',
82 'integral_offline',
83 queue='exttrig',
84 shared=False)
85def handle_grb_gcn(payload):
86 """Handles the payload from Fermi, Swift, and INTEGRAL GCN notices.
88 Filters out candidates likely to be noise. Creates external events
89 from the notice if new notice, otherwise updates existing event. Then
90 creates and/or grabs external sky map to be uploaded to the external event.
92 More info for these notices can be found at:
93 Fermi-GBM: https://gcn.gsfc.nasa.gov/fermi_grbs.html
94 Fermi-GBM sub: https://gcn.gsfc.nasa.gov/fermi_gbm_subthresh_archive.html
95 Swift: https://gcn.gsfc.nasa.gov/swift.html
96 INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html
98 Parameters
99 ----------
100 payload : str
101 XML GCN notice alert packet in string format
103 """
104 root = etree.fromstring(payload)
105 u = urlparse(root.attrib['ivorn'])
106 stream_path = u.path
108 stream_obsv_dict = {'/SWIFT': 'Swift',
109 '/Fermi': 'Fermi',
110 '/INTEGRAL': 'INTEGRAL'}
111 event_observatory = stream_obsv_dict[stream_path]
113 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'
115 # Block Test INTEGRAL events on the production server to prevent
116 # unneeded queries of old GW data during detchar check
117 if event_observatory == 'INTEGRAL' and ext_group == 'Test' and \
118 app.conf['gracedb_host'] == 'gracedb.ligo.org':
119 return
120 # Get TrigID
121 elif event_observatory == 'INTEGRAL' and \
122 not any([x in u.fragment for x in ['O3-replay', 'MDC-test']]):
123 # FIXME: revert all this if INTEGRAL fixes their GCN notices
124 # If INTEGRAL, get trigger ID from ivorn rather than the TrigID field
125 # unless O3 replay or MDC event
126 trig_id = u.fragment.split('_')[-1].split('-')[0]
127 # Modify the TrigID field so GraceDB has the correct value
128 root.find("./What/Param[@name='TrigID']").attrib['value'] = \
129 str(trig_id).encode()
130 # Apply changes to payload delivered to GraceDB
131 payload = etree.tostring(root, xml_declaration=True, encoding="UTF-8")
132 else:
133 try:
134 trig_id = \
135 root.find("./What/Param[@name='TrigID']").attrib['value']
136 except AttributeError:
137 trig_id = \
138 root.find("./What/Param[@name='Trans_Num']").attrib['value']
140 notice_type = \
141 int(root.find("./What/Param[@name='Packet_Type']").attrib['value'])
143 reliability = root.find("./What/Param[@name='Reliability']")
144 if reliability is not None and int(reliability.attrib['value']) <= 4:
145 return
147 # Check if Fermi trigger is likely noise by checking classification
148 # Most_Likely_Index of 4 is an astrophysical GRB
149 # If not at least 50% chance of GRB we will not consider it for RAVEN
150 likely_source = root.find("./What/Param[@name='Most_Likely_Index']")
151 likely_prob = root.find("./What/Param[@name='Most_Likely_Prob']")
152 not_likely_grb = likely_source is not None and \
153 (likely_source.attrib['value'] != FERMI_GRB_CLASS_VALUE
154 or likely_prob.attrib['value'] < FERMI_GRB_CLASS_THRESH)
156 # Check if initial Fermi alert. These are generally unreliable and should
157 # never trigger a RAVEN alert, but will give us earlier warning of a
158 # possible coincidence. Later notices could change this.
159 initial_gbm_alert = notice_type == gcn.NoticeType.FERMI_GBM_ALERT
161 # Check if Swift has lost lock. If so then veto
162 lost_lock = \
163 root.find("./What/Group[@name='Solution_Status']" +
164 "/Param[@name='StarTrack_Lost_Lock']")
165 swift_veto = lost_lock is not None and lost_lock.attrib['value'] == 'true'
167 # Only send alerts if likely a GRB, is not a low-confidence early Fermi
168 # alert, and if not a Swift veto
169 if not_likely_grb or initial_gbm_alert or swift_veto:
170 label = 'NOT_GRB'
171 else:
172 label = None
174 ivorn = root.attrib['ivorn']
175 if 'subthresh' in ivorn.lower():
176 search = 'SubGRB'
177 elif 'mdc-test_event' in ivorn.lower():
178 search = 'MDC'
179 else:
180 search = 'GRB'
182 if search == 'SubGRB' and event_observatory == 'Fermi':
183 skymap_link = \
184 root.find("./What/Param[@name='HealPix_URL']").attrib['value']
185 else:
186 skymap_link = None
188 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
189 event_observatory, trig_id)
191 (
192 gracedb.get_events.si(query=query)
193 |
194 _create_replace_external_event_and_skymap.s(
195 payload, search, event_observatory,
196 ext_group=ext_group, label=label,
197 notice_date=root.find("./Who/Date").text,
198 notice_type=notice_type,
199 skymap_link=skymap_link,
200 use_radec=search in {'GRB', 'MDC'}
201 )
202 ).delay()
205@igwn_alert.handler('superevent',
206 'mdc_superevent',
207 'external_fermi',
208 'external_swift',
209 'external_integral',
210 shared=False)
211def handle_grb_igwn_alert(alert):
212 """Parse an IGWN alert message related to superevents/GRB external triggers
213 and dispatch it to other tasks.
215 Notes
216 -----
217 This IGWN alert message handler is triggered by creating a new superevent
218 or GRB external trigger event, a label associated with completeness of
219 skymaps or change in state, or if a sky map file is uploaded:
221 * New event/superevent triggers a coincidence search with
222 :meth:`gwcelery.tasks.raven.coincidence_search`.
223 * When both a GW and GRB sky map are available during a coincidence,
224 indicated by the labels ``EM_READY`` and ``EXT_SKYMAP_READY``
225 respectively on the external event, this triggers the spacetime coinc
226 FAR to be calculated and a combined GW-GRB sky map is created using
227 :meth:`gwcelery.tasks.external_skymaps.create_combined_skymap`.
228 * Re-run sky map comparison if complete, and either the GW or GRB sky
229 map has been updated or if the preferred event changed.
230 * Re-check RAVEN publishing conditions if the GRB was previously
231 considered non-astrophycial but now should be considered.
233 Parameters
234 ----------
235 alert : dict
236 IGWN alert packet
238 """
239 # Determine GraceDB ID
240 graceid = alert['uid']
242 # launch searches
243 if alert['alert_type'] == 'new':
244 if alert['object'].get('group') == 'External':
245 # launch search with MDC events and exit
246 if alert['object']['search'] == 'MDC':
247 raven.coincidence_search(graceid, alert['object'],
248 group='CBC', se_searches=['MDC'])
249 raven.coincidence_search(graceid, alert['object'],
250 group='Burst', se_searches=['MDC'])
251 return
253 if alert['object']['search'] in ['SubGRB', 'SubGRBTargeted']:
254 # if sub-threshold GRB, launch search with that pipeline
255 raven.coincidence_search(
256 graceid, alert['object'],
257 searches=['SubGRB', 'SubGRBTargeted'],
258 se_searches=['AllSky', 'BBH'],
259 pipelines=[alert['object']['pipeline']])
260 else:
261 # launch standard Burst-GRB search
262 raven.coincidence_search(graceid, alert['object'],
263 group='Burst', se_searches=['AllSky'])
265 # launch standard CBC-GRB search and similar BBH search
266 raven.coincidence_search(graceid, alert['object'],
267 group='CBC', searches=['GRB'])
268 raven.coincidence_search(graceid, alert['object'],
269 group='Burst', searches=['GRB'],
270 se_searches=['BBH'])
271 elif 'S' in graceid:
272 # launch standard GRB search based on group
273 gw_group = alert['object']['preferred_event_data']['group']
274 search = alert['object']['preferred_event_data']['search']
275 CBC_like = gw_group == 'CBC' or search == 'BBH'
277 # launch search with MDC events and exit
278 if search == 'MDC':
279 raven.coincidence_search(graceid, alert['object'],
280 group=gw_group, searches=['MDC'])
281 return
282 # Don't run search for IMBH Burst search
283 elif gw_group == 'Burst' and \
284 search.lower() not in {'allsky', 'bbh'}:
285 return
287 # Keep empty if CBC (field not needed), otherwise use AllSky or BBH
288 se_searches = ([] if gw_group == 'CBC' else
289 [alert['object']['preferred_event_data']['search']])
290 searches = (['SubGRB', 'SubGRBTargeted'] if CBC_like else
291 ['SubGRBTargeted'])
292 # launch standard GRB search
293 raven.coincidence_search(graceid, alert['object'],
294 group=gw_group, searches=['GRB'],
295 se_searches=se_searches)
296 # launch subthreshold search for Fermi and Swift separately to use
297 # different time windows, for both CBC and Burst
298 for pipeline in ['Fermi', 'Swift']:
299 raven.coincidence_search(
300 graceid, alert['object'], group=gw_group,
301 searches=searches, pipelines=[pipeline])
302 # re-run raven pipeline and create combined sky map (if not a Swift event)
303 # when sky maps are available
304 elif alert['alert_type'] == 'label_added' and \
305 alert['object'].get('group') == 'External':
306 if _skymaps_are_ready(alert['object'], alert['data']['name'],
307 'compare'):
308 # if both sky maps present and a coincidence, re-run RAVEN
309 # pipeline and create combined sky maps
310 ext_event = alert['object']
311 superevent_id, ext_id = _get_superevent_ext_ids(
312 graceid, ext_event)
313 superevent = gracedb.get_superevent(superevent_id)
314 _relaunch_raven_pipeline_with_skymaps(
315 superevent, ext_event, graceid)
316 elif 'EM_COINC' in alert['object']['labels']:
317 # if not complete, check if GW sky map; apply label to external
318 # event if GW sky map
319 se_labels = gracedb.get_labels(alert['object']['superevent'])
320 if 'SKYMAP_READY' in se_labels:
321 gracedb.create_label.si('SKYMAP_READY', graceid).delay()
322 if 'EM_READY' in se_labels:
323 gracedb.create_label.si('EM_READY', graceid).delay()
324 # apply labels from superevent to external event to update state
325 # and trigger functionality requiring sky maps, etc.
326 elif alert['alert_type'] == 'label_added' and 'S' in graceid:
327 if 'SKYMAP_READY' in alert['object']['labels']:
328 # if sky map in superevent, apply label to all external events
329 # at the time
330 group(
331 gracedb.create_label.si('SKYMAP_READY', ext_id)
332 for ext_id in alert['object']['em_events']
333 ).delay()
334 if 'EM_READY' in alert['object']['labels']:
335 # if sky map not in superevent but in preferred event, apply label
336 # to all external events at the time
337 group(
338 gracedb.create_label.si('EM_READY', ext_id)
339 for ext_id in alert['object']['em_events']
340 ).delay()
341 if _skymaps_are_ready(alert['object'], alert['data']['name'], 'SoG') \
342 and alert['object']['space_coinc_far'] is not None:
343 # if a superevent is vetted by ADVOK and a spatial joint FAR is
344 # available, check if SoG publishing conditions are met
345 (
346 gracedb.get_event.si(alert['object']['em_type'])
347 |
348 raven.sog_paper_pipeline.s(alert['object'])
349 ).delay()
350 # if new GW or external sky map after first being available, try to remake
351 # combine sky map and rerun raven pipeline
352 elif alert['alert_type'] == 'log' and \
353 'EM_COINC' in alert['object']['labels'] and \
354 'fit' in alert['data']['filename'] and \
355 'flat' not in alert['data']['comment'].lower() and \
356 (alert['data']['filename'] !=
357 external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER):
358 superevent_id, external_id = _get_superevent_ext_ids(
359 graceid, alert['object'])
360 if 'S' in graceid:
361 superevent = alert['object']
362 else:
363 superevent = gracedb.get_superevent(alert['object']['superevent'])
364 external_event = alert['object']
365 # check if combined sky map already made, with the exception of Swift
366 # which will fail
367 if 'S' in graceid:
368 # Rerun for all eligible external events
369 for ext_id in superevent['em_events']:
370 external_event = gracedb.get_event(ext_id)
371 if REQUIRED_LABELS_BY_TASK['compare'].issubset(
372 set(external_event['labels'])):
373 _relaunch_raven_pipeline_with_skymaps(
374 superevent, external_event, graceid,
375 use_superevent_skymap=True)
376 else:
377 if REQUIRED_LABELS_BY_TASK['compare'].issubset(
378 set(external_event['labels'])):
379 _relaunch_raven_pipeline_with_skymaps(
380 superevent, external_event, graceid)
381 # Rerun the coincidence FAR calculation if possible with combined sky map
382 # if the preferred event changes
383 # We don't want to run this logic if PE results are present
384 elif alert['alert_type'] == 'log' and \
385 'PE_READY' not in alert['object']['labels'] and \
386 'EM_COINC' in alert['object']['labels']:
387 new_log_comment = alert['data'].get('comment', '')
388 if 'S' in graceid and \
389 new_log_comment.startswith('Updated superevent parameters: '
390 'preferred_event: '):
391 superevent = alert['object']
392 # Rerun for all eligible external events
393 for ext_id in superevent['em_events']:
394 external_event = gracedb.get_event(ext_id)
395 if REQUIRED_LABELS_BY_TASK['compare'].issubset(
396 set(external_event['labels'])):
397 _relaunch_raven_pipeline_with_skymaps(
398 superevent, external_event, graceid,
399 use_superevent_skymap=False)
400 elif alert['alert_type'] == 'label_removed' and \
401 alert['object'].get('group') == 'External':
402 if alert['data']['name'] == 'NOT_GRB' and \
403 'EM_COINC' in alert['object']['labels']:
404 # if NOT_GRB is removed, re-check publishing conditions
405 superevent_id = alert['object']['superevent']
406 superevent = gracedb.get_superevent(superevent_id)
407 gw_group = superevent['preferred_event_data']['group']
408 coinc_far_dict = {
409 'temporal_coinc_far': superevent['time_coinc_far'],
410 'spatiotemporal_coinc_far': superevent['space_coinc_far']
411 }
412 raven.trigger_raven_alert(coinc_far_dict, superevent, graceid,
413 alert['object'], gw_group)
416@igwn_alert.handler('superevent',
417 'mdc_superevent',
418 'external_snews',
419 shared=False)
420def handle_snews_igwn_alert(alert):
421 """Parse an IGWN alert message related to superevents/Supernovae external
422 triggers and dispatch it to other tasks.
424 Notes
425 -----
426 This igwn_alert message handler is triggered whenever a new superevent
427 or Supernovae external event is created:
429 * New event triggers a coincidence search with
430 :meth:`gwcelery.tasks.raven.coincidence_search`.
432 Parameters
433 ----------
434 alert : dict
435 IGWN alert packet
437 """
438 # Determine GraceDB ID
439 graceid = alert['uid']
441 if alert['alert_type'] == 'new':
442 if alert['object'].get('superevent_id'):
443 group = alert['object']['preferred_event_data']['group']
444 search = alert['object']['preferred_event_data']['search']
445 searches = ['MDC'] if search == 'MDC' else ['Supernova']
446 se_searches = ['MDC'] if search == 'MDC' else ['AllSky']
447 # Run only on Test and Burst superevents
448 if group in {'Burst', 'Test'} and search in {'MDC', 'AllSky'}:
449 raven.coincidence_search(graceid, alert['object'],
450 group='Burst', searches=searches,
451 se_searches=se_searches,
452 pipelines=['SNEWS'])
453 else:
454 # Run on SNEWS event, either real or test
455 search = alert['object']['search']
456 if search == 'MDC':
457 raven.coincidence_search(graceid, alert['object'],
458 group='Burst', searches=['MDC'],
459 se_searches=['MDC'],
460 pipelines=['SNEWS'])
461 elif search == 'Supernova':
462 raven.coincidence_search(graceid, alert['object'],
463 group='Burst', searches=['Supernova'],
464 se_searches=['AllSky'],
465 pipelines=['SNEWS'])
468@alerts.handler('fermi_targeted',
469 'swift_targeted',
470 queue='exttrig',
471 shared=False)
472def handle_targeted_kafka_alert(alert):
473 """Parse an alert sent via Kafka from a MOU partner in our joint
474 subthreshold targeted search.
476 Parameters
477 ----------
478 alert : dict
479 Kafka alert packet
481 """
482 # Convert alert to VOEvent format
483 # FIXME: This is required until native ingesting of kafka events in GraceDB
484 payload, pipeline, time, trig_id = _kafka_to_voevent(alert)
486 # Veto events that don't pass GRB FAR threshold
487 far_grb = alert['far']
488 veto_event = \
489 app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] < far_grb
490 label = ('NOT_GRB' if alert['alert_type'] == "retraction" or veto_event
491 else None)
493 # Look whether a previous event with the same ID exists
494 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
495 pipeline, trig_id)
497 (
498 gracedb.get_events.si(query=query)
499 |
500 _create_replace_external_event_and_skymap.s(
501 payload, 'SubGRBTargeted', pipeline,
502 label=label, notice_date=time,
503 skymap=alert.get('healpix_file'),
504 use_radec=('ra' in alert and 'dec' in alert)
505 )
506 ).delay()
509def _skymaps_are_ready(event, label, task):
510 """Determine whether labels are complete to launch a certain task.
512 Parameters
513 ----------
514 event : dict
515 Either Superevent or external event dictionary
516 graceid : str
517 GraceDB ID
518 task : str
519 Determines which label schmema to check for completeness
521 Returns
522 -------
523 labels_pass : bool
524 True if all the require labels are present and the given label is part
525 of that set
526 """
527 label_set = set(event['labels'])
528 required_labels = REQUIRED_LABELS_BY_TASK[task]
529 return required_labels.issubset(label_set) and label in required_labels
532def _get_superevent_ext_ids(graceid, event):
533 """Grab superevent and external event IDs from a given event.
535 Parameters
536 ----------
537 graceid : str
538 GraceDB ID
539 event : dict
540 Either Superevent or external event dictionary
542 Returns
543 -------
544 se_id, ext_id : tuple
545 Tuple of superevent and external event GraceDB IDs
547 """
548 if 'S' in graceid:
549 se_id = event['superevent_id']
550 ext_id = event['em_type']
551 else:
552 se_id = event['superevent']
553 ext_id = event['graceid']
554 return se_id, ext_id
557@app.task(shared=False)
558def _launch_external_detchar(event):
559 """Launch detchar tasks for an external event.
561 Parameters
562 ----------
563 event : dict
564 External event dictionary
566 Returns
567 -------
568 event : dict
569 External event dictionary
571 """
572 start = event['gpstime']
573 if event['pipeline'] == 'SNEWS':
574 start, end = event['gpstime'], event['gpstime']
575 else:
576 integration_time = \
577 event['extra_attributes']['GRB']['trigger_duration'] or 4.0
578 end = start + integration_time
579 detchar.check_vectors.si(event, event['graceid'], start, end).delay()
581 return event
584def _relaunch_raven_pipeline_with_skymaps(superevent, ext_event, graceid,
585 use_superevent_skymap=None):
586 """Relaunch the RAVEN sky map comparison workflow, include recalculating
587 the joint FAR with updated sky map info and creating a new combined sky
588 map.
590 Parameters
591 ----------
592 superevent : dict
593 Superevent dictionary
594 exttrig : dict
595 External event dictionary
596 graceid : str
597 GraceDB ID of event
598 use_superevent_skymap : bool
599 If True/False, use/don't use skymap info from superevent.
600 Else if None, check SKYMAP_READY label in external event.
602 """
603 gw_group = superevent['preferred_event_data']['group']
604 tl, th = raven._time_window(
605 graceid, gw_group, [ext_event['pipeline']], [ext_event['search']],
606 [superevent['preferred_event_data']['search']])
607 # FIXME: both overlap integral and combined sky map could be
608 # done by the same function since they are so similar
609 use_superevent = (use_superevent_skymap
610 if use_superevent_skymap is not None else
611 'SKYMAP_READY' in ext_event['labels'])
612 canvas = raven.raven_pipeline.si(
613 [ext_event] if 'S' in graceid else [superevent],
614 graceid,
615 superevent if 'S' in graceid else ext_event,
616 tl, th, gw_group, use_superevent_skymap=use_superevent)
617 # Create new updated combined sky map
618 canvas |= external_skymaps.create_combined_skymap.si(
619 superevent['superevent_id'], ext_event['graceid'],
620 preferred_event=(
621 None if use_superevent
622 else superevent['preferred_event']))
623 canvas.delay()
626@app.task(shared=False,
627 queue='exttrig')
628def _create_replace_external_event_and_skymap(
629 events, payload, search, pipeline,
630 label=None, ext_group='External', notice_date=None, notice_type=None,
631 skymap=None, skymap_link=None, use_radec=False):
632 """Either create a new external event or replace an old one if applicable
633 Then either uploads a given sky map, try to download one given a link, or
634 create one given coordinates.
636 Parameters
637 ----------
638 events : list
639 List of external events sharing the same trigger ID
640 payload : str
641 VOEvent of event being considered
642 search : str
643 Search of external event
644 pipeline : str
645 Pipeline of external evevent
646 label : list
647 Label to be uploaded along with external event. If None, removes
648 'NOT_GRB' label from event
649 ext_group : str
650 Group of external event, 'External' or 'Test'
651 notice_date : str
652 External event trigger time in ISO format
653 notice_type : int
654 GCN notice type integer
655 skymap : str
656 Base64 encoded sky map
657 skymap_link : str
658 Link to external sky map to be downloaded
659 use_radec : bool
660 If True, try to create sky map using given coordinates
662 """
663 skymap_detchar_canvas = ()
664 # If previous event, try to append
665 if events and ext_group == 'External':
666 assert len(events) == 1, 'Found more than one matching GraceDB entry'
667 event, = events
668 graceid = event['graceid']
669 if label:
670 create_replace_canvas = gracedb.create_label.si(label, graceid)
671 else:
672 create_replace_canvas = gracedb.remove_label.si('NOT_GRB', graceid)
674 # Prevent SubGRBs from appending GRBs, also append if same search
675 if search == 'GRB' or search == event['search']:
676 # Replace event and pass already existing event dictionary
677 create_replace_canvas |= gracedb.replace_event.si(graceid, payload)
678 create_replace_canvas |= gracedb.get_event.si(graceid)
679 else:
680 # If not appending just exit
681 return
683 # If new event, create new entry in GraceDB and launch detchar
684 else:
685 create_replace_canvas = gracedb.create_event.si(
686 filecontents=payload,
687 search=search,
688 group=ext_group,
689 pipeline=pipeline,
690 labels=[label] if label else None)
691 skymap_detchar_canvas += _launch_external_detchar.s(),
693 # Use sky map if provided
694 if skymap:
695 skymap_detchar_canvas += \
696 external_skymaps.read_upload_skymap_from_base64.s(skymap),
697 # Otherwise upload one based on given info
698 else:
699 # Grab sky map from provided link
700 if skymap_link:
701 skymap_detchar_canvas += \
702 external_skymaps.get_upload_external_skymap.s(skymap_link),
703 # Otherwise if FINAL Fermi notice try to grab sky map
704 elif notice_type == gcn.NoticeType.FERMI_GBM_FIN_POS:
705 # Wait 10 min and then look for official Fermi sky map
706 skymap_detchar_canvas += \
707 external_skymaps.get_upload_external_skymap.s(None).set(
708 countdown=600),
709 # Otherwise create sky map from given coordinates
710 if use_radec:
711 skymap_detchar_canvas += \
712 external_skymaps.create_upload_external_skymap.s(
713 notice_type, notice_date),
715 (
716 create_replace_canvas
717 |
718 group(skymap_detchar_canvas)
719 ).delay()
722def _kafka_to_voevent(alert):
723 """Parse an alert sent via Kafka from a MOU partner in our joint
724 subthreshold targeted search and convert to an equivalent XML string
725 GCN VOEvent.
727 Parameters
728 ----------
729 alert : dict
730 Kafka alert packet
732 Returns
733 -------
734 payload : str
735 XML GCN notice alert packet in string format
737 """
738 # Define basic values
739 pipeline = alert['mission']
740 start_time = alert['trigger_time']
741 alert_time = alert['alert_datetime']
742 far = alert['far']
743 duration = alert['rate_duration']
744 id = '_'.join(str(x) for x in alert['id'])
745 # Use central time since starting time is not well measured
746 central_time = \
747 Time(start_time, format='isot', scale='utc').to_value('gps') + \
748 .5 * duration
749 trigger_time = \
750 str(Time(central_time, format='gps', scale='utc').isot) + 'Z'
752 # sky localization may not be available
753 ra = alert.get('ra')
754 dec = alert.get('dec')
755 # Try to get dec first then ra, None if both misssing
756 error = alert.get('dec_uncertainty')
757 if error is None:
758 error = alert.get('ra_uncertainty')
759 # Argument should be list if not None
760 if isinstance(error, list):
761 error = error[0]
762 # if any missing sky map info, set to zeros so will be ignored later
763 if ra is None or dec is None or error is None:
764 ra, dec, error = 0., 0., 0.
766 # Load template
767 fname = str(Path(__file__).parent /
768 '../tests/data/{}_subgrbtargeted_template.xml'.format(
769 pipeline.lower()))
770 root = etree.parse(fname)
772 # Update template values
773 # Change ivorn to indicate this is a subthreshold targeted event
774 root.xpath('.')[0].attrib['ivorn'] = \
775 'ivo://lvk.internal/{0}#targeted_subthreshold-{1}'.format(
776 pipeline.lower(), trigger_time).encode()
778 # Update ID
779 root.find("./What/Param[@name='TrigID']").attrib['value'] = \
780 id.encode()
782 # Change times to chosen time
783 root.find("./Who/Date").text = str(alert_time).encode()
784 root.find(("./WhereWhen/ObsDataLocation/"
785 "ObservationLocation/AstroCoords/Time/TimeInstant/"
786 "ISOTime")).text = str(trigger_time).encode()
788 root.find("./What/Param[@name='FAR']").attrib['value'] = \
789 str(far).encode()
791 root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \
792 str(duration).encode()
794 # Sky position
795 root.find(("./WhereWhen/ObsDataLocation/"
796 "ObservationLocation/AstroCoords/Position2D/Value2/"
797 "C1")).text = str(ra).encode()
798 root.find(("./WhereWhen/ObsDataLocation/"
799 "ObservationLocation/AstroCoords/Position2D/Value2/"
800 "C2")).text = str(dec).encode()
801 root.find(("./WhereWhen/ObsDataLocation/"
802 "ObservationLocation/AstroCoords/Position2D/"
803 "Error2Radius")).text = str(error).encode()
805 return (etree.tostring(root, xml_declaration=True, encoding="UTF-8",
806 pretty_print=True),
807 pipeline, trigger_time.replace('Z', ''), id)