Coverage for gwcelery/tasks/orchestrator.py: 95%

404 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""Tasks that comprise the alert orchestrator. 

2 

3The orchestrator is responsible for the vetting and annotation workflow to 

4produce preliminary, initial, and update alerts for gravitational-wave event 

5candidates. 

6""" 

7import json 

8import re 

9 

10from astropy.time import Time 

11from celery import chain, group 

12from ligo.rrt_chat import channel_creation 

13from rapidpe_rift_pipe import pastro as rpe_pastro 

14 

15from .. import app 

16from . import (alerts, bayestar, circulars, detchar, em_bright, 

17 external_skymaps, gcn, gracedb, igwn_alert, inference, p_astro, 

18 rrt_utils, skymaps, superevents) 

19from .core import get_first, identity 

20 

21 

22@igwn_alert.handler('superevent', 

23 'mdc_superevent', 

24 shared=False) 

25def handle_superevent(alert): 

26 """Schedule annotations for new superevents. 

27 

28 After waiting for a time specified by the 

29 :obj:`~gwcelery.conf.orchestrator_timeout` configuration variable for the 

30 choice of preferred event to settle down, this task performs data quality 

31 checks with :meth:`gwcelery.tasks.detchar.check_vectors` and calls 

32 :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert` to send 

33 a preliminary notice. 

34 """ 

35 superevent_id = alert['uid'] 

36 # launch PE and detchar based on new type superevents 

37 if alert['alert_type'] == 'new': 

38 # launching rapidpe 30s after merger. 

39 timeout = max( 

40 alert['object']['t_0'] - Time.now().gps + 

41 app.conf['rapidpe_timeout'], 0 

42 ) 

43 ( 

44 gracedb.get_superevent.si(superevent_id).set( 

45 countdown=timeout 

46 ) 

47 | 

48 _get_pe_far_and_event.s() 

49 | 

50 parameter_estimation.s(superevent_id, 'rapidpe') 

51 ).apply_async() 

52 

53 ( 

54 gracedb.get_superevent.si(superevent_id).set( 

55 countdown=app.conf['pe_timeout'] 

56 ) 

57 | 

58 _get_pe_far_and_event.s() 

59 | 

60 parameter_estimation.s(superevent_id, 'bilby') 

61 ).apply_async() 

62 

63 # run check_vectors. Create and upload omegascans 

64 group( 

65 detchar.omegascan.si(alert['object']['t_0'], superevent_id), 

66 

67 detchar.check_vectors.si( 

68 alert['object']['preferred_event_data'], 

69 superevent_id, 

70 alert['object']['t_start'], 

71 alert['object']['t_end'] 

72 ) 

73 ).delay() 

74 

75 elif alert['alert_type'] == 'label_added': 

76 label_name = alert['data']['name'] 

77 query = f'superevent: {superevent_id} group: CBC Burst' 

78 if alert['object']['category'] == 'MDC': 

79 query += ' MDC' 

80 elif alert['object']['category'] == 'Test': 

81 query += ' Test' 

82 

83 # launch less-significant preliminary alerts on LOW_SIGNIF_LOCKED 

84 if label_name == superevents.FROZEN_LABEL: 

85 # don't launch if EARLY_WARNING or SIGNIF_LOCKED is present 

86 skipping_labels = { 

87 superevents.SIGNIFICANT_LABEL, 

88 superevents.EARLY_WARNING_LABEL 

89 }.intersection(alert['object']['labels']) 

90 if skipping_labels: 

91 gracedb.upload.delay( 

92 None, None, superevent_id, 

93 "The superevent already has a significant/EW event, " 

94 "skipping launching less-significant alert" 

95 ) 

96 return 

97 ( 

98 gracedb.upload.s( 

99 None, 

100 None, 

101 superevent_id, 

102 "Automated DQ check before sending less-significant " 

103 "preliminary alert. New results supersede old results.", 

104 tags=['data_quality'] 

105 ) 

106 | 

107 detchar.check_vectors.si( 

108 alert['object']['preferred_event_data'], 

109 superevent_id, 

110 alert['object']['t_start'], 

111 alert['object']['t_end'] 

112 ) 

113 | 

114 earlywarning_preliminary_alert.s( 

115 alert, alert_type='less-significant') 

116 ).apply_async() 

117 

118 # launch significant alert on SIGNIF_LOCKED 

119 elif label_name == superevents.SIGNIFICANT_LABEL: 

120 # ensure superevent is locked before starting alert workflow 

121 r = chain() 

122 if superevents.FROZEN_LABEL not in alert['object']['labels']: 

123 r |= gracedb.create_label.si(superevents.FROZEN_LABEL, 

124 superevent_id) 

125 

126 r |= ( 

127 gracedb.get_events.si(query) 

128 | 

129 superevents.select_preferred_event.s() 

130 | 

131 _update_superevent_and_return_event_dict.s(superevent_id) 

132 | 

133 _leave_log_message_and_return_event_dict.s( 

134 superevent_id, 

135 "Superevent cleaned up after significant event. " 

136 ) 

137 | 

138 _leave_log_message_and_return_event_dict.s( 

139 superevent_id, 

140 "Automated DQ check before sending significant alert. " 

141 "New results supersede old results.", 

142 tags=['data_quality'] 

143 ) 

144 | 

145 detchar.check_vectors.s( 

146 superevent_id, 

147 alert['object']['t_start'], 

148 alert['object']['t_end'] 

149 ) 

150 | 

151 earlywarning_preliminary_alert.s( 

152 alert, alert_type='preliminary') 

153 ) 

154 r.apply_async() 

155 

156 # launch second preliminary on GCN_PRELIM_SENT 

157 elif label_name == 'GCN_PRELIM_SENT': 

158 ( 

159 identity.si().set( 

160 # https://git.ligo.org/emfollow/gwcelery/-/issues/478 

161 # FIXME: remove this task once https://github.com/celery/celery/issues/7851 is resolved # noqa: E501 

162 countdown=app.conf['superevent_clean_up_timeout'] 

163 ) 

164 | 

165 gracedb.get_events.si(query) 

166 | 

167 superevents.select_preferred_event.s() 

168 | 

169 _update_superevent_and_return_event_dict.s(superevent_id) 

170 | 

171 group( 

172 _leave_log_message_and_return_event_dict.s( 

173 superevent_id, 

174 "Superevent cleaned up after first preliminary alert" 

175 ), 

176 

177 gracedb.create_label.si('DQR_REQUEST', superevent_id) 

178 ) 

179 | 

180 get_first.s() 

181 | 

182 earlywarning_preliminary_alert.s( 

183 alert, alert_type='preliminary') 

184 ).apply_async() 

185 

186 # set pipeline preferred events 

187 # FIXME: Ideally this should combined with the previous canvas. 

188 # However, incorporating that group prevents canvas from executing 

189 # maybe related to https://github.com/celery/celery/issues/7851 

190 ( 

191 gracedb.get_events.si(query) 

192 | 

193 superevents.select_pipeline_preferred_event.s() 

194 | 

195 _set_pipeline_preferred_events.s(superevent_id) 

196 ).apply_async(countdown=app.conf['superevent_clean_up_timeout']) 

197 

198 elif label_name == 'LOW_SIGNIF_PRELIM_SENT': 

199 # similar workflow as the GCN_PRELIM_SENT 

200 # except block by condition evaluated at the end of timeout 

201 _revise_and_send_second_less_significant_alert.si( 

202 alert, query, superevent_id, 

203 ).apply_async(countdown=app.conf['superevent_clean_up_timeout']) 

204 

205 elif label_name == superevents.EARLY_WARNING_LABEL: 

206 if superevents.SIGNIFICANT_LABEL in alert['object']['labels']: 

207 # stop if full BW significant event already present 

208 gracedb.upload.delay( 

209 None, None, superevent_id, 

210 "Superevent superseded by full BW event, skipping EW." 

211 ) 

212 return 

213 # start the EW alert pipeline; is blocked by SIGNIF_LOCKED 

214 # ensure superevent is locked before starting pipeline 

215 r = chain() 

216 if superevents.FROZEN_LABEL not in alert['object']['labels']: 

217 r |= gracedb.create_label.si(superevents.FROZEN_LABEL, 

218 superevent_id) 

219 

220 r |= ( 

221 gracedb.get_events.si(query) 

222 | 

223 superevents.select_preferred_event.s() 

224 | 

225 _update_superevent_and_return_event_dict.s(superevent_id) 

226 | 

227 _leave_log_message_and_return_event_dict.s( 

228 superevent_id, 

229 "Superevent cleaned up before sending EW alert." 

230 ) 

231 | 

232 earlywarning_preliminary_alert.s( 

233 alert, alert_type='earlywarning') 

234 ) 

235 r.apply_async() 

236 

237 # launch initial/retraction alert on ADVOK/ADVNO 

238 elif label_name == 'ADVOK': 

239 initial_alert((None, None, None), alert) 

240 elif label_name == 'ADVNO': 

241 retraction_alert(alert) 

242 elif label_name == 'ADVREQ': 

243 if app.conf['create_mattermost_channel']: 

244 _create_mattermost_channel.si(superevent_id).delay() 

245 

246 # check DQV label on superevent, run check_vectors if required 

247 elif alert['alert_type'] == 'event_added': 

248 # FIXME Check if this should be changed to 'update' alert_types instead 

249 # of 'event_added'. 'event_added' seems like it's just a new event in 

250 # the superevent window, not necessarily an event that should be 

251 # promoted to preferred event 

252 start = alert['data']['t_start'] 

253 end = alert['data']['t_end'] 

254 

255 if 'DQV' in gracedb.get_labels(superevent_id): 

256 ( 

257 detchar.check_vectors.s( 

258 alert['object']['preferred_event_data'], 

259 superevent_id, 

260 start, 

261 end 

262 ) 

263 | 

264 _update_if_dqok.s(superevent_id) 

265 ).apply_async() 

266 

267 

268@igwn_alert.handler('cbc_gstlal', 

269 'cbc_spiir', 

270 'cbc_pycbc', 

271 'cbc_mbta', 

272 shared=False) 

273def handle_cbc_event(alert): 

274 """Perform annotations for CBC events that depend on pipeline-specific 

275 matched-filter parameter estimates. 

276 

277 Notes 

278 ----- 

279 This IGWN alert message handler is triggered by a new upload or by updates 

280 that include the file ``pipeline.p_astro.json``. If also generates 

281 pipeline.p_astro.json information for pipelines that do not provide 

282 such information. 

283 

284 The table below lists which files are created as a result of a new upload, 

285 and which tasks generate them. 

286 

287 ============================== ================================================== 

288 File Task 

289 ============================== ================================================== 

290 ``bayestar.multiorder.fits`` :meth:`gwcelery.tasks.bayestar.localize` 

291 ``em_bright.json`` :meth:`gwcelery.tasks.em_bright.source_properties` 

292 ``pipeline.p_astro.json`` :meth:`gwcelery.tasks.p_astro.compute_p_astro` 

293 ============================== ================================================== 

294 

295 """ # noqa: E501 

296 graceid = alert['uid'] 

297 pipeline = alert['object']['pipeline'].lower() 

298 search = alert['object']['search'].lower() 

299 

300 # no annotations for events used in VT analysis 

301 if search == superevents.VT_SEARCH_NAME.lower(): 

302 return 

303 

304 priority = 0 if superevents.should_publish(alert['object']) else 1 

305 

306 # Pipelines that use the GWCelery p-astro method 

307 # - spiir (all searches) 

308 # - pycbc for EarlyWarning search 

309 # - periodic MDC generated by first-two-years (based on gstlal) 

310 # FIXME: Remove this once all pipelines compute their own p-astro 

311 pipelines_stock_p_astro = {('spiir', 'earlywarning'), 

312 ('pycbc', 'earlywarning'), 

313 ('gstlal', 'mdc')} 

314 

315 # em_bright and p_astro calculation 

316 if alert['alert_type'] == 'new': 

317 instruments = superevents.get_instruments_in_ranking_statistic( 

318 alert['object']) 

319 extra_attributes = alert['object']['extra_attributes'] 

320 snr = superevents.get_snr(alert['object']) 

321 far = alert['object']['far'] 

322 mass1 = extra_attributes['SingleInspiral'][0]['mass1'] 

323 mass2 = extra_attributes['SingleInspiral'][0]['mass2'] 

324 chi1 = extra_attributes['SingleInspiral'][0]['spin1z'] 

325 chi2 = extra_attributes['SingleInspiral'][0]['spin2z'] 

326 

327 ( 

328 em_bright.source_properties.si(mass1, mass2, chi1, chi2, snr, 

329 pipeline=pipeline, search=search) 

330 | 

331 gracedb.upload.s( 

332 'em_bright.json', graceid, 

333 'em bright complete', ['em_bright', 'public'] 

334 ) 

335 | 

336 gracedb.create_label.si('EMBRIGHT_READY', graceid) 

337 ).apply_async(priority=priority) 

338 

339 # p_astro calculation for pipelines that does not provide a 

340 # stock p_astro (upload pipeline.p_astro.json) 

341 if (pipeline, search) in pipelines_stock_p_astro: 

342 ( 

343 p_astro.compute_p_astro.s(snr, 

344 far, 

345 mass1, 

346 mass2, 

347 pipeline, 

348 instruments) 

349 | 

350 gracedb.upload.s( 

351 f'{pipeline}.p_astro.json', graceid, 

352 'p_astro computation complete', ['p_astro', 'public'] 

353 ) 

354 | 

355 gracedb.create_label.si('PASTRO_READY', graceid) 

356 ).apply_async(priority=priority) 

357 

358 # Start BAYESTAR for all CBC pipelines. 

359 ( 

360 gracedb.download.s('coinc.xml', graceid) 

361 | 

362 bayestar.localize.s(graceid) 

363 | 

364 gracedb.upload.s( 

365 'bayestar.multiorder.fits', graceid, 

366 'sky localization complete', ['sky_loc', 'public'] 

367 ) 

368 | 

369 gracedb.create_label.si('SKYMAP_READY', graceid) 

370 ).apply_async(priority=priority) 

371 

372 

373@igwn_alert.handler('burst_olib', 

374 'burst_cwb', 

375 'burst_mly', 

376 shared=False) 

377def handle_burst_event(alert): 

378 """Perform annotations for burst events that depend on pipeline-specific 

379 """ # noqa: E501 

380 graceid = alert['uid'] 

381 pipeline = alert['object']['pipeline'].lower() 

382 search = alert['object']['search'].lower() 

383 priority = 0 if superevents.should_publish(alert['object']) else 1 

384 

385 # em_bright calculation for Burst-cWB-BBH 

386 if alert['alert_type'] == 'new': 

387 if (pipeline, search) in [('cwb', 'bbh')]: 

388 extra_attributes = alert['object']['extra_attributes'] 

389 multiburst = extra_attributes['MultiBurst'] 

390 snr = multiburst.get('snr') 

391 mchirp = multiburst.get('mchirp', 0.0) 

392 # FIXME once ingestion of mchip is finalised 

393 # In case mchirp is not there or zero 

394 # produce em_brigth with all zero 

395 if mchirp == 0.0: 

396 m12 = 30.0 

397 else: 

398 m12 = 2**(0.2) * mchirp 

399 ( 

400 em_bright.source_properties.si(m12, m12, 0.0, 0.0, snr) 

401 | 

402 gracedb.upload.s( 

403 'em_bright.json', graceid, 

404 'em bright complete', ['em_bright', 'public'] 

405 ) 

406 | 

407 gracedb.create_label.si('EMBRIGHT_READY', graceid) 

408 ).apply_async(priority=priority) 

409 

410 if alert['alert_type'] != 'log': 

411 return 

412 

413 filename = alert['data']['filename'] 

414 

415 # Pipeline is uploading a flat resultion skymap file 

416 # Converting to a multiorder ones with proper name. 

417 # FIXME: Remove block when CWB starts to upload skymaps 

418 # in multiorder format 

419 if filename.endswith('.fits.gz'): 

420 new_filename = filename.replace('.fits.gz', '.multiorder.fits') 

421 flatten_msg = ( 

422 'Multi-resolution FITS file created from ' 

423 '<a href="/api/events/{graceid}/files/' 

424 '{filename}">{filename}</a>').format( 

425 graceid=graceid, filename=filename) 

426 tags = ['sky_loc', 'lvem', 'public'] 

427 ( 

428 gracedb.download.si(filename, graceid) 

429 | 

430 skymaps.unflatten.s(new_filename) 

431 | 

432 gracedb.upload.s( 

433 new_filename, graceid, flatten_msg, tags) 

434 | 

435 gracedb.create_label.si('SKYMAP_READY', graceid) 

436 ).apply_async(priority=priority) 

437 

438 

439@igwn_alert.handler('superevent', 

440 'mdc_superevent', 

441 shared=False) 

442def handle_posterior_samples(alert): 

443 """Generate multi-resolution and flat-resolution FITS files and skymaps 

444 from an uploaded HDF5 file containing posterior samples. 

445 """ 

446 if alert['alert_type'] != 'log' or \ 

447 not alert['data']['filename'].endswith('.posterior_samples.hdf5'): 

448 return 

449 superevent_id = alert['uid'] 

450 filename = alert['data']['filename'] 

451 info = '{} {}'.format(alert['data']['comment'], filename) 

452 prefix, _ = filename.rsplit('.posterior_samples.') 

453 skymap_filename = f'{prefix}.multiorder.fits' 

454 labels = ['pe', 'sky_loc'] 

455 ifos = superevents.get_instruments(alert['object']['preferred_event_data']) 

456 

457 ( 

458 gracedb.download.si(filename, superevent_id) 

459 | 

460 skymaps.skymap_from_samples.s(superevent_id, ifos) 

461 | 

462 group( 

463 skymaps.annotate_fits.s( 

464 skymap_filename, superevent_id, labels 

465 ), 

466 

467 gracedb.upload.s( 

468 skymap_filename, superevent_id, 

469 'Multiresolution FITS file generated from "{}"'.format(info), 

470 labels 

471 ) 

472 ) 

473 ).delay() 

474 

475 # em_bright from LALInference posterior samples 

476 ( 

477 gracedb.download.si(filename, superevent_id) 

478 | 

479 em_bright.em_bright_posterior_samples.s() 

480 | 

481 gracedb.upload.s( 

482 '{}.em_bright.json'.format(prefix), superevent_id, 

483 'em-bright computed from "{}"'.format(info), 

484 'pe' 

485 ) 

486 ).delay() 

487 

488 

489@app.task(bind=True, shared=False) 

490def _create_mattermost_channel(self, superevent_id): 

491 """ 

492 Creates a mattermost channel when ADVREQ label is applied and 

493 posts a cooresponding gracedb link of that event in the channel 

494 

495 Channel name : O4 RRT {superevent_id} 

496 

497 Parameters: 

498 ------------ 

499 superevent_id: str 

500 The superevent id 

501 """ 

502 gracedb_url = self.app.conf['gracedb_host'] 

503 channel_creation.rrt_channel_creation( 

504 superevent_id, gracedb_url) 

505 

506 

507@app.task(shared=False) 

508def _set_pipeline_preferred_events(pipeline_event, superevent_id): 

509 """Return group for setting pipeline preferred event using 

510 :meth:`gracedb.add_pipeline_preferred_event`. 

511 

512 Parameters 

513 ---------- 

514 pipeline_event: dict 

515 {pipeline: event_dict} key value pairs, returned by 

516 :meth:`superevents.select_pipeline_preferred_event`. 

517 

518 superevent_id: str 

519 The superevent id 

520 """ 

521 return group( 

522 gracedb.add_pipeline_preferred_event(superevent_id, 

523 event['graceid']) 

524 for event in pipeline_event.values() 

525 ) 

526 

527 

528@app.task(shared=False, ignore_result=True) 

529def _update_if_dqok(event, superevent_id): 

530 """Update `preferred_event` of `superevent_id` to `event_id` if `DQOK` 

531 label has been applied. 

532 """ 

533 if 'DQOK' in gracedb.get_labels(superevent_id): 

534 event_id = event['graceid'] 

535 gracedb.update_superevent(superevent_id, 

536 preferred_event=event_id, 

537 t_0=event["gpstime"]) 

538 gracedb.upload.delay( 

539 None, None, superevent_id, 

540 comment=f'DQOK applied based on new event {event_id}') 

541 

542 

543@gracedb.task(shared=False) 

544def _create_voevent(classification, *args, **kwargs): 

545 r"""Create a VOEvent record from an EM bright JSON file. 

546 

547 Parameters 

548 ---------- 

549 classification : tuple, None 

550 A collection of JSON strings, generated by 

551 :meth:`gwcelery.tasks.em_bright.source_properties` and 

552 :meth:`gwcelery.tasks.p_astro.compute_p_astro` or 

553 content of ``{gstlal,mbta}.p_astro.json`` uploaded 

554 by {gstlal,mbta} respectively; or None 

555 \*args 

556 Additional positional arguments passed to 

557 :meth:`gwcelery.tasks.gracedb.create_voevent`. 

558 \*\*kwargs 

559 Additional keyword arguments passed to 

560 :meth:`gwcelery.tasks.gracedb.create_voevent`. 

561 

562 Returns 

563 ------- 

564 str 

565 The filename of the newly created VOEvent. 

566 

567 """ 

568 kwargs = dict(kwargs) 

569 

570 if classification is not None: 

571 # Merge source classification and source properties into kwargs. 

572 for text in classification: 

573 # Ignore filenames, only load dict in bytes form 

574 if text is not None: 

575 kwargs.update(json.loads(text)) 

576 

577 # FIXME: These keys have differ between em_bright.json 

578 # and the GraceDB REST API. 

579 try: 

580 kwargs['ProbHasNS'] = kwargs.pop('HasNS') 

581 except KeyError: 

582 pass 

583 

584 try: 

585 kwargs['ProbHasRemnant'] = kwargs.pop('HasRemnant') 

586 except KeyError: 

587 pass 

588 

589 skymap_filename = kwargs.get('skymap_filename') 

590 if skymap_filename is not None: 

591 skymap_type = re.sub( 

592 r'(\.multiorder)?\.fits(\..+)?(,[0-9]+)?$', '', skymap_filename) 

593 kwargs.setdefault('skymap_type', skymap_type) 

594 

595 # FIXME: remove ._orig_run when this bug is fixed: 

596 # https://github.com/getsentry/sentry-python/issues/370 

597 return gracedb.create_voevent._orig_run(*args, **kwargs) 

598 

599 

600@app.task(shared=False) 

601def _create_label_and_return_filename(filename, label, graceid): 

602 gracedb.create_label.delay(label, graceid) 

603 return filename 

604 

605 

606@app.task(shared=False) 

607def _leave_log_message_and_return_event_dict(event, superevent_id, 

608 message, **kwargs): 

609 """Wrapper around :meth:`gracedb.upload` 

610 that returns the event dictionary. 

611 """ 

612 gracedb.upload.delay(None, None, superevent_id, message, **kwargs) 

613 return event 

614 

615 

616@gracedb.task(shared=False) 

617def _update_superevent_and_return_event_dict(event, superevent_id): 

618 """Wrapper around :meth:`gracedb.update_superevent` 

619 that returns the event dictionary. 

620 """ 

621 gracedb.update_superevent(superevent_id, 

622 preferred_event=event['graceid'], 

623 t_0=event['gpstime']) 

624 return event 

625 

626 

627@gracedb.task(shared=False) 

628def _proceed_if_not_blocked_by(files, superevent_id, block_by): 

629 """Return files in case the superevent does not have labels `block_by` 

630 

631 Parameters 

632 ---------- 

633 files : tuple 

634 List of files 

635 superevent_id : str 

636 The superevent id corresponding to files 

637 block_by : set 

638 Set of blocking labels. E.g. `{'ADVOK', 'ADVNO'}` 

639 """ 

640 superevent_labels = gracedb.get_labels(superevent_id) 

641 blocking_labels = block_by.intersection(superevent_labels) 

642 if blocking_labels: 

643 gracedb.upload.delay( 

644 None, None, superevent_id, 

645 f"Blocking automated notice due to labels {blocking_labels}" 

646 ) 

647 return None 

648 else: 

649 return files 

650 

651 

652@gracedb.task(shared=False) 

653def _revise_and_send_second_less_significant_alert(alert, query, 

654 superevent_id): 

655 superevent_labels = gracedb.get_labels(superevent_id) 

656 blocking_labels = { 

657 'ADVREQ', 'ADVOK', 'ADVNO', 

658 superevents.SIGNIFICANT_LABEL, 

659 superevents.EARLY_WARNING_LABEL, 

660 } 

661 if blocking_labels.intersection(superevent_labels): 

662 return 

663 

664 ( 

665 gracedb.get_events.si(query) 

666 | 

667 superevents.select_preferred_event.s() 

668 | 

669 _update_superevent_and_return_event_dict.s(superevent_id) 

670 | 

671 _leave_log_message_and_return_event_dict.s( 

672 superevent_id, 

673 "Superevent cleaned up before second less-significant alert" 

674 ) 

675 | 

676 earlywarning_preliminary_alert.s( 

677 alert, alert_type='less-significant') 

678 ).delay() 

679 

680 # set pipeline preferred events 

681 # FIXME: Ideally this should combined with the previous canvas. 

682 # However, incorporating that group prevents canvas from executing 

683 # maybe related to https://github.com/celery/celery/issues/7851 

684 ( 

685 gracedb.get_events.si(query) 

686 | 

687 superevents.select_pipeline_preferred_event.s() 

688 | 

689 _set_pipeline_preferred_events.s(superevent_id) 

690 ).delay() 

691 

692 

693@app.task(shared=False) 

694def _annotate_fits_and_return_input(input_list, superevent_id): 

695 """Unpack the output of the skymap, embright, p-astro download group in the 

696 beginning of the 

697 :meth:`~gwcelery.tasks.orchestartor.earlywarning_preliminary_alert` canvas 

698 and call :meth:`~gwcelery.tasks.skymaps.annotate_fits`. 

699 

700 

701 Parameters 

702 ---------- 

703 input_list : list 

704 The output of the group that downloads the skymap, embright, and 

705 p-astro files. This list is in the form [skymap, skymap_filename], 

706 [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename], 

707 though the em-bright and p-astro lists can be populated by Nones 

708 superevent_id : str 

709 A list of the sky map, em_bright, and p_astro filenames. 

710 """ 

711 

712 skymaps.annotate_fits_tuple( 

713 input_list[0], 

714 superevent_id, 

715 ['sky_loc', 'public'] 

716 ) 

717 

718 return input_list 

719 

720 

721@gracedb.task(shared=False) 

722def _unpack_args_and_send_earlywarning_preliminary_alert(input_list, alert, 

723 alert_type): 

724 """Unpack the output of the skymap, embright, p-astro download group in the 

725 beginning of the 

726 :meth:`~gwcelery.tasks.orchestartor.earlywarning_preliminary_alert` canvas 

727 and call 

728 :meth:`gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`. 

729 

730 

731 Parameters 

732 ---------- 

733 input_list : list 

734 The output of the group that downloads the skymap, embright, and 

735 p-astro files. This list is in the form [skymap, skymap_filename], 

736 [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename], 

737 though the em-bright and p-astro lists can be populated by Nones 

738 alert : dict 

739 IGWN-Alert dictionary 

740 alert_type : str 

741 alert_type passed to 

742 :meth:`earlywarning_preliminary_initial_update_alert` 

743 """ 

744 if input_list is None: # alert is blocked by blocking labels 

745 return 

746 

747 [skymap, skymap_filename], [em_bright, em_bright_filename], \ 

748 [p_astro_dict, p_astro_filename] = input_list 

749 

750 # Update to latest state after downloading files 

751 superevent = gracedb.get_superevent(alert['object']['superevent_id']) 

752 

753 earlywarning_preliminary_initial_update_alert.delay( 

754 [skymap_filename, em_bright_filename, p_astro_filename], 

755 superevent, alert_type, 

756 filecontents=[skymap, em_bright, p_astro_dict] 

757 ) 

758 

759 

760@app.task(ignore_result=True, shared=False) 

761def earlywarning_preliminary_alert(event, alert, alert_type='preliminary', 

762 initiate_voevent=True): 

763 """Produce a preliminary alert by copying any sky maps. 

764 

765 This consists of the following steps: 

766 

767 1. Copy any sky maps and source classification from the preferred event 

768 to the superevent. 

769 2. Create standard annotations for sky maps including all-sky plots by 

770 calling :meth:`gwcelery.tasks.skymaps.annotate_fits`. 

771 3. Create a preliminary VOEvent. 

772 4. Send the VOEvent to GCN and notices to SCiMMA and GCN. 

773 5. Apply the GCN_PRELIM_SENT or LOW_SIGNIF_PRELIM_SENT 

774 depending on the significant or less-significant alert 

775 respectively. 

776 6. Create and upload a GCN Circular draft. 

777 """ 

778 priority = 0 if superevents.should_publish(event) else 1 

779 preferred_event_id = event['graceid'] 

780 superevent_id = alert['uid'] 

781 

782 # Define alert payloads depending on group-pipeline-search 

783 # cbc-*-* : p_astro/em_bright 

784 # burst.cwb-bbh : p_astro/em_bright 

785 # burst-*-* : NO p_astro/em_bright 

786 alert_group = event['group'].lower() 

787 alert_pipeline = event['pipeline'].lower() 

788 alert_search = event['search'].lower() 

789 if alert_pipeline == 'cwb' and alert_search == 'bbh': 

790 skymap_filename = alert_pipeline + '.multiorder.fits' 

791 p_astro_filename = alert_pipeline + '.p_astro.json' 

792 em_bright_filename = 'em_bright.json' 

793 elif alert_group == 'cbc': 

794 skymap_filename = 'bayestar.multiorder.fits' 

795 p_astro_filename = alert_pipeline + '.p_astro.json' \ 

796 if alert_search != 'ssm' else None 

797 em_bright_filename = 'em_bright.json' 

798 elif alert_group == 'burst': 

799 skymap_filename = event['pipeline'].lower() + '.multiorder.fits' 

800 p_astro_filename = None 

801 em_bright_filename = None 

802 else: 

803 raise NotImplementedError( 

804 'Valid skymap required for preliminary alert' 

805 ) 

806 

807 # Determine if the event should be made public. 

808 is_publishable = (superevents.should_publish( 

809 event, significant=alert_type != 'less-significant') and 

810 {'DQV', 'INJ'}.isdisjoint( 

811 event['labels'])) 

812 

813 # Download files from events and upload to superevent with relevant 

814 # annotations. Pass file contents down the chain so alerts task doesn't 

815 # need to download them again. 

816 # Note: this is explicitly made a chain to fix an issue described in #464. 

817 canvas = chain( 

818 group( 

819 gracedb.download.si(skymap_filename, preferred_event_id) 

820 | 

821 group( 

822 identity.s(), 

823 

824 gracedb.upload.s( 

825 skymap_filename, 

826 superevent_id, 

827 message='Localization copied from {}'.format( 

828 preferred_event_id), 

829 tags=['sky_loc', 'public'] 

830 ) 

831 | 

832 _create_label_and_return_filename.s('SKYMAP_READY', 

833 superevent_id) 

834 ), 

835 

836 ( 

837 gracedb.download.si(em_bright_filename, preferred_event_id) 

838 | 

839 group( 

840 identity.s(), 

841 

842 gracedb.upload.s( 

843 em_bright_filename, 

844 superevent_id, 

845 message='Source properties copied from {}'.format( 

846 preferred_event_id), 

847 tags=['em_bright', 'public'] 

848 ) 

849 | 

850 _create_label_and_return_filename.s('EMBRIGHT_READY', 

851 superevent_id) 

852 ) 

853 ) if em_bright_filename is not None else 

854 identity.s([None, None]), 

855 

856 ( 

857 gracedb.download.si(p_astro_filename, preferred_event_id) 

858 | 

859 group( 

860 identity.s(), 

861 

862 gracedb.upload.s( 

863 p_astro_filename, 

864 superevent_id, 

865 message='Source classification copied from {}'.format( 

866 preferred_event_id), 

867 tags=['p_astro', 'public'] 

868 ) 

869 | 

870 _create_label_and_return_filename.s('PASTRO_READY', 

871 superevent_id) 

872 ) 

873 ) if p_astro_filename is not None else 

874 identity.s([None, None]) 

875 ) 

876 | 

877 # Need annotate skymap task in body of chord instead of header because 

878 # this task simply calls another task, which is to be avoided in chord 

879 # headers. Note that any group that chains to a task is automatically 

880 # upgraded to a chord. 

881 _annotate_fits_and_return_input.s(superevent_id) 

882 ) 

883 

884 # Switch for disabling all but MDC alerts. 

885 if app.conf['only_alert_for_mdc']: 

886 if event.get('search') != 'MDC': 

887 canvas |= gracedb.upload.si( 

888 None, None, superevent_id, 

889 ("Skipping alert because gwcelery has been configured to only" 

890 " send alerts for MDC events.")) 

891 canvas.apply_async(priority=priority) 

892 return 

893 

894 # Send notice and upload GCN circular draft for online events. 

895 if is_publishable and initiate_voevent: 

896 # presence of advocate action blocks significant prelim alert 

897 # presence of adv action or significant event blocks EW alert 

898 # presence of adv action or significant event or EW event blocks 

899 # less significant alert 

900 if alert_type == 'earlywarning': 

901 # Launch DQR for significant early warning events after a timeout. 

902 # If a full BW significant trigger arrives before the end of the 

903 # timeout, the latter will apply the label instead, and this call 

904 # is a noop. 

905 gracedb.create_label.si( 

906 'DQR_REQUEST', 

907 superevent_id 

908 ).apply_async(countdown=600) 

909 blocking_labels = ( 

910 {'ADVOK', 'ADVNO'} if alert_type == 'preliminary' 

911 else 

912 {superevents.SIGNIFICANT_LABEL, 'ADVOK', 'ADVNO'} 

913 if alert_type == 'earlywarning' 

914 else 

915 {superevents.EARLY_WARNING_LABEL, superevents.SIGNIFICANT_LABEL, 

916 'ADVOK', 'ADVNO'} 

917 if alert_type == 'less-significant' 

918 else 

919 set() 

920 ) 

921 canvas |= ( 

922 _proceed_if_not_blocked_by.s(superevent_id, blocking_labels) 

923 | 

924 _unpack_args_and_send_earlywarning_preliminary_alert.s( 

925 alert, alert_type 

926 ) 

927 ) 

928 

929 canvas.apply_async(priority=priority) 

930 

931 

932@gracedb.task(shared=False) 

933def _get_pe_far_and_event(superevent): 

934 """Return FAR and event input to PE workflow. 

935 

936 The input FAR is the lowest FAR among CBC and Burst-BBH triggers. 

937 The input event is the preferred event if it is a CBC trigger, otherwise 

938 the CBC trigger with the lowest FAR is returned. 

939 """ 

940 # FIXME: remove ._orig_run when this bug is fixed: 

941 # https://github.com/getsentry/sentry-python/issues/370 

942 events = [ 

943 gracedb.get_event._orig_run(gid) for gid in superevent['gw_events'] 

944 ] 

945 events = [ 

946 e for e in events if e['group'].lower() == 'cbc' or ( 

947 e['group'].lower() == 'burst' and 

948 e.get('search', 'None').lower() == 'bbh' 

949 ) 

950 ] 

951 events.sort(key=lambda e: e['far']) 

952 preferred_event = superevent['preferred_event_data'] 

953 

954 if preferred_event['group'].lower() == 'cbc': 

955 return events[0]['far'], preferred_event 

956 for e in events: 

957 if e['group'].lower() == 'cbc': 

958 return events[0]['far'], e 

959 return None 

960 

961 

962@app.task(ignore_result=True, shared=False) 

963def parameter_estimation(far_event, superevent_id, pe_pipeline): 

964 """Parameter Estimation with Bilby and RapidPE-RIFT. Parameter estimation 

965 runs are triggered for CBC triggers which pass the FAR threshold and are 

966 not mock uploads. For those which do not pass these criteria, this task 

967 uploads messages explaining why parameter estimation is not started. 

968 """ 

969 if far_event is None: 

970 gracedb.upload.delay( 

971 filecontents=None, filename=None, 

972 graceid=superevent_id, 

973 message='Parameter estimation will not start since no CBC triggers' 

974 ' are found.', 

975 tags='pe' 

976 ) 

977 return 

978 far, event = far_event 

979 search = event['search'].lower() 

980 if search in app.conf['significant_alert_far_threshold']['cbc']: 

981 threshold = ( 

982 app.conf['significant_alert_far_threshold']['cbc'][search] / 

983 app.conf['significant_alert_trials_factor']['cbc'][search] 

984 ) 

985 else: 

986 # Fallback in case an event is uploaded to an unlisted search 

987 threshold = -1 * float('inf') 

988 if far > threshold: 

989 gracedb.upload.delay( 

990 filecontents=None, filename=None, 

991 graceid=superevent_id, 

992 message='Parameter estimation will not start since FAR is larger ' 

993 'than the PE threshold, {} Hz.'.format(threshold), 

994 tags='pe' 

995 ) 

996 elif search == 'mdc': 

997 gracedb.upload.delay( 

998 filecontents=None, filename=None, 

999 graceid=superevent_id, 

1000 message='Parameter estimation will not start since parameter ' 

1001 'estimation is disabled for mock uploads.', 

1002 tags='pe' 

1003 ) 

1004 elif event.get('offline', False): 

1005 gracedb.upload.delay( 

1006 filecontents=None, filename=None, 

1007 graceid=superevent_id, 

1008 message='Parameter estimation will not start since parameter ' 

1009 'estimation is disabled for OFFLINE events.', 

1010 tags='pe' 

1011 ) 

1012 elif ( 

1013 app.conf['gracedb_host'] == 'gracedb-playground.ligo.org' 

1014 and event['pipeline'] == 'MBTA' 

1015 ): 

1016 # FIXME: Remove this block once multiple channels can be handled on 

1017 # one gracedb instance 

1018 gracedb.upload.delay( 

1019 filecontents=None, filename=None, 

1020 graceid=superevent_id, 

1021 message='Parameter estimation is disabled for MBTA triggers ' 

1022 'on playground as MBTA analyses live data + online ' 

1023 'injections not O3 replay data + MDC injections', 

1024 tags='pe' 

1025 ) 

1026 elif pe_pipeline == 'rapidpe' and search == 'earlywarning': 

1027 # Remove this if rapidpe can ingest early warning events 

1028 gracedb.upload.delay( 

1029 filecontents=None, filename=None, 

1030 graceid=superevent_id, 

1031 message='Parameter estimation by RapidPE-RIFT is disabled for ' 

1032 'earlywarning triggers.', 

1033 tags='pe' 

1034 ) 

1035 else: 

1036 inference.start_pe.delay(event, superevent_id, pe_pipeline) 

1037 

1038 

1039@gracedb.task(shared=False) 

1040def earlywarning_preliminary_initial_update_alert( 

1041 filenames, 

1042 superevent, 

1043 alert_type, 

1044 filecontents=None 

1045): 

1046 """ 

1047 Create a canvas that sends an earlywarning, preliminary, initial, or update 

1048 notice. 

1049 

1050 Parameters 

1051 ---------- 

1052 filenames : tuple 

1053 A list of the sky map, em_bright, and p_astro filenames. 

1054 superevent : dict 

1055 The superevent dictionary, typically obtained from an IGWN Alert or 

1056 from querying GraceDB. 

1057 alert_type : {'less-significant', 'earlywarning', 'preliminary', 'initial', 'update'} # noqa: E501 

1058 The alert type. 

1059 

1060 Notes 

1061 ----- 

1062 Tasks that call this function should be decorated with 

1063 :obj:`gwcelery.tasks.gracedb.task` rather than :obj:`gwcelery.app.task` so 

1064 that a synchronous call to :func:`gwcelery.tasks.gracedb.get_log` is 

1065 retried in the event of GraceDB API failures. If `EM_COINC` is in labels 

1066 will create a RAVEN circular. 

1067 

1068 """ 

1069 labels = superevent['labels'] 

1070 superevent_id = superevent['superevent_id'] 

1071 search = superevent['preferred_event_data']['search'].lower() 

1072 

1073 if 'INJ' in labels: 

1074 return 

1075 

1076 if filecontents: 

1077 assert alert_type in { 

1078 'less-significant', 'earlywarning', 'preliminary'} 

1079 

1080 skymap_filename, em_bright_filename, p_astro_filename = filenames 

1081 rapidpe_pastro_filename = None 

1082 rapidpe_pastro_needed = True 

1083 combined_skymap_filename = None 

1084 combined_skymap_needed = False 

1085 skymap_needed = (skymap_filename is None) 

1086 em_bright_needed = (em_bright_filename is None) 

1087 p_astro_needed = False if search == 'ssm' else (p_astro_filename is None) 

1088 raven_coinc = ('RAVEN_ALERT' in labels and bool(superevent['em_type'])) 

1089 if raven_coinc: 

1090 ext_labels = gracedb.get_labels(superevent['em_type']) 

1091 combined_skymap_needed = \ 

1092 {"RAVEN_ALERT", "COMBINEDSKYMAP_READY"}.issubset(set(ext_labels)) 

1093 

1094 # FIXME: This if statement is always True, we should get rid of it. 

1095 if skymap_needed or em_bright_needed or p_astro_needed or \ 

1096 combined_skymap_needed or rapidpe_pastro_needed: 

1097 for message in gracedb.get_log(superevent_id): 

1098 t = message['tag_names'] 

1099 f = message['filename'] 

1100 v = message['file_version'] 

1101 fv = '{},{}'.format(f, v) 

1102 if not f: 

1103 continue 

1104 if skymap_needed \ 

1105 and {'sky_loc', 'public'}.issubset(t) \ 

1106 and f.endswith('.multiorder.fits') \ 

1107 and 'combined' not in f: 

1108 skymap_filename = fv 

1109 if em_bright_needed \ 

1110 and 'em_bright' in t \ 

1111 and f.endswith('.json'): 

1112 em_bright_filename = fv 

1113 if p_astro_needed \ 

1114 and {'p_astro', 'public'}.issubset(t) \ 

1115 and f.endswith('.json'): 

1116 p_astro_filename = fv 

1117 if combined_skymap_needed \ 

1118 and {'sky_loc', 'ext_coinc'}.issubset(t) \ 

1119 and f.startswith('combined-ext.') \ 

1120 and 'fit' in f: 

1121 combined_skymap_filename = fv 

1122 if rapidpe_pastro_needed \ 

1123 and {'p_astro', 'public'}.issubset(t) \ 

1124 and f == 'RapidPE_RIFT.p_astro.json': 

1125 rapidpe_pastro_filename = fv 

1126 

1127 if combined_skymap_needed: 

1128 # for every alert, copy combined sky map over if applicable 

1129 # FIXME: use file inheritance once available 

1130 ext_id = superevent['em_type'] 

1131 if combined_skymap_filename: 

1132 # If previous sky map, increase version by 1 

1133 combined_skymap_filename_base, v = \ 

1134 combined_skymap_filename.split(',') 

1135 v = str(int(v) + 1) 

1136 combined_skymap_filename = \ 

1137 combined_skymap_filename_base + ',' + v 

1138 else: 

1139 combined_skymap_filename_base = \ 

1140 (external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER 

1141 if '.multiorder.fits' in skymap_filename else 

1142 external_skymaps.COMBINED_SKYMAP_FILENAME_FLAT) 

1143 combined_skymap_filename = combined_skymap_filename_base + ',0' 

1144 message = 'Combined LVK-external sky map copied from {0}'.format( 

1145 ext_id) 

1146 message_png = ( 

1147 'Mollweide projection of <a href="/api/superevents/{se_id}/files/' 

1148 '{filename}">{filename}</a>, copied from {ext_id}').format( 

1149 se_id=superevent_id, 

1150 ext_id=ext_id, 

1151 filename=combined_skymap_filename) 

1152 

1153 combined_skymap_canvas = group( 

1154 gracedb.download.si(combined_skymap_filename_base, ext_id) 

1155 | 

1156 gracedb.upload.s( 

1157 combined_skymap_filename_base, superevent_id, 

1158 message, ['sky_loc', 'ext_coinc', 'public']) 

1159 | 

1160 gracedb.create_label.si('COMBINEDSKYMAP_READY', superevent_id), 

1161 

1162 gracedb.download.si(external_skymaps.COMBINED_SKYMAP_FILENAME_PNG, 

1163 ext_id) 

1164 | 

1165 gracedb.upload.s( 

1166 external_skymaps.COMBINED_SKYMAP_FILENAME_PNG, superevent_id, 

1167 message_png, ['sky_loc', 'ext_coinc', 'public'] 

1168 ) 

1169 | 

1170 # Pass None to download_anor_expose group 

1171 identity.si() 

1172 ) 

1173 

1174 # circular template not needed for less-significant alerts 

1175 if alert_type in {'earlywarning', 'preliminary', 'initial'}: 

1176 if raven_coinc: 

1177 circular_task = circulars.create_emcoinc_circular.si(superevent_id) 

1178 circular_filename = '{}-emcoinc-circular.txt'.format(alert_type) 

1179 tags = ['em_follow', 'ext_coinc'] 

1180 

1181 else: 

1182 circular_task = circulars.create_initial_circular.si(superevent_id) 

1183 circular_filename = '{}-circular.txt'.format(alert_type) 

1184 tags = ['em_follow'] 

1185 

1186 circular_canvas = ( 

1187 circular_task 

1188 | 

1189 gracedb.upload.s( 

1190 circular_filename, 

1191 superevent_id, 

1192 'Template for {} GCN Circular'.format(alert_type), 

1193 tags=tags) 

1194 ) 

1195 

1196 else: 

1197 circular_canvas = identity.si() 

1198 

1199 # less-significant alerts have "preliminary" voevent notice type 

1200 alert_type_voevent = 'preliminary' if alert_type == 'less-significant' \ 

1201 else alert_type 

1202 # set the significant field in the VOEvent based on 

1203 # less-significant/significant alert. 

1204 # For kafka alerts the analogous field is set in alerts.py. 

1205 # (see comment before defining kafka_alert_canvas) 

1206 voevent_significance = 0 if alert_type == 'less-significant' else 1 

1207 

1208 if filecontents and not combined_skymap_filename: 

1209 skymap, em_bright, p_astro_dict = filecontents 

1210 

1211 # check high profile and apply label if true 

1212 if alert_type == 'preliminary': 

1213 high_profile_canvas = rrt_utils.check_high_profile.si( 

1214 skymap, em_bright, p_astro_dict, superevent 

1215 ) 

1216 else: 

1217 high_profile_canvas = identity.si() 

1218 

1219 download_andor_expose_group = [] 

1220 if rapidpe_pastro_filename is None: 

1221 voevent_canvas = _create_voevent.si( 

1222 (em_bright, p_astro_dict), 

1223 superevent_id, 

1224 alert_type_voevent, 

1225 Significant=voevent_significance, 

1226 skymap_filename=skymap_filename, 

1227 internal=False, 

1228 open_alert=True, 

1229 raven_coinc=raven_coinc, 

1230 combined_skymap_filename=combined_skymap_filename 

1231 ) 

1232 rapidpe_canvas = _update_rapidpe_pastro_shouldnt_run.s() 

1233 

1234 # kafka alerts have a field called "significant" based on 

1235 # https://dcc.ligo.org/LIGO-G2300151/public 

1236 # The alert_type value passed to alerts.send is used to 

1237 # set this field in the alert dictionary 

1238 kafka_alert_canvas = alerts.send.si( 

1239 (skymap, em_bright, p_astro_dict), 

1240 superevent, 

1241 alert_type, 

1242 raven_coinc=raven_coinc 

1243 ) 

1244 else: 

1245 voevent_canvas = _create_voevent.s( 

1246 superevent_id, 

1247 alert_type_voevent, 

1248 Significant=voevent_significance, 

1249 skymap_filename=skymap_filename, 

1250 internal=False, 

1251 open_alert=True, 

1252 raven_coinc=raven_coinc, 

1253 combined_skymap_filename=combined_skymap_filename 

1254 ) 

1255 download_andor_expose_group += [ 

1256 gracedb.download.si(rapidpe_pastro_filename, superevent_id) 

1257 ] 

1258 

1259 kafka_alert_canvas = _check_pastro_and_send_alert.s( 

1260 skymap, 

1261 em_bright, 

1262 superevent, 

1263 alert_type, 

1264 raven_coinc=raven_coinc 

1265 ) 

1266 

1267 rapidpe_canvas = ( 

1268 _update_rapidpe_pastro.s( 

1269 em_bright=em_bright, 

1270 pipeline_pastro=p_astro_dict) 

1271 | 

1272 _upload_rapidpe_pastro_json.s( 

1273 superevent_id, 

1274 rapidpe_pastro_filename 

1275 ) 

1276 ) 

1277 else: 

1278 # Download em_bright and p_astro files here for voevent 

1279 download_andor_expose_group = [ 

1280 gracedb.download.si(em_bright_filename, superevent_id) if 

1281 em_bright_filename is not None else identity.s(None), 

1282 ] 

1283 if search != 'ssm': 

1284 download_andor_expose_group += [ 

1285 gracedb.download.si(p_astro_filename, superevent_id) if 

1286 p_astro_filename is not None else identity.s(None) 

1287 ] 

1288 else: 

1289 # for SSM events skip downloading p-astro file 

1290 download_andor_expose_group += [identity.s(None)] 

1291 high_profile_canvas = identity.si() 

1292 

1293 voevent_canvas = _create_voevent.s( 

1294 superevent_id, 

1295 alert_type_voevent, 

1296 Significant=voevent_significance, 

1297 skymap_filename=skymap_filename, 

1298 internal=False, 

1299 open_alert=True, 

1300 raven_coinc=raven_coinc, 

1301 combined_skymap_filename=combined_skymap_filename 

1302 ) 

1303 

1304 if rapidpe_pastro_filename: 

1305 download_andor_expose_group += [ 

1306 gracedb.download.si(rapidpe_pastro_filename, superevent_id) 

1307 ] 

1308 

1309 rapidpe_canvas = ( 

1310 _update_rapidpe_pastro.s() 

1311 | 

1312 _upload_rapidpe_pastro_json.s( 

1313 superevent_id, 

1314 rapidpe_pastro_filename 

1315 ) 

1316 ) 

1317 

1318 # The skymap has not been downloaded at this point, so we need to 

1319 # download it before we can assemble the kafka alerts and send them 

1320 kafka_alert_canvas = alerts.download_skymap_and_send_alert.s( 

1321 superevent, 

1322 alert_type, 

1323 skymap_filename=skymap_filename, 

1324 raven_coinc=raven_coinc, 

1325 combined_skymap_filename=combined_skymap_filename 

1326 ) 

1327 

1328 to_expose = [skymap_filename, em_bright_filename, p_astro_filename] 

1329 # Since PE skymap images, HTML, and gzip FITS are not made public when they 

1330 # are uploaded, we need to expose them here. 

1331 if ( 

1332 skymap_filename is not None and 'bilby' in skymap_filename.lower() 

1333 ): 

1334 prefix, _, _ = skymap_filename.partition('.multiorder.fits') 

1335 to_expose += [f'{prefix}.html', f'{prefix}.png', 

1336 f'{prefix}.volume.png', f'{prefix}.fits.gz'] 

1337 download_andor_expose_group += [ 

1338 gracedb.expose.si(superevent_id), 

1339 *( 

1340 gracedb.create_tag.si(filename, 'public', superevent_id) 

1341 for filename in to_expose if filename is not None 

1342 ) 

1343 ] 

1344 

1345 voevent_canvas |= group( 

1346 gracedb.download.s(superevent_id) 

1347 | 

1348 gcn.send.s(), 

1349 

1350 gracedb.create_tag.s('public', superevent_id) 

1351 ) 

1352 

1353 if combined_skymap_needed: 

1354 download_andor_expose_group += [combined_skymap_canvas] 

1355 

1356 sent_label_canvas = identity.si() 

1357 if alert_type == 'less-significant': 

1358 sent_label_canvas = gracedb.create_label.si( 

1359 'LOW_SIGNIF_PRELIM_SENT', 

1360 superevent_id 

1361 ) 

1362 elif alert_type == 'preliminary': 

1363 sent_label_canvas = gracedb.create_label.si( 

1364 'GCN_PRELIM_SENT', 

1365 superevent_id 

1366 ) 

1367 

1368 # NOTE: The following canvas structure was used to fix #480 

1369 canvas = ( 

1370 group(download_andor_expose_group) 

1371 ) 

1372 if rapidpe_pastro_filename: 

1373 canvas |= rapidpe_canvas 

1374 

1375 canvas |= ( 

1376 group( 

1377 voevent_canvas 

1378 | 

1379 group( 

1380 circular_canvas, 

1381 

1382 sent_label_canvas 

1383 ), 

1384 

1385 kafka_alert_canvas, 

1386 

1387 high_profile_canvas 

1388 ) 

1389 ) 

1390 

1391 canvas.apply_async() 

1392 

1393 

1394@app.task(shared=False) 

1395def _update_rapidpe_pastro(input_list, em_bright=None, pipeline_pastro=None): 

1396 """ 

1397 If p_terr from rapidpe is different from the p_terr from the most 

1398 recent preferred event, replaces rapidpe's p_terr with pipeline p_terr. 

1399 Returns a tuple of em_bright, rapidpe pastro and a 

1400 boolean(rapidpe_pastro_updated) indicating if rapidpe pastro has been 

1401 updated. If p_terr in rapidpe has been updated, the return list contains 

1402 the updated pastro and the rapidpe_pastro_updated is True. Else, the 

1403 return list contains the rapidpe pastro from the input_list and 

1404 rapidpe_pastro_updated is False. 

1405 """ 

1406 # input_list is download_andor_expose_group in 

1407 # function earlywarning_preliminary_initial_update_alert 

1408 if pipeline_pastro is None: 

1409 em_bright, pipeline_pastro, rapidpe_pastro, *_ = input_list 

1410 else: 

1411 rapidpe_pastro, *_ = input_list 

1412 pipeline_pastro_contents = json.loads(pipeline_pastro) 

1413 rapidpe_pastro_contents = json.loads(rapidpe_pastro) 

1414 

1415 if (rapidpe_pastro_contents["Terrestrial"] 

1416 == pipeline_pastro_contents["Terrestrial"]): 

1417 rapidpe_pastro_updated = False 

1418 else: 

1419 rapidpe_pastro = json.dumps( 

1420 rpe_pastro.renormalize_pastro_with_pipeline_pterr( 

1421 rapidpe_pastro_contents, pipeline_pastro_contents 

1422 ) 

1423 ) 

1424 rapidpe_pastro_updated = True 

1425 

1426 return em_bright, rapidpe_pastro, rapidpe_pastro_updated 

1427 

1428 

1429@app.task(shared=False) 

1430def _update_rapidpe_pastro_shouldnt_run(): 

1431 raise RuntimeError( 

1432 "The `rapidpe_canvas' was executed where it should" 

