Coverage for gwcelery/tasks/external_triggers.py: 100%

282 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1from pathlib import Path 

2from urllib.parse import urlparse 

3 

4from astropy.time import Time 

5from celery import group 

6from celery.utils.log import get_logger 

7from lxml import etree 

8 

9from .. import app 

10from . import (alerts, detchar, external_skymaps, gcn, gracedb, igwn_alert, 

11 raven) 

12 

13log = get_logger(__name__) 

14 

15 

16REQUIRED_LABELS_BY_TASK = { 

17 # EM_READY implies preferred event sky map is available 

18 'compare': {'EM_READY', 'EXT_SKYMAP_READY', 'EM_COINC'}, 

19 'SoG': {'SKYMAP_READY', 'RAVEN_ALERT', 'ADVOK'} 

20} 

21"""These labels should be present on an external event to consider it to 

22be ready for sky map comparison or for post-alert analysis, such as a 

23measurment of the speed of gravity (SoG). 

24""" 

25 

26FERMI_GRB_CLASS_VALUE = 4 

27"""This is the index that denote GRBs within Fermi's Flight Position 

28classification.""" 

29 

30FERMI_GRB_CLASS_THRESH = 50 

31"""This values denotes the threshold of the most likely Fermi source 

32classification, above which we will consider a Fermi Flight Position 

33notice.""" 

34 

35 

36@alerts.handler('snews', 

37 queue='exttrig', 

38 shared=False) 

39def handle_snews_gcn(payload): 

40 """Handles the GCN notice payload from SNEWS alerts. 

41 

42 Prepares the alert to be sent to graceDB as external events, updating the 

43 info if it already exists. 

44 

45 Parameters 

46 ---------- 

47 payload : str 

48 XML GCN notice alert packet in string format 

49 

50 """ 

51 root = etree.fromstring(payload) 

52 

53 # Get TrigID and Test Event Boolean 

54 trig_id = root.find("./What/Param[@name='TrigID']").attrib['value'] 

55 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External' 

56 

57 event_observatory = 'SNEWS' 

58 if 'mdc-test_event' in root.attrib['ivorn'].lower(): 

59 search = 'MDC' 

60 else: 

61 search = 'Supernova' 

62 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

63 event_observatory, trig_id) 

64 

65 ( 

66 gracedb.get_events.si(query=query) 

67 | 

68 _create_replace_external_event_and_skymap.s( 

69 payload, search, event_observatory, ext_group=ext_group 

70 ) 

71 ).delay() 

72 

73 

74@alerts.handler('fermi_gbm_alert', 

75 'fermi_gbm_flt_pos', 

76 'fermi_gbm_gnd_pos', 

77 'fermi_gbm_fin_pos', 

78 'fermi_gbm_subthresh', 

79 'swift_bat_grb_pos_ack', 

80 'integral_wakeup', 

81 'integral_refined', 

82 'integral_offline', 

83 queue='exttrig', 

84 shared=False) 

85def handle_grb_gcn(payload): 

86 """Handles the payload from Fermi, Swift, and INTEGRAL GCN notices. 

87 

88 Filters out candidates likely to be noise. Creates external events 

89 from the notice if new notice, otherwise updates existing event. Then 

90 creates and/or grabs external sky map to be uploaded to the external event. 

91 

92 More info for these notices can be found at: 

93 Fermi-GBM: https://gcn.gsfc.nasa.gov/fermi_grbs.html 

94 Fermi-GBM sub: https://gcn.gsfc.nasa.gov/fermi_gbm_subthresh_archive.html 

95 Swift: https://gcn.gsfc.nasa.gov/swift.html 

96 INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html 

97 

98 Parameters 

99 ---------- 

100 payload : str 

101 XML GCN notice alert packet in string format 

102 

103 """ 

104 root = etree.fromstring(payload) 

105 u = urlparse(root.attrib['ivorn']) 

106 stream_path = u.path 

107 

108 stream_obsv_dict = {'/SWIFT': 'Swift', 

109 '/Fermi': 'Fermi', 

110 '/INTEGRAL': 'INTEGRAL'} 

111 event_observatory = stream_obsv_dict[stream_path] 

112 

113 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External' 

114 

115 # Block Test INTEGRAL events on the production server to prevent 

116 # unneeded queries of old GW data during detchar check 

117 if event_observatory == 'INTEGRAL' and ext_group == 'Test' and \ 

118 app.conf['gracedb_host'] == 'gracedb.ligo.org': 

119 return 

120 # Get TrigID 

