Coverage for gwcelery/tasks/external_triggers.py: 100%

253 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-08-16 20:59 +0000

1from pathlib import Path 

2from urllib.parse import urlparse 

3 

4from astropy.time import Time 

5from celery import group 

6from celery.utils.log import get_logger 

7from lxml import etree 

8 

9from .. import app 

10from . import (alerts, detchar, external_skymaps, gcn, gracedb, igwn_alert, 

11 raven) 

12 

13log = get_logger(__name__) 

14 

15 

16REQUIRED_LABELS_BY_TASK = { 

17 # EM_READY implies preferred event sky map is available 

18 'compare': {'EM_READY', 'EXT_SKYMAP_READY', 'EM_COINC'}, 

19 'SoG': {'SKYMAP_READY', 'RAVEN_ALERT', 'ADVOK'} 

20} 

21"""These labels should be present on an external event to consider it to 

22be ready for sky map comparison or for post-alert analysis, such as a 

23measurment of the speed of gravity (SoG). 

24""" 

25 

26FERMI_GRB_CLASS_VALUE = 4 

27"""This is the index that denote GRBs within Fermi's Flight Position 

28classification.""" 

29 

30FERMI_GRB_CLASS_THRESH = 50 

31"""This values denotes the threshold of the most likely Fermi source 

32classification, above which we will consider a Fermi Flight Position 

33notice.""" 

34 

35 

36@alerts.handler('snews', 

37 queue='exttrig', 

38 shared=False) 

39def handle_snews_gcn(payload): 

40 """Handles the GCN notice payload from SNEWS alerts. 

41 

42 Prepares the alert to be sent to graceDB as external events, updating the 

43 info if it already exists. 

44 

45 Parameters 

46 ---------- 

47 payload : str 

48 XML GCN notice alert packet in string format 

49 

50 """ 

51 root = etree.fromstring(payload) 

52 

53 # Get TrigID and Test Event Boolean 

54 trig_id = root.find("./What/Param[@name='TrigID']").attrib['value'] 

55 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External' 

56 

57 event_observatory = 'SNEWS' 

58 if 'mdc-test_event' in root.attrib['ivorn'].lower(): 

59 search = 'MDC' 

60 else: 

61 search = 'Supernova' 

62 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

63 event_observatory, trig_id) 

64 

65 ( 

66 gracedb.get_events.si(query=query) 

67 | 

68 _create_replace_external_event_and_skymap.s( 

69 payload, search, event_observatory, ext_group=ext_group 

70 ) 

71 ).delay() 

72 

73 

74@alerts.handler('fermi_gbm_alert', 

75 'fermi_gbm_flt_pos', 

76 'fermi_gbm_gnd_pos', 

77 'fermi_gbm_fin_pos', 

78 'fermi_gbm_subthresh', 

79 'swift_bat_grb_pos_ack', 

80 'integral_wakeup', 

81 'integral_refined', 

82 'integral_offline', 

83 queue='exttrig', 

84 shared=False) 

85def handle_grb_gcn(payload): 

86 """Handles the payload from Fermi, Swift, and INTEGRAL GCN notices. 

87 

88 Filters out candidates likely to be noise. Creates external events 

89 from the notice if new notice, otherwise updates existing event. Then 

90 creates and/or grabs external sky map to be uploaded to the external event. 

91 

92 More info for these notices can be found at: 

93 Fermi-GBM: https://gcn.gsfc.nasa.gov/fermi_grbs.html 

94 Fermi-GBM sub: https://gcn.gsfc.nasa.gov/fermi_gbm_subthresh_archive.html 

95 Swift: https://gcn.gsfc.nasa.gov/swift.html 

96 INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html 

97 

98 Parameters 

99 ---------- 

100 payload : str 

101 XML GCN notice alert packet in string format 

102 

103 """ 

104 root = etree.fromstring(payload) 

105 u = urlparse(root.attrib['ivorn']) 

106 stream_path = u.path 

107 

108 stream_obsv_dict = {'/SWIFT': 'Swift', 

109 '/Fermi': 'Fermi', 

110 '/INTEGRAL': 'INTEGRAL'} 