1433 "not have been. A bug must have been introduced." 

1434 ) 

1435 

1436 

1437@gracedb.task(shared=False) 

1438def _upload_rapidpe_pastro_json( 

1439 input_list, 

1440 superevent_id, 

1441 rapidpe_pastro_filename 

1442): 

1443 """ 

1444 Add public tag to RapidPE_RIFT.p_astro.json if p_terr from the 

1445 preferred event is same as the p_terr in RapidPE_RIFT.p_astro.json. 

1446 Else, uploads an updated version of RapidPE_RIFT.p_astro.json 

1447 with file content from the task update_rapidpe_pastro. 

1448 """ 

1449 # input_list is output from update_rapidpe_pastro 

1450 *return_list, rapidpe_pastro_updated = input_list 

1451 if rapidpe_pastro_updated is True: 

1452 tags = ("pe", "p_astro", "public") 

1453 

1454 upload_filename = "RapidPE_RIFT.p_astro.json" 

1455 description = "RapidPE-RIFT Pastro results" 

1456 content = input_list[1] 

1457 gracedb.upload( 

1458 content, 

1459 upload_filename, 

1460 superevent_id, 

1461 description, 

1462 tags 

1463 ) 

1464 return return_list 

1465 

1466 

1467@app.task(shared=False) 