121 elif event_observatory == 'INTEGRAL' and \ 

122 not any([x in u.fragment for x in ['O3-replay', 'MDC-test']]): 

123 # FIXME: revert all this if INTEGRAL fixes their GCN notices 

124 # If INTEGRAL, get trigger ID from ivorn rather than the TrigID field 

125 # unless O3 replay or MDC event 

126 trig_id = u.fragment.split('_')[-1].split('-')[0] 

127 # Modify the TrigID field so GraceDB has the correct value 

128 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

129 str(trig_id).encode() 

130 # Apply changes to payload delivered to GraceDB 

131 payload = etree.tostring(root, xml_declaration=True, encoding="UTF-8") 

132 else: 

133 try: 

134 trig_id = \ 

135 root.find("./What/Param[@name='TrigID']").attrib['value'] 

136 except AttributeError: 

137 trig_id = \ 

138 root.find("./What/Param[@name='Trans_Num']").attrib['value'] 

139 

140 notice_type = \ 

141 int(root.find("./What/Param[@name='Packet_Type']").attrib['value']) 

142 

143 reliability = root.find("./What/Param[@name='Reliability']") 

144 if reliability is not None and int(reliability.attrib['value']) <= 4: 

145 return 

146 

147 # Check if Fermi trigger is likely noise by checking classification 

148 # Most_Likely_Index of 4 is an astrophysical GRB 

149 # If not at least 50% chance of GRB we will not consider it for RAVEN 

150 likely_source = root.find("./What/Param[@name='Most_Likely_Index']") 

151 likely_prob = root.find("./What/Param[@name='Most_Likely_Prob']") 

152 not_likely_grb = likely_source is not None and \ 

153 (likely_source.attrib['value'] != FERMI_GRB_CLASS_VALUE 

154 or likely_prob.attrib['value'] < FERMI_GRB_CLASS_THRESH) 

155 

156 # Check if initial Fermi alert. These are generally unreliable and should 

157 # never trigger a RAVEN alert, but will give us earlier warning of a 

158 # possible coincidence. Later notices could change this. 

159 initial_gbm_alert = notice_type == gcn.NoticeType.FERMI_GBM_ALERT 

160 

161 # Check if Swift has lost lock. If so then veto 

162 lost_lock = \ 

163 root.find("./What/Group[@name='Solution_Status']" + 

164 "/Param[@name='StarTrack_Lost_Lock']") 

165 swift_veto = lost_lock is not None and lost_lock.attrib['value'] == 'true' 

166 

167 # Only send alerts if likely a GRB, is not a low-confidence early Fermi 

168 # alert, and if not a Swift veto 

169 if not_likely_grb or initial_gbm_alert or swift_veto: 

170 label = 'NOT_GRB' 

171 else: 

172 label = None 

173 

174 ivorn = root.attrib['ivorn'] 

175 if 'subthresh' in ivorn.lower(): 

176 search = 'SubGRB' 

177 elif 'mdc-test_event' in ivorn.lower(): 

178 search = 'MDC' 

179 else: 

180 search = 'GRB' 

181 

182 if search == 'SubGRB' and event_observatory == 'Fermi': 

183 skymap_link = \ 

184 root.find("./What/Param[@name='HealPix_URL']").attrib['value'] 

185 else: 

186 skymap_link = None 

187 

188 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

189 event_observatory, trig_id) 

190 

191 ( 

192 gracedb.get_events.si(query=query) 

193 | 

194 _create_replace_external_event_and_skymap.s( 

195 payload, search, event_observatory, 

196 ext_group=ext_group, label=label, 

197 notice_date=root.find("./Who/Date").text, 

198 notice_type=notice_type, 

199 skymap_link=skymap_link, 

200 use_radec=search in {'GRB', 'MDC'} 

201 ) 

202 ).delay() 

203 

204 

205@igwn_alert.handler('superevent', 

206 'mdc_superevent', 

207 'external_fermi', 

208 'external_swift', 

209 'external_integral', 

210 shared=False) 

211def handle_grb_igwn_alert(alert): 

212 """Parse an IGWN alert message related to superevents/GRB external triggers 

213 and dispatch it to other tasks. 

214 

215 Notes 

216 ----- 

217 This IGWN alert message handler is triggered by creating a new superevent 

218 or GRB external trigger event, a label associated with completeness of 

219 skymaps or change in state, or if a sky map file is uploaded: 

220 

221 * New event/superevent triggers a coincidence search with 

222 :meth:`gwcelery.tasks.raven.coincidence_search`. 

223 * If other type of IGWN alert, pass to _handle_skymaps to decide whether 

224 to re-run RAVEN pipeline based on labels or whether to add labels that 

225 could start this process. 

226 

227 Parameters 

228 ---------- 

229 alert : dict 

230 IGWN alert packet 

231 

232 """ 