111 event_observatory = stream_obsv_dict[stream_path] 

112 

113 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External' 

114 

115 # Block Test INTEGRAL events on the production server to prevent 

116 # unneeded queries of old GW data during detchar check 

117 if event_observatory == 'INTEGRAL' and ext_group == 'Test' and \ 

118 app.conf['gracedb_host'] == 'gracedb.ligo.org': 

119 return 

120 # Get TrigID 

121 elif event_observatory == 'INTEGRAL' and \ 

122 not any([x in u.fragment for x in ['O3-replay', 'MDC-test']]): 

123 # FIXME: revert all this if INTEGRAL fixes their GCN notices 

124 # If INTEGRAL, get trigger ID from ivorn rather than the TrigID field 

125 # unless O3 replay or MDC event 

126 trig_id = u.fragment.split('_')[-1].split('-')[0] 

127 # Modify the TrigID field so GraceDB has the correct value 

128 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

129 str(trig_id).encode() 

130 # Apply changes to payload delivered to GraceDB 

131 payload = etree.tostring(root, xml_declaration=True, encoding="UTF-8") 

132 else: 

133 try: 

134 trig_id = \ 

135 root.find("./What/Param[@name='TrigID']").attrib['value'] 

136 except AttributeError: 

137 trig_id = \ 

138 root.find("./What/Param[@name='Trans_Num']").attrib['value'] 

139 

140 notice_type = \ 

141 int(root.find("./What/Param[@name='Packet_Type']").attrib['value']) 

142 

143 reliability = root.find("./What/Param[@name='Reliability']") 

144 if reliability is not None and int(reliability.attrib['value']) <= 4: 

145 return 

146 

147 # Check if Fermi trigger is likely noise by checking classification 

148 # Most_Likely_Index of 4 is an astrophysical GRB 

149 # If not at least 50% chance of GRB we will not consider it for RAVEN 

150 likely_source = root.find("./What/Param[@name='Most_Likely_Index']") 

151 likely_prob = root.find("./What/Param[@name='Most_Likely_Prob']") 

152 not_likely_grb = likely_source is not None and \ 

153 (likely_source.attrib['value'] != FERMI_GRB_CLASS_VALUE 

154 or likely_prob.attrib['value'] < FERMI_GRB_CLASS_THRESH) 

155 

156 # Check if initial Fermi alert. These are generally unreliable and should 

157 # never trigger a RAVEN alert, but will give us earlier warning of a 

158 # possible coincidence. Later notices could change this. 

159 initial_gbm_alert = notice_type == gcn.NoticeType.FERMI_GBM_ALERT 

160 

161 # Check if Swift has lost lock. If so then veto 

162 lost_lock = \ 

163 root.find("./What/Group[@name='Solution_Status']" + 

164 "/Param[@name='StarTrack_Lost_Lock']") 

165 swift_veto = lost_lock is not None and lost_lock.attrib['value'] == 'true' 

166 

167 # Only send alerts if likely a GRB, is not a low-confidence early Fermi 

168 # alert, and if not a Swift veto 

169 if not_likely_grb or initial_gbm_alert or swift_veto: 

170 label = 'NOT_GRB' 

171 else: 

172 label = None 

173 

174 ivorn = root.attrib['ivorn'] 

175 if 'subthresh' in ivorn.lower(): 

176 search = 'SubGRB' 

177 elif 'mdc-test_event' in ivorn.lower(): 

178 search = 'MDC' 

179 else: 

180 search = 'GRB' 

181 

182 if search == 'SubGRB' and event_observatory == 'Fermi': 

183 skymap_link = \ 

184 root.find("./What/Param[@name='HealPix_URL']").attrib['value'] 

185 else: 

186 skymap_link = None 

187 

188 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

189 event_observatory, trig_id) 

190 

191 ( 

192 gracedb.get_events.si(query=query) 

193 | 

194 _create_replace_external_event_and_skymap.s( 

195 payload, search, event_observatory, 

196 ext_group=ext_group, label=label, 

197 notice_date=root.find("./Who/Date").text, 

198 notice_type=notice_type, 

199 skymap_link=skymap_link, 

200 use_radec=search in {'GRB', 'MDC'} 

201 ) 

202 ).delay() 