1468def _check_pastro_and_send_alert(input_classification, skymap, em_bright, 

1469 superevent, alert_type, raven_coinc=False): 

1470 """Wrapper for :meth:`~gwcelery.tasks.alerts.send` meant to take a 

1471 potentially new p-astro as input from the preceding task. 

1472 """ 

1473 _, p_astro = input_classification 

1474 alerts.send.delay( 

1475 (skymap, em_bright, p_astro), 

1476 superevent, 

1477 alert_type, 

1478 raven_coinc=raven_coinc 

1479 ) 

1480 

1481 

1482@gracedb.task(ignore_result=True, shared=False) 

1483def initial_alert(filenames, alert): 

1484 """Produce an initial alert. 

1485 

1486 This does nothing more than call 

1487 :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert` 

1488 with ``alert_type='initial'``. 

1489 

1490 Parameters 

1491 ---------- 

1492 filenames : tuple 

1493 A list of the sky map, em_bright, and p_astro filenames. 

1494 alert : dict 

1495 IGWN-Alert dictionary 

1496 

1497 Notes 

1498 ----- 

1499 This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather 

1500 than :obj:`gwcelery.app.task` so that a synchronous call to 

1501 :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB 

1502 API failures. 

1503 

1504 """ 

1505 earlywarning_preliminary_initial_update_alert( 

1506 filenames, 

1507 alert['object'], 

1508 'initial' 

1509 ) 