233 # Determine GraceDB ID 

234 graceid = alert['uid'] 

235 

236 # launch searches 

237 if alert['alert_type'] == 'new': 

238 if alert['object'].get('group') == 'External': 

239 # launch search with MDC events and exit 

240 if alert['object']['search'] == 'MDC': 

241 raven.coincidence_search(graceid, alert['object'], 

242 group='CBC', se_searches=['MDC']) 

243 raven.coincidence_search(graceid, alert['object'], 

244 group='Burst', se_searches=['MDC']) 

245 return 

246 

247 elif alert['object']['search'] == 'SubGRB': 

248 # Launch search with standard CBC 

249 raven.coincidence_search( 

250 graceid, alert['object'], 

251 searches=['SubGRB'], 

252 group='CBC', 

253 pipelines=[alert['object']['pipeline']]) 

254 # Launch search with CWB BBH 

255 raven.coincidence_search( 

256 graceid, alert['object'], 

257 searches=['SubGRB'], 

258 group='Burst', 

259 se_searches=['BBH'], 

260 pipelines=[alert['object']['pipeline']]) 

261 elif alert['object']['search'] == 'SubGRBTargeted': 

262 # if sub-threshold GRB, launch search with that pipeline 

263 raven.coincidence_search( 

264 graceid, alert['object'], 

265 searches=['SubGRBTargeted'], 

266 se_searches=['AllSky', 'BBH'], 

267 pipelines=[alert['object']['pipeline']]) 

268 elif alert['object']['search'] == 'GRB': 

269 # launch standard Burst-GRB search 

270 raven.coincidence_search(graceid, alert['object'], 

271 group='Burst', se_searches=['AllSky']) 

272 

273 # launch standard CBC-GRB search and similar BBH search 

274 raven.coincidence_search(graceid, alert['object'], 

275 group='CBC', searches=['GRB']) 

276 raven.coincidence_search(graceid, alert['object'], 

277 group='Burst', searches=['GRB'], 

278 se_searches=['BBH']) 

279 elif 'S' in graceid: 

280 # launch standard GRB search based on group 

281 gw_group = alert['object']['preferred_event_data']['group'] 

282 search = alert['object']['preferred_event_data']['search'] 

283 CBC_like = gw_group == 'CBC' or search == 'BBH' 

284 

285 # launch search with MDC events and exit 

286 if search == 'MDC': 

287 raven.coincidence_search(graceid, alert['object'], 

288 group=gw_group, searches=['MDC']) 

289 return 

290 # Don't run search for IMBH Burst search 

291 elif gw_group == 'Burst' and \ 

292 search.lower() not in {'allsky', 'bbh'}: 

293 return 

294 

295 # Keep empty if CBC (field not needed), otherwise use AllSky or BBH 

296 se_searches = ([] if gw_group == 'CBC' else 

297 [alert['object']['preferred_event_data']['search']]) 

298 searches = (['SubGRB', 'SubGRBTargeted'] if CBC_like else 

299 ['SubGRBTargeted']) 

300 # launch standard GRB search 

301 raven.coincidence_search(graceid, alert['object'], 

302 group=gw_group, searches=['GRB'], 

303 se_searches=se_searches) 

304 # launch subthreshold search for Fermi and Swift separately to use 

305 # different time windows, for both CBC and Burst 

306 for pipeline in ['Fermi', 'Swift']: 

307 raven.coincidence_search( 

308 graceid, alert['object'], group=gw_group, 

309 searches=searches, pipelines=[pipeline]) 

310 else: 

311 _handle_skymaps(alert) 

312 

313 

314def _handle_skymaps(alert): 