203 

204 

205@igwn_alert.handler('superevent', 

206 'mdc_superevent', 

207 'external_fermi', 

208 'external_swift', 

209 'external_integral', 

210 shared=False) 

211def handle_grb_igwn_alert(alert): 

212 """Parse an IGWN alert message related to superevents/GRB external triggers 

213 and dispatch it to other tasks. 

214 

215 Notes 

216 ----- 

217 This IGWN alert message handler is triggered by creating a new superevent 

218 or GRB external trigger event, a label associated with completeness of 

219 skymaps or change in state, or if a sky map file is uploaded: 

220 

221 * New event/superevent triggers a coincidence search with 

222 :meth:`gwcelery.tasks.raven.coincidence_search`. 

223 * When both a GW and GRB sky map are available during a coincidence, 

224 indicated by the labels ``EM_READY`` and ``EXT_SKYMAP_READY`` 

225 respectively on the external event, this triggers the spacetime coinc 

226 FAR to be calculated and a combined GW-GRB sky map is created using 

227 :meth:`gwcelery.tasks.external_skymaps.create_combined_skymap`. 

228 * Re-run sky map comparison if complete, and either the GW or GRB sky 

229 map has been updated or if the preferred event changed. 

230 * Re-check RAVEN publishing conditions if the GRB was previously 

231 considered non-astrophycial but now should be considered. 

232 

233 Parameters 

234 ---------- 

235 alert : dict 

236 IGWN alert packet 

237 

238 """ 

239 # Determine GraceDB ID 

240 graceid = alert['uid'] 

241 

242 # launch searches 

243 if alert['alert_type'] == 'new': 

244 if alert['object'].get('group') == 'External': 

245 # launch search with MDC events and exit 

246 if alert['object']['search'] == 'MDC': 

247 raven.coincidence_search(graceid, alert['object'], 

248 group='CBC', se_searches=['MDC']) 

249 raven.coincidence_search(graceid, alert['object'], 

250 group='Burst', se_searches=['MDC']) 

251 return 

252 

253 if alert['object']['search'] in ['SubGRB', 'SubGRBTargeted']: 

254 # if sub-threshold GRB, launch search with that pipeline 

255 raven.coincidence_search( 

256 graceid, alert['object'], 

257 searches=['SubGRB', 'SubGRBTargeted'], 

258 se_searches=['AllSky', 'BBH'], 

259 pipelines=[alert['object']['pipeline']]) 

260 else: 

261 # launch standard Burst-GRB search 

262 raven.coincidence_search(graceid, alert['object'], 

263 group='Burst', se_searches=['AllSky']) 

264 

265 # launch standard CBC-GRB search and similar BBH search 

266 raven.coincidence_search(graceid, alert['object'], 

267 group='CBC', searches=['GRB']) 

268 raven.coincidence_search(graceid, alert['object'], 

269 group='Burst', searches=['GRB'], 

270 se_searches=['BBH']) 

271 elif 'S' in graceid: 

272 # launch standard GRB search based on group 

273 gw_group = alert['object']['preferred_event_data']['group'] 

274 search = alert['object']['preferred_event_data']['search'] 

275 CBC_like = gw_group == 'CBC' or search == 'BBH' 

276 

277 # launch search with MDC events and exit 

278 if search == 'MDC': 

279 raven.coincidence_search(graceid, alert['object'], 

280 group=gw_group, searches=['MDC']) 

281 return 

282 # Don't run search for IMBH Burst search 

283 elif gw_group == 'Burst' and \ 

284 search.lower() not in {'allsky', 'bbh'}: 

285 return 

286 

287 # Keep empty if CBC (field not needed), otherwise use AllSky or BBH 

288 se_searches = ([] if gw_group == 'CBC' else 

289 [alert['object']['preferred_event_data']['search']]) 

290 searches = (['SubGRB', 'SubGRBTargeted'] if CBC_like else 

291 ['SubGRBTargeted']) 

292 # launch standard GRB search 

293 raven.coincidence_search(graceid, alert['object'], 

294 group=gw_group, searches=['GRB'], 

295 se_searches=se_searches) 