1510 

1511 

1512@gracedb.task(ignore_result=True, shared=False) 

1513def update_alert(filenames, superevent_id): 

1514 """Produce an update alert. 

1515 

1516 This does nothing more than call 

1517 :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert` 

1518 with ``alert_type='update'``. 

1519 

1520 Parameters 

1521 ---------- 

1522 filenames : tuple 

1523 A list of the sky map, em_bright, and p_astro filenames. 

1524 superevent_id : str 

1525 The superevent ID. 

1526 

1527 Notes 

1528 ----- 

1529 This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather 

1530 than :obj:`gwcelery.app.task` so that a synchronous call to 

1531 :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB 

1532 API failures. 

1533 

1534 """ 

1535 superevent = gracedb.get_superevent._orig_run(superevent_id) 

1536 earlywarning_preliminary_initial_update_alert( 

1537 filenames, 

1538 superevent, 

1539 'update' 

1540 ) 

1541 

1542 

1543@app.task(ignore_result=True, shared=False) 

1544def retraction_alert(alert): 

1545 """Produce a retraction alert.""" 

1546 superevent_id = alert['uid'] 

1547 group( 

1548 gracedb.expose.si(superevent_id) 

1549 | 

1550 group( 

1551 _create_voevent.si( 

1552 None, superevent_id, 'retraction', 

1553 internal=False, 

1554 open_alert=True 

1555 ) 

1556 | 

1557 group( 

1558 gracedb.download.s(superevent_id) 

1559 | 

1560 gcn.send.s(), 

1561 

1562 gracedb.create_tag.s('public', superevent_id) 

1563 ), 

1564 

1565 alerts.send.si( 

1566 None, 

1567 alert['object'], 

1568 'retraction' 

1569 ) 

1570 ), 

1571 

1572 circulars.create_retraction_circular.si(superevent_id) 

1573 | 

1574 gracedb.upload.s( 

1575 'retraction-circular.txt', 

1576 superevent_id, 

1577 'Template for retraction GCN Circular', 

1578 tags=['em_follow'] 

1579 ) 

1580 ).apply_async()