315 """Parse an IGWN alert message related to superevents/GRB external triggers 

316 and dispatch tasks related to re-running RAVEN pipeline due to new sky 

317 maps. 

318 

319 Notes 

320 ----- 

321 This IGWN alert message handler is triggered by a label associated with 

322 completeness of skymaps or change in state, or if a sky map file is 

323 uploaded: 

324 

325 * When both a GW and GRB sky map are available during a coincidence, 

326 indicated by the labels ``EM_READY`` and ``EXT_SKYMAP_READY`` 

327 respectively on the external event, this triggers the spacetime coinc 

328 FAR to be calculated and a combined GW-GRB sky map is created using 

329 :meth:`gwcelery.tasks.external_skymaps.create_combined_skymap`. 

330 * If new label indicates sky maps are available in the superevent, 

331 apply to all associated external events. 

332 * Re-run sky map comparison if complete, and also if either the GW or GRB 

333 sky map has been updated or if the preferred event changed. 

334 * Re-check RAVEN publishing conditions if the GRB was previously 

335 considered non-astrophysical but now should be considered. 

336 

337 Parameters 

338 ---------- 

339 alert : dict 

340 IGWN alert packet 

341 

342 """ 

343 # Determine GraceDB ID 

344 graceid = alert['uid'] 

345 

346 # Define state variables 

347 is_superevent = 'S' in graceid 

348 is_external_event = alert['object'].get('group') == 'External' 

349 is_coincidence = 'EM_COINC' in alert['object']['labels'] 

350 pe_ready = 'PE_READY' in alert['object']['labels'] 

351 

352 # re-run raven pipeline and create combined sky map (if not a Swift event) 

353 # when sky maps are available 

354 if alert['alert_type'] == 'label_added' and is_external_event: 

355 if _skymaps_are_ready(alert['object'], alert['data']['name'], 

356 'compare'): 

357 # if both sky maps present and a coincidence, re-run RAVEN 

358 # pipeline and create combined sky maps 

359 ext_event = alert['object'] 

360 superevent_id, ext_id = _get_superevent_ext_ids( 

361 graceid, ext_event) 

362 superevent = gracedb.get_superevent(superevent_id) 

363 _relaunch_raven_pipeline_with_skymaps( 

364 superevent, ext_event, graceid) 

365 elif is_coincidence: 

366 # if not complete, check if GW sky map; apply label to external 

367 # event if GW sky map 

368 se_labels = gracedb.get_labels(alert['object']['superevent']) 

369 if 'SKYMAP_READY' in se_labels: 

370 gracedb.create_label.si('SKYMAP_READY', graceid).delay() 

371 if 'EM_READY' in se_labels: 

372 gracedb.create_label.si('EM_READY', graceid).delay() 

373 # apply labels from superevent to external event to update state 

374 # and trigger functionality requiring sky maps, etc. 

375 elif alert['alert_type'] == 'label_added' and is_superevent: 

376 if 'SKYMAP_READY' in alert['object']['labels']: 

377 # if sky map in superevent, apply label to all external events 

378 # at the time 

379 group( 

380 gracedb.create_label.si('SKYMAP_READY', ext_id) 

381 for ext_id in alert['object']['em_events'] 

382 ).delay() 

383 if 'EM_READY' in alert['object']['labels']: 

384 # if sky map not in superevent but in preferred event, apply label 

385 # to all external events at the time 

386 group( 

387 gracedb.create_label.si('EM_READY', ext_id) 

388 for ext_id in alert['object']['em_events'] 

389 ).delay() 

390 if _skymaps_are_ready(alert['object'], alert['data']['name'], 'SoG') \ 

391 and alert['object']['space_coinc_far'] is not None: 

392 # if a superevent is vetted by ADVOK and a spatial joint FAR is 

393 # available, check if SoG publishing conditions are met 

394 ( 

395 gracedb.get_event.si(alert['object']['em_type']) 

396 | 

397 raven.sog_paper_pipeline.s(alert['object']) 

398 ).delay() 

399 # if new GW or external sky map after first being available, try to remake 

400 # combine sky map and rerun raven pipeline 

401 elif alert['alert_type'] == 'log' and \ 

402 is_coincidence and \ 

403 'fit' in alert['data']['filename'] and \ 

404 'flat' not in alert['data']['comment'].lower() and \ 

405 (alert['data']['filename'] != 

406 external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER): 

407 superevent_id, external_id = _get_superevent_ext_ids( 

408 graceid, alert['object']) 

409 # check if combined sky map already made, with the exception of Swift 

410 # which will fail 

411 if is_superevent: 

412 superevent = alert['object'] 

413 # Rerun for all eligible external events 

414 for ext_id in superevent['em_events']: 

415 external_event = gracedb.get_event(ext_id) 

416 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

417 set(external_event['labels'])): 

418 _relaunch_raven_pipeline_with_skymaps( 

419 superevent, external_event, graceid, 

420 use_superevent_skymap=True) 

421 else: 

422 superevent = gracedb.get_superevent(alert['object']['superevent']) 