296 # launch subthreshold search for Fermi and Swift separately to use 

297 # different time windows, for both CBC and Burst 

298 for pipeline in ['Fermi', 'Swift']: 

299 raven.coincidence_search( 

300 graceid, alert['object'], group=gw_group, 

301 searches=searches, pipelines=[pipeline]) 

302 # re-run raven pipeline and create combined sky map (if not a Swift event) 

303 # when sky maps are available 

304 elif alert['alert_type'] == 'label_added' and \ 

305 alert['object'].get('group') == 'External': 

306 if _skymaps_are_ready(alert['object'], alert['data']['name'], 

307 'compare'): 

308 # if both sky maps present and a coincidence, re-run RAVEN 

309 # pipeline and create combined sky maps 

310 ext_event = alert['object'] 

311 superevent_id, ext_id = _get_superevent_ext_ids( 

312 graceid, ext_event) 

313 superevent = gracedb.get_superevent(superevent_id) 

314 _relaunch_raven_pipeline_with_skymaps( 

315 superevent, ext_event, graceid) 

316 elif 'EM_COINC' in alert['object']['labels']: 

317 # if not complete, check if GW sky map; apply label to external 

318 # event if GW sky map 

319 se_labels = gracedb.get_labels(alert['object']['superevent']) 

320 if 'SKYMAP_READY' in se_labels: 

321 gracedb.create_label.si('SKYMAP_READY', graceid).delay() 

322 if 'EM_READY' in se_labels: 

323 gracedb.create_label.si('EM_READY', graceid).delay() 

324 # apply labels from superevent to external event to update state 

325 # and trigger functionality requiring sky maps, etc. 

326 elif alert['alert_type'] == 'label_added' and 'S' in graceid: 

327 if 'SKYMAP_READY' in alert['object']['labels']: 

328 # if sky map in superevent, apply label to all external events 

329 # at the time 

330 group( 

331 gracedb.create_label.si('SKYMAP_READY', ext_id) 

332 for ext_id in alert['object']['em_events'] 

333 ).delay() 

334 if 'EM_READY' in alert['object']['labels']: 

335 # if sky map not in superevent but in preferred event, apply label 

336 # to all external events at the time 

337 group( 

338 gracedb.create_label.si('EM_READY', ext_id) 

339 for ext_id in alert['object']['em_events'] 

340 ).delay() 

341 if _skymaps_are_ready(alert['object'], alert['data']['name'], 'SoG') \ 

342 and alert['object']['space_coinc_far'] is not None: 

343 # if a superevent is vetted by ADVOK and a spatial joint FAR is 

344 # available, check if SoG publishing conditions are met 

345 ( 

346 gracedb.get_event.si(alert['object']['em_type']) 

347 | 

348 raven.sog_paper_pipeline.s(alert['object']) 

349 ).delay() 

350 # if new GW or external sky map after first being available, try to remake 

351 # combine sky map and rerun raven pipeline 

352 elif alert['alert_type'] == 'log' and \ 

353 'EM_COINC' in alert['object']['labels'] and \ 

354 'fit' in alert['data']['filename'] and \ 

355 'flat' not in alert['data']['comment'].lower() and \ 

356 (alert['data']['filename'] != 

357 external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER): 

358 superevent_id, external_id = _get_superevent_ext_ids( 

359 graceid, alert['object']) 

360 if 'S' in graceid: 

361 superevent = alert['object'] 

362 else: 

363 superevent = gracedb.get_superevent(alert['object']['superevent']) 

364 external_event = alert['object'] 

365 # check if combined sky map already made, with the exception of Swift 

366 # which will fail 

367 if 'S' in graceid: 

368 # Rerun for all eligible external events 

369 for ext_id in superevent['em_events']: 

370 external_event = gracedb.get_event(ext_id) 

371 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

372 set(external_event['labels'])): 

373 _relaunch_raven_pipeline_with_skymaps( 

374 superevent, external_event, graceid, 

375 use_superevent_skymap=True) 

376 else: 

377 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

378 set(external_event['labels'])): 

379 _relaunch_raven_pipeline_with_skymaps( 

380 superevent, external_event, graceid) 