423 external_event = alert['object'] 

424 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

425 set(external_event['labels'])): 

426 _relaunch_raven_pipeline_with_skymaps( 

427 superevent, external_event, graceid) 

428 # Rerun the coincidence FAR calculation if possible with combined sky map 

429 # if the preferred event changes 

430 # We don't want to run this logic if PE results are present 

431 elif alert['alert_type'] == 'log' and not pe_ready and is_coincidence: 

432 new_log_comment = alert['data'].get('comment', '') 

433 if is_superevent and \ 

434 new_log_comment.startswith('Updated superevent parameters: ' 

435 'preferred_event: '): 

436 superevent = alert['object'] 

437 # Rerun for all eligible external events 

438 for ext_id in superevent['em_events']: 

439 external_event = gracedb.get_event(ext_id) 

440 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

441 set(external_event['labels'])): 

442 _relaunch_raven_pipeline_with_skymaps( 

443 superevent, external_event, graceid, 

444 use_superevent_skymap=False) 

445 elif alert['alert_type'] == 'label_removed' and is_external_event: 

446 if alert['data']['name'] == 'NOT_GRB' and is_coincidence: 

447 # if NOT_GRB is removed, re-check publishing conditions 

448 superevent_id = alert['object']['superevent'] 

449 superevent = gracedb.get_superevent(superevent_id) 

450 gw_group = superevent['preferred_event_data']['group'] 

451 coinc_far_dict = { 

452 'temporal_coinc_far': superevent['time_coinc_far'], 

453 'spatiotemporal_coinc_far': superevent['space_coinc_far'] 

454 } 

455 raven.trigger_raven_alert(coinc_far_dict, superevent, graceid, 

456 alert['object'], gw_group) 

457 

458 

459@igwn_alert.handler('superevent', 

460 'mdc_superevent', 

461 'external_snews', 

462 shared=False) 

463def handle_snews_igwn_alert(alert): 

464 """Parse an IGWN alert message related to superevents/Supernovae external 

465 triggers and dispatch it to other tasks. 

466 

467 Notes 

468 ----- 

469 This igwn_alert message handler is triggered whenever a new superevent 

470 or Supernovae external event is created: 

471 

472 * New event triggers a coincidence search with 

473 :meth:`gwcelery.tasks.raven.coincidence_search`. 

474 

475 Parameters 

476 ---------- 

477 alert : dict 

478 IGWN alert packet 

479 

480 """ 

481 # Determine GraceDB ID 

482 graceid = alert['uid'] 

483 

484 if alert['alert_type'] == 'new': 

485 if alert['object'].get('superevent_id'): 

486 group = alert['object']['preferred_event_data']['group'] 

487 search = alert['object']['preferred_event_data']['search'] 

488 searches = ['MDC'] if search == 'MDC' else ['Supernova'] 

489 se_searches = ['MDC'] if search == 'MDC' else ['AllSky'] 

490 # Run only on Test and Burst superevents 

491 if group in {'Burst', 'Test'} and search in {'MDC', 'AllSky'}: 

492 raven.coincidence_search(graceid, alert['object'], 

493 group='Burst', searches=searches, 

494 se_searches=se_searches, 

495 pipelines=['SNEWS']) 

496 else: 

497 # Run on SNEWS event, either real or test 

498 search = alert['object']['search'] 

499 if search == 'MDC': 

500 raven.coincidence_search(graceid, alert['object'], 

501 group='Burst', searches=['MDC'], 

502 se_searches=['MDC'], 

503 pipelines=['SNEWS']) 

504 elif search == 'Supernova': 

505 raven.coincidence_search(graceid, alert['object'], 

506 group='Burst', searches=['Supernova'], 

507 se_searches=['AllSky'], 

508 pipelines=['SNEWS']) 

509 

510 

511@alerts.handler('fermi_targeted', 

512 'swift_targeted', 

513 queue='exttrig', 

514 shared=False) 

515def handle_targeted_kafka_alert(alert): 

516 """Parse an alert sent via Kafka from a MOU partner in our joint 

517 subthreshold targeted search. 

518 

519 Parameters 

520 ---------- 

521 alert : dict 

522 Kafka alert packet 

523 

524 """ 

525 # Convert alert to VOEvent format 

526 # FIXME: This is required until native ingesting of kafka events in GraceDB 

527 payload, pipeline, time, trig_id = \ 

528 _kafka_to_voevent(alert, 'SubGRBTargeted') 

529 

530 # Veto events that don't pass GRB FAR threshold 

531 far_grb = alert['far'] 

532 veto_event = \ 

533 app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] < far_grb 

534 label = ('NOT_GRB' if alert['alert_type'] == "retraction" or veto_event 

535 else None) 

536 

537 # Look whether a previous event with the same ID exists 

538 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

539 pipeline, trig_id) 

540 

541 ( 

542 gracedb.get_events.si(query=query) 

543 | 

544 _create_replace_external_event_and_skymap.s( 

545 payload, 'SubGRBTargeted', pipeline, 

546 label=label, notice_date=time, 

547 skymap=alert.get('healpix_file'), 

548 use_radec=('ra' in alert and 'dec' in alert) 

549 ) 

550 ).delay() 

551 

552 

553def _skymaps_are_ready(event, label, task): 

554 """Determine whether labels are complete to launch a certain task. 

555 

556 Parameters 

557 ---------- 

558 event : dict 

559 Either Superevent or external event dictionary 

560 graceid : str 

561 GraceDB ID 

562 task : str 

563 Determines which label schmema to check for completeness 

564 

565 Returns 

566 ------- 

567 labels_pass : bool 

568 True if all the require labels are present and the given label is part 

569 of that set 

570 """ 

571 label_set = set(event['labels']) 

572 required_labels = REQUIRED_LABELS_BY_TASK[task] 

573 return required_labels.issubset(label_set) and label in required_labels 

574 

575 

576def _get_superevent_ext_ids(graceid, event): 

577 """Grab superevent and external event IDs from a given event. 

578 

579 Parameters 

580 ---------- 

581 graceid : str 

582 GraceDB ID 

583 event : dict 

584 Either Superevent or external event dictionary 

585 

586 Returns 

587 ------- 

588 se_id, ext_id : tuple 

589 Tuple of superevent and external event GraceDB IDs 

590 

591 """ 

592 if 'S' in graceid: 

593 se_id = event['superevent_id'] 

594 ext_id = event['em_type'] 

595 else: 

596 se_id = event['superevent'] 

597 ext_id = event['graceid'] 

598 return se_id, ext_id 

599 

600 

601@app.task(shared=False) 

602def _launch_external_detchar(event): 

603 """Launch detchar tasks for an external event. 

604 

605 Parameters 

606 ---------- 

607 event : dict 

608 External event dictionary 

609 

610 Returns 

611 ------- 

612 event : dict 

613 External event dictionary 

614 

615 """ 

616 start = event['gpstime'] 

617 if event['pipeline'] == 'SNEWS': 

618 start, end = event['gpstime'], event['gpstime'] 

619 else: 

620 integration_time = \ 

621 event['extra_attributes']['GRB']['trigger_duration'] or 4.0 

622 end = start + integration_time 

623 detchar.check_vectors.si(event, event['graceid'], start, end).delay() 

624 

625 return event 

626 

627 

628def _relaunch_raven_pipeline_with_skymaps(superevent, ext_event, graceid, 

629 use_superevent_skymap=None): 

630 """Relaunch the RAVEN sky map comparison workflow, include recalculating 

631 the joint FAR with updated sky map info and creating a new combined sky 

632 map. 

633 

634 Parameters 

635 ---------- 

636 superevent : dict 

637 Superevent dictionary 

638 exttrig : dict 

639 External event dictionary 

640 graceid : str 

641 GraceDB ID of event 

642 use_superevent_skymap : bool 

643 If True/False, use/don't use skymap info from superevent. 

644 Else if None, check SKYMAP_READY label in external event. 

645 

646 """ 

647 gw_group = superevent['preferred_event_data']['group'] 

648 tl, th = raven._time_window( 

649 graceid, gw_group, [ext_event['pipeline']], [ext_event['search']], 

650 [superevent['preferred_event_data']['search']]) 

651 # FIXME: both overlap integral and combined sky map could be 

652 # done by the same function since they are so similar 

653 use_superevent = (use_superevent_skymap 

654 if use_superevent_skymap is not None else 

655 'SKYMAP_READY' in ext_event['labels']) 

656 canvas = raven.raven_pipeline.si( 

657 [ext_event] if 'S' in graceid else [superevent], 

658 graceid, 

659 superevent if 'S' in graceid else ext_event, 

660 tl, th, gw_group, use_superevent_skymap=use_superevent) 

661 # Create new updated combined sky map 

662 canvas |= external_skymaps.create_combined_skymap.si( 

663 superevent['superevent_id'], ext_event['graceid'], 

664 preferred_event=( 

665 None if use_superevent 

666 else superevent['preferred_event'])) 