381 # Rerun the coincidence FAR calculation if possible with combined sky map 

382 # if the preferred event changes 

383 # We don't want to run this logic if PE results are present 

384 elif alert['alert_type'] == 'log' and \ 

385 'PE_READY' not in alert['object']['labels'] and \ 

386 'EM_COINC' in alert['object']['labels']: 

387 new_log_comment = alert['data'].get('comment', '') 

388 if 'S' in graceid and \ 

389 new_log_comment.startswith('Updated superevent parameters: ' 

390 'preferred_event: '): 

391 superevent = alert['object'] 

392 # Rerun for all eligible external events 

393 for ext_id in superevent['em_events']: 

394 external_event = gracedb.get_event(ext_id) 

395 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

396 set(external_event['labels'])): 

397 _relaunch_raven_pipeline_with_skymaps( 

398 superevent, external_event, graceid, 

399 use_superevent_skymap=False) 

400 elif alert['alert_type'] == 'label_removed' and \ 

401 alert['object'].get('group') == 'External': 

402 if alert['data']['name'] == 'NOT_GRB' and \ 

403 'EM_COINC' in alert['object']['labels']: 

404 # if NOT_GRB is removed, re-check publishing conditions 

405 superevent_id = alert['object']['superevent'] 

406 superevent = gracedb.get_superevent(superevent_id) 

407 gw_group = superevent['preferred_event_data']['group'] 

408 coinc_far_dict = { 

409 'temporal_coinc_far': superevent['time_coinc_far'], 

410 'spatiotemporal_coinc_far': superevent['space_coinc_far'] 

411 } 

412 raven.trigger_raven_alert(coinc_far_dict, superevent, graceid, 

413 alert['object'], gw_group) 

414 

415 

416@igwn_alert.handler('superevent', 

417 'mdc_superevent', 

418 'external_snews', 

419 shared=False) 

420def handle_snews_igwn_alert(alert): 

421 """Parse an IGWN alert message related to superevents/Supernovae external 

422 triggers and dispatch it to other tasks. 

423 

424 Notes 

425 ----- 

426 This igwn_alert message handler is triggered whenever a new superevent 

427 or Supernovae external event is created: 

428 

429 * New event triggers a coincidence search with 

430 :meth:`gwcelery.tasks.raven.coincidence_search`. 

431 

432 Parameters 

433 ---------- 

434 alert : dict 

435 IGWN alert packet 

436 

437 """ 

438 # Determine GraceDB ID 

439 graceid = alert['uid'] 

440 

441 if alert['alert_type'] == 'new': 

442 if alert['object'].get('superevent_id'): 

443 group = alert['object']['preferred_event_data']['group'] 

444 search = alert['object']['preferred_event_data']['search'] 

445 searches = ['MDC'] if search == 'MDC' else ['Supernova'] 

446 se_searches = ['MDC'] if search == 'MDC' else ['AllSky'] 

447 # Run only on Test and Burst superevents 

448 if group in {'Burst', 'Test'} and search in {'MDC', 'AllSky'}: 

449 raven.coincidence_search(graceid, alert['object'], 

450 group='Burst', searches=searches, 

451 se_searches=se_searches, 

452 pipelines=['SNEWS']) 

453 else: 

454 # Run on SNEWS event, either real or test 

455 search = alert['object']['search'] 

456 if search == 'MDC': 

457 raven.coincidence_search(graceid, alert['object'], 

458 group='Burst', searches=['MDC'], 

459 se_searches=['MDC'], 

460 pipelines=['SNEWS']) 

461 elif search == 'Supernova': 

462 raven.coincidence_search(graceid, alert['object'], 

463 group='Burst', searches=['Supernova'], 

464 se_searches=['AllSky'], 

465 pipelines=['SNEWS']) 

466 

467 

468@alerts.handler('fermi_targeted', 

469 'swift_targeted') 

470def handle_targeted_kafka_alert(alert): 

471 """Parse an alert sent via Kafka from a MOU partner in our joint 

472 subthreshold targeted search. 

473 

474 Parameters 

475 ---------- 

476 alert : dict 

477 Kafka alert packet 

478 

479 """ 

480 # Convert alert to VOEvent format 

481 # FIXME: This is required until native ingesting of kafka events in GraceDB 

482 payload, pipeline, time, trig_id = _kafka_to_voevent(alert) 

483 

484 # Veto events that don't pass GRB FAR threshold 

485 far_grb = alert['far'] 

486 veto_event = \ 

487 app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] < far_grb 

488 label = ('NOT_GRB' if alert['alert_type'] == "retraction" or veto_event 

489 else None) 

490 

491 # Look whether a previous event with the same ID exists 

492 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

493 pipeline, trig_id) 

494 

495 ( 

496 gracedb.get_events.si(query=query) 

497 | 

498 _create_replace_external_event_and_skymap.s( 

499 payload, 'SubGRBTargeted', pipeline, 

500 label=label, notice_date=time, 

501 skymap=alert.get('healpix_file'), 

502 use_radec=('ra' in alert and 'dec' in alert) 

503 ) 

504 ).delay() 

505 

506 

507def _skymaps_are_ready(event, label, task): 

508 """Determine whether labels are complete to launch a certain task. 

509 

510 Parameters 

511 ---------- 

512 event : dict 

513 Either Superevent or external event dictionary 

514 graceid : str 

515 GraceDB ID 

516 task : str 

517 Determines which label schmema to check for completeness 

518 

519 Returns 

520 ------- 

521 labels_pass : bool 

522 True if all the require labels are present and the given label is part 

523 of that set 

524 """ 

525 label_set = set(event['labels']) 

526 required_labels = REQUIRED_LABELS_BY_TASK[task] 

527 return required_labels.issubset(label_set) and label in required_labels 

528 

529 

530def _get_superevent_ext_ids(graceid, event): 

531 """Grab superevent and external event IDs from a given event. 

532 

533 Parameters 

534 ---------- 

535 graceid : str 

536 GraceDB ID 

537 event : dict 

538 Either Superevent or external event dictionary 

539 

540 Returns 

541 ------- 

542 se_id, ext_id : tuple 

543 Tuple of superevent and external event GraceDB IDs 

544 

545 """ 

546 if 'S' in graceid: 

547 se_id = event['superevent_id'] 

548 ext_id = event['em_type'] 

549 else: 

550 se_id = event['superevent'] 

551 ext_id = event['graceid'] 

552 return se_id, ext_id 

553 

554 

555@app.task(shared=False) 

556def _launch_external_detchar(event): 

557 """Launch detchar tasks for an external event. 

558 

559 Parameters 

560 ---------- 

561 event : dict 

562 External event dictionary 

563 

564 Returns 

565 ------- 

566 event : dict 

567 External event dictionary 

568 

569 """ 

570 start = event['gpstime'] 

571 if event['pipeline'] == 'SNEWS': 

572 start, end = event['gpstime'], event['gpstime'] 

573 else: 

574 integration_time = \ 

575 event['extra_attributes']['GRB']['trigger_duration'] or 4.0 

576 end = start + integration_time 

577 detchar.check_vectors.si(event, event['graceid'], start, end).delay() 

578 

579 return event 

580 

581 

582def _relaunch_raven_pipeline_with_skymaps(superevent, ext_event, graceid, 

583 use_superevent_skymap=None): 

584 """Relaunch the RAVEN sky map comparison workflow, include recalculating 

585 the joint FAR with updated sky map info and creating a new combined sky 

586 map. 

587 

588 Parameters 

589 ---------- 

590 superevent : dict 

591 Superevent dictionary 

592 exttrig : dict 

593 External event dictionary 

594 graceid : str 

595 GraceDB ID of event 

596 use_superevent_skymap : bool 

597 If True/False, use/don't use skymap info from superevent. 

598 Else if None, check SKYMAP_READY label in external event. 

599 

600 """ 

601 gw_group = superevent['preferred_event_data']['group'] 

602 tl, th = raven._time_window( 

603 graceid, gw_group, [ext_event['pipeline']], [ext_event['search']], 

604 [superevent['preferred_event_data']['search']]) 

605 # FIXME: both overlap integral and combined sky map could be 

606 # done by the same function since they are so similar 