667 canvas.delay() 

668 

669 

670@app.task(shared=False, 

671 queue='exttrig') 

672def _create_replace_external_event_and_skymap( 

673 events, payload, search, pipeline, 

674 label=None, ext_group='External', notice_date=None, notice_type=None, 

675 skymap=None, skymap_link=None, use_radec=False): 

676 """Either create a new external event or replace an old one if applicable 

677 Then either uploads a given sky map, try to download one given a link, or 

678 create one given coordinates. 

679 

680 Parameters 

681 ---------- 

682 events : list 

683 List of external events sharing the same trigger ID 

684 payload : str 

685 VOEvent of event being considered 

686 search : str 

687 Search of external event 

688 pipeline : str 

689 Pipeline of external evevent 

690 label : list 

691 Label to be uploaded along with external event. If None, removes 

692 'NOT_GRB' label from event 

693 ext_group : str 

694 Group of external event, 'External' or 'Test' 

695 notice_date : str 

696 External event trigger time in ISO format 

697 notice_type : int 

698 GCN notice type integer 

699 skymap : str 

700 Base64 encoded sky map 

701 skymap_link : str 

702 Link to external sky map to be downloaded 

703 use_radec : bool 

704 If True, try to create sky map using given coordinates 

705 

706 """ 

707 skymap_detchar_canvas = () 

708 # If previous event, try to append 

709 if events and ext_group == 'External': 

710 assert len(events) == 1, 'Found more than one matching GraceDB entry' 

711 event, = events 

712 graceid = event['graceid'] 

713 if label: 

714 create_replace_canvas = gracedb.create_label.si(label, graceid) 

715 else: 

716 create_replace_canvas = gracedb.remove_label.si('NOT_GRB', graceid) 

717 

718 # Prevent SubGRBs from appending GRBs, also append if same search 

719 if search == 'GRB' or search == event['search']: 

720 # Replace event and pass already existing event dictionary 

721 create_replace_canvas |= gracedb.replace_event.si(graceid, payload) 

722 create_replace_canvas |= gracedb.get_event.si(graceid) 

723 else: 

724 # If not appending just exit 

725 return 

726 

727 # If new event, create new entry in GraceDB and launch detchar 

728 else: 

729 create_replace_canvas = gracedb.create_event.si( 

730 filecontents=payload, 

731 search=search, 

732 group=ext_group, 

733 pipeline=pipeline, 

734 labels=[label] if label else None) 

735 skymap_detchar_canvas += _launch_external_detchar.s(), 

736 

737 # Use sky map if provided 

738 if skymap: 

739 skymap_detchar_canvas += \ 

740 external_skymaps.read_upload_skymap_from_base64.s(skymap), 

741 # Otherwise upload one based on given info 

742 else: 

743 # Grab sky map from provided link 

744 if skymap_link: 

745 skymap_detchar_canvas += \ 

746 external_skymaps.get_upload_external_skymap.s(skymap_link), 

747 # Otherwise if FINAL Fermi notice try to grab sky map 

748 elif notice_type == gcn.NoticeType.FERMI_GBM_FIN_POS: 

749 # Wait 10 min and then look for official Fermi sky map 

750 skymap_detchar_canvas += \ 

751 external_skymaps.get_upload_external_skymap.s(None).set( 

752 countdown=600), 

753 # Otherwise create sky map from given coordinates 

754 if use_radec: 

755 skymap_detchar_canvas += \ 

756 external_skymaps.create_upload_external_skymap.s( 

757 notice_type, notice_date), 

758 

759 ( 

760 create_replace_canvas 

761 | 

762 group(skymap_detchar_canvas) 

763 ).delay() 

764 

765 

766def _kafka_to_voevent(alert, search): 

767 """Parse an alert in JSON format sent via Kafka from public GCN Notices or 

768 from a MOU partner in our joint subthreshold targeted search, converting 

769 to an equivalent XML string VOEvent. 

770 

771 Parameters 

772 ---------- 

773 alert : dict 

774 Kafka alert packet 

775 search : list 

776 External trigger search 

777 

778 Returns 

779 ------- 

780 payload : str 

781 XML GCN notice alert packet in string format 

782 

783 """ 

784 # Define basic values 

785 pipeline = alert.get('mission') 

786 if pipeline is None: 

787 if 'instrument' not in alert: 

788 raise ValueError("Alert does not contain pipeline information.") 