607 use_superevent = (use_superevent_skymap 

608 if use_superevent_skymap is not None else 

609 'SKYMAP_READY' in ext_event['labels']) 

610 canvas = raven.raven_pipeline.si( 

611 [ext_event] if 'S' in graceid else [superevent], 

612 graceid, 

613 superevent if 'S' in graceid else ext_event, 

614 tl, th, gw_group, use_superevent_skymap=use_superevent) 

615 # Create new updated combined sky map 

616 canvas |= external_skymaps.create_combined_skymap.si( 

617 superevent['superevent_id'], ext_event['graceid'], 

618 preferred_event=( 

619 None if use_superevent 

620 else superevent['preferred_event'])) 

621 canvas.delay() 

622 

623 

624@app.task(shared=False) 

625def _create_replace_external_event_and_skymap( 

626 events, payload, search, pipeline, 

627 label=None, ext_group='External', notice_date=None, notice_type=None, 

628 skymap=None, skymap_link=None, use_radec=False): 

629 """Either create a new external event or replace an old one if applicable 

630 Then either uploads a given sky map, try to download one given a link, or 

631 create one given coordinates. 

632 

633 Parameters 

634 ---------- 

635 events : list 

636 List of external events sharing the same trigger ID 

637 payload : str 

638 VOEvent of event being considered 

639 search : str 

640 Search of external event 

641 pipeline : str 

642 Pipeline of external evevent 

643 label : list 

644 Label to be uploaded along with external event. If None, removes 

645 'NOT_GRB' label from event 

646 ext_group : str 

647 Group of external event, 'External' or 'Test' 

648 notice_date : str 

649 External event trigger time in ISO format 

650 notice_type : int 

651 GCN notice type integer 

652 skymap : str 

653 Base64 encoded sky map 

654 skymap_link : str 

655 Link to external sky map to be downloaded 

656 use_radec : bool 

657 If True, try to create sky map using given coordinates 

658 

659 """ 

660 skymap_detchar_canvas = () 

661 upload_new_skymap = True 

662 # If previous event, try to append 

663 if events and ext_group == 'External': 

664 assert len(events) == 1, 'Found more than one matching GraceDB entry' 

665 event, = events 

666 graceid = event['graceid'] 

667 # If previous Fermi sky map, check if from official analysis 

668 if 'EXT_SKYMAP_READY' in event['labels'] and \ 

669 event['pipeline'] == 'Fermi': 

670 # If True, block further sky maps from being uploaded automatically 

671 # Note that sky maps can also be uploaded via the dashboard 

672 upload_new_skymap = \ 

673 (external_skymaps.FERMI_OFFICIAL_SKYMAP_FILENAME not in 

674 external_skymaps.get_skymap_filename(graceid, is_gw=False)) 

675 if label: 

676 create_replace_canvas = gracedb.create_label.si(label, graceid) 

677 else: 

678 create_replace_canvas = gracedb.remove_label.si('NOT_GRB', graceid) 

679 

680 # Prevent SubGRBs from appending GRBs, also append if same search 

681 if search == 'GRB' or search == event['search']: 

682 # Replace event and pass already existing event dictionary 

683 create_replace_canvas |= gracedb.replace_event.si(graceid, payload) 

684 create_replace_canvas |= gracedb.get_event.si(graceid) 

685 else: 

686 # If not appending just exit 

687 return 

688 

689 # If new event, create new entry in GraceDB and launch detchar 

690 else: 

691 create_replace_canvas = gracedb.create_event.si( 

692 filecontents=payload, 

693 search=search, 

694 group=ext_group, 

695 pipeline=pipeline, 

696 labels=[label] if label else None) 

697 skymap_detchar_canvas += _launch_external_detchar.s(), 

698 

699 # Use sky map if provided 

700 if skymap: 

701 skymap_detchar_canvas += \ 

702 external_skymaps.read_upload_skymap_from_base64.s(skymap), 

703 # Otherwise if no official Fermi sky map, upload one based on given info 

704 elif upload_new_skymap: 

705 # Otherwise grab sky map from provided link 

706 if skymap_link: 

707 skymap_detchar_canvas += \ 

708 external_skymaps.get_upload_external_skymap.s(skymap_link), 

709 # Otherwise if threshold Fermi try to grab sky map 

710 elif pipeline == 'Fermi' and search == 'GRB': 

711 skymap_detchar_canvas += \ 

712 external_skymaps.get_upload_external_skymap.s(None), 

713 # Otherwise create sky map from given coordinates 

714 if use_radec: 

715 skymap_detchar_canvas += \ 

716 external_skymaps.create_upload_external_skymap.s( 

717 notice_type, notice_date), 

718 

719 ( 

720 create_replace_canvas 

721 | 

722 group(skymap_detchar_canvas) 

723 ).delay() 

724 

725 

726def _kafka_to_voevent(alert): 

727 """Parse an alert sent via Kafka from a MOU partner in our joint 

728 subthreshold targeted search and convert to an equivalent XML string 

729 GCN VOEvent. 

730 

731 Parameters 

732 ---------- 

733 alert : dict 

734 Kafka alert packet 

735 

736 Returns 

737 ------- 

738 payload : str 

739 XML GCN notice alert packet in string format 

740 

741 """ 

742 # Define basic values 

743 pipeline = alert['mission'] 

744 start_time = alert['trigger_time'] 

745 alert_time = alert['alert_datetime'] 

746 far = alert['far'] 

747 duration = alert['rate_duration'] 

748 id = '_'.join(str(x) for x in alert['id']) 

749 # Use central time since starting time is not well measured 

750 central_time = \ 

751 Time(start_time, format='isot', scale='utc').to_value('gps') + \ 

752 .5 * duration 

753 trigger_time = \ 

754 str(Time(central_time, format='gps', scale='utc').isot) + 'Z' 

755 

756 # sky localization may not be available 

757 ra = alert.get('ra') 

758 dec = alert.get('dec') 

759 # Try to get dec first then ra, None if both misssing 

760 error = alert.get('dec_uncertainty') 

761 if error is None: 

762 error = alert.get('ra_uncertainty') 

763 # Argument should be list if not None 

764 if isinstance(error, list): 

765 error = error[0] 

766 # if any missing sky map info, set to zeros so will be ignored later 

767 if ra is None or dec is None or error is None: 

768 ra, dec, error = 0., 0., 0. 

769 

770 # Load template 

771 fname = str(Path(__file__).parent / 

772 '../tests/data/{}_subgrbtargeted_template.xml'.format( 

773 pipeline.lower())) 

774 root = etree.parse(fname) 

775 

776 # Update template values 

777 # Change ivorn to indicate this is a subthreshold targeted event 

778 root.xpath('.')[0].attrib['ivorn'] = \ 

779 'ivo://lvk.internal/{0}#targeted_subthreshold-{1}'.format( 

780 pipeline.lower(), trigger_time).encode() 

781 

782 # Update ID 

783 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

784 id.encode() 

785 

786 # Change times to chosen time 

787 root.find("./Who/Date").text = str(alert_time).encode() 

788 root.find(("./WhereWhen/ObsDataLocation/" 

789 "ObservationLocation/AstroCoords/Time/TimeInstant/" 

790 "ISOTime")).text = str(trigger_time).encode() 

791 

792 root.find("./What/Param[@name='FAR']").attrib['value'] = \ 

793 str(far).encode() 

794 

795 root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \ 

796 str(duration).encode() 

797 

798 # Sky position 

799 root.find(("./WhereWhen/ObsDataLocation/" 

800 "ObservationLocation/AstroCoords/Position2D/Value2/" 

801 "C1")).text = str(ra).encode() 

802 root.find(("./WhereWhen/ObsDataLocation/" 

803 "ObservationLocation/AstroCoords/Position2D/Value2/" 

804 "C2")).text = str(dec).encode() 

805 root.find(("./WhereWhen/ObsDataLocation/" 

806 "ObservationLocation/AstroCoords/Position2D/" 

807 "Error2Radius")).text = str(error).encode() 

808 

809 return (etree.tostring(root, xml_declaration=True, encoding="UTF-8", 

810 pretty_print=True), 

811 pipeline, trigger_time.replace('Z', ''), id)