789 elif alert['instrument'].lower() == 'wxt': 

790 # FIXME: Replace with official GraceDB name once added 

791 pipeline = 'EinsteinProbe' 

792 

793 # Get times, adding missing 

794 start_time = alert['trigger_time'] 

795 alert_time = alert.get('alert_datetime', start_time) 

796 # If missing, add character onto end 

797 if not start_time.endswith('Z'): 

798 start_time += 'Z' 

799 if not alert_time.endswith('Z'): 

800 alert_time += 'Z' 

801 

802 # Try to get FAR is there 

803 far = alert.get('far') 

804 

805 # Go through list of possible values for durations, 0. if no keys match 

806 duration = 0. 

807 duration_params = ['rate_duration', 'image_duration'] 

808 for param in duration_params: 

809 if param in alert.keys(): 

810 duration = alert[param] 

811 break 

812 

813 # Form ID out of given list 

814 id = '_'.join(str(x) for x in alert['id']) 

815 if search == 'SubGRBTargeted': 

816 # Use central time since starting time is not well measured 

817 central_time = \ 

818 Time(start_time, format='isot', scale='utc').to_value('gps') + \ 

819 .5 * duration 

820 trigger_time = \ 

821 str(Time(central_time, format='gps', scale='utc').isot) + 'Z' 

822 else: 

823 trigger_time = start_time 

824 

825 # sky localization may not be available 

826 ra = alert.get('ra') 

827 dec = alert.get('dec') 

828 

829 # Go through list of possible values for error, None if no keys match 

830 error = None 

831 error_params = ['dec_uncertainty', 'ra_uncertainty', 'ra_dec_error'] 

832 for param in error_params: 

833 if param in alert.keys(): 

834 error = alert[param] 

835 break 

836 

837 # If argument is list, get value 

838 if isinstance(error, list): 

839 error = error[0] 

840 

841 # if any missing sky map info, set to zeros so will be ignored later 

842 if ra is None or dec is None or error is None: 

843 ra, dec, error = 0., 0., 0. 

844 

845 # Load template 

846 fname = \ 

847 str(Path(__file__).parent / 

848 f'../tests/data/{pipeline.lower()}_{search.lower()}_template.xml') 

849 root = etree.parse(fname) 

850 

851 # Update template values 

852 # Change ivorn to indicate this is a subthreshold targeted event 

853 root.xpath('.')[0].attrib['ivorn'] = \ 

854 'ivo://lvk.internal/{0}#{1}-{2}'.format( 

855 pipeline.lower(), 

856 ('targeted_subthreshold' if search == 'SubGRBTargeted' 

857 else search.lower()), 

858 trigger_time).encode() 

859 

860 # Change times to chosen time 

861 root.find("./Who/Date").text = str(alert_time).encode() 

862 root.find(("./WhereWhen/ObsDataLocation/" 

863 "ObservationLocation/AstroCoords/Time/TimeInstant/" 

864 "ISOTime")).text = str(trigger_time).encode() 

865 

866 # Update ID and duration 

867 # SVOM has separate template, use different paths 

868 if pipeline == 'SVOM': 

869 root.find(("./What/Group[@name='Svom_Identifiers']" 

870 "/Param[@name='Burst_Id']")).attrib['value'] = \ 

871 id.encode() 

872 

873 root.find(("./What/Group[@name='Detection_Info']" 

874 "/Param[@name='Timescale']")).attrib['value'] = \ 

875 str(duration).encode() 

876 # Every other type uses similar format 

877 else: 

878 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

879 id.encode() 

880 

881 root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \ 

882 str(duration).encode() 

883 

884 if far is not None: 

885 root.find("./What/Param[@name='FAR']").attrib['value'] = \ 

886 str(far).encode() 

887 

888 # Sky position 

889 root.find(("./WhereWhen/ObsDataLocation/" 

890 "ObservationLocation/AstroCoords/Position2D/Value2/" 

891 "C1")).text = str(ra).encode() 

892 root.find(("./WhereWhen/ObsDataLocation/" 

893 "ObservationLocation/AstroCoords/Position2D/Value2/" 

894 "C2")).text = str(dec).encode() 

895 root.find(("./WhereWhen/ObsDataLocation/" 

896 "ObservationLocation/AstroCoords/Position2D/" 

897 "Error2Radius")).text = str(error).encode() 

898 

899 return (etree.tostring(root, xml_declaration=True, encoding="UTF-8", 

900 pretty_print=True), 

901 pipeline, trigger_time.replace('Z', ''), id)