Coverage for gwcelery/tasks/orchestrator.py: 94%

400 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-14 05:52 +0000

1"""Tasks that comprise the alert orchestrator. 

2 

3The orchestrator is responsible for the vetting and annotation workflow to 

4produce preliminary, initial, and update alerts for gravitational-wave event 

5candidates. 

6""" 

7import json 

8import re 

9 

10from astropy.time import Time 

11from celery import chain, group 

12from ligo.rrt_chat import channel_creation 

13from rapidpe_rift_pipe import pastro as rpe_pastro 

14 

15from .. import app 

16from . import (alerts, bayestar, circulars, detchar, em_bright, 

17 external_skymaps, gcn, gracedb, igwn_alert, inference, p_astro, 

18 rrt_utils, skymaps, superevents) 

19from .core import get_first, identity 

20 

21 

@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_superevent(alert):
    """Dispatch the orchestrator workflow for a superevent IGWN alert.

    The action taken depends on ``alert['alert_type']``:

    * ``'new'``: schedule the RapidPE and Bilby parameter estimation
      workflows and launch the omegascan and
      :meth:`gwcelery.tasks.detchar.check_vectors` data quality tasks.
    * ``'label_added'``: depending on the label that was added, launch the
      less-significant, significant, or early-warning alert workflow via
      :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert`,
      the second preliminary alert, the initial or retraction alert, or the
      Mattermost channel creation.
    * ``'event_added'``: if the superevent carries the ``DQV`` label, re-run
      the data quality check and update the preferred event if it passes.

    Any other alert type is ignored.

    Parameters
    ----------
    alert : dict
        IGWN alert payload for the superevent.
    """
    superevent_id = alert['uid']
    alert_type = alert['alert_type']

    # launch PE and detchar based on new type superevents
    if alert_type == 'new':
        superevent = alert['object']

        # RapidPE starts ``rapidpe_timeout`` seconds after t_0; the countdown
        # is clamped at zero if that moment has already passed.
        rapidpe_countdown = max(
            superevent['t_0'] - Time.now().gps +
            app.conf['rapidpe_timeout'], 0
        )
        (
            gracedb.get_superevent.si(superevent_id).set(
                countdown=rapidpe_countdown
            )
            | _get_pe_far_and_event.s()
            | parameter_estimation.s(superevent_id, 'rapidpe')
        ).apply_async()

        (
            gracedb.get_superevent.si(superevent_id).set(
                countdown=app.conf['pe_timeout']
            )
            | _get_pe_far_and_event.s()
            | parameter_estimation.s(superevent_id, 'bilby')
        ).apply_async()

        # run check_vectors. Create and upload omegascans
        omegascan_sig = detchar.omegascan.si(
            superevent['t_0'], superevent_id)
        check_vectors_sig = detchar.check_vectors.si(
            superevent['preferred_event_data'],
            superevent_id,
            superevent['t_start'],
            superevent['t_end']
        )
        group(omegascan_sig, check_vectors_sig).delay()

    elif alert_type == 'label_added':
        label_name = alert['data']['name']
        superevent = alert['object']

        # GraceDB query selecting the CBC and Burst events of this
        # superevent, restricted to the superevent's own category.
        query = f'superevent: {superevent_id} group: CBC Burst'
        category = superevent['category']
        if category == 'MDC':
            query += ' MDC'
        elif category == 'Test':
            query += ' Test'

        # launch less-significant preliminary alerts on LOW_SIGNIF_LOCKED
        if label_name == superevents.FROZEN_LABEL:
            # don't launch if EARLY_WARNING or SIGNIF_LOCKED is present
            superseding_labels = {
                superevents.SIGNIFICANT_LABEL,
                superevents.EARLY_WARNING_LABEL
            }
            if not superseding_labels.isdisjoint(superevent['labels']):
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "The superevent already has a significant/EW event, "
                    "skipping launching less-significant alert"
                )
                return
            (
                gracedb.upload.s(
                    None,
                    None,
                    superevent_id,
                    "Automated DQ check before sending less-significant "
                    "preliminary alert. New results supersede old results.",
                    tags=['data_quality']
                )
                | detchar.check_vectors.si(
                    superevent['preferred_event_data'],
                    superevent_id,
                    superevent['t_start'],
                    superevent['t_end']
                )
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='less-significant')
            ).apply_async()

        # launch significant alert on SIGNIF_LOCKED
        elif label_name == superevents.SIGNIFICANT_LABEL:
            # ensure superevent is locked before starting alert workflow
            canvas = chain()
            if superevents.FROZEN_LABEL not in superevent['labels']:
                canvas |= gracedb.create_label.si(superevents.FROZEN_LABEL,
                                                  superevent_id)

            canvas |= (
                gracedb.get_events.si(query)
                | superevents.select_preferred_event.s()
                | _update_superevent_and_return_event_dict.s(superevent_id)
                | _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up after significant event. "
                )
                | _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Automated DQ check before sending significant alert. "
                    "New results supersede old results.",
                    tags=['data_quality']
                )
                | detchar.check_vectors.s(
                    superevent_id,
                    superevent['t_start'],
                    superevent['t_end']
                )
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            )
            canvas.apply_async()

        # launch second preliminary on GCN_PRELIM_SENT
        elif label_name == 'GCN_PRELIM_SENT':
            clean_up_timeout = app.conf['superevent_clean_up_timeout']
            (
                identity.si().set(
                    # https://git.ligo.org/emfollow/gwcelery/-/issues/478
                    # FIXME: remove this task once https://github.com/celery/celery/issues/7851 is resolved  # noqa: E501
                    countdown=clean_up_timeout
                )
                | gracedb.get_events.si(query)
                | superevents.select_preferred_event.s()
                | _update_superevent_and_return_event_dict.s(superevent_id)
                | group(
                    _leave_log_message_and_return_event_dict.s(
                        superevent_id,
                        "Superevent cleaned up after first preliminary alert"
                    ),

                    gracedb.create_label.si('DQR_REQUEST', superevent_id)
                )
                | get_first.s()
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            ).apply_async()

            # set pipeline preferred events
            # FIXME: Ideally this should combined with the previous canvas.
            # However, incorporating that group prevents canvas from executing
            # maybe related to https://github.com/celery/celery/issues/7851
            (
                gracedb.get_events.si(query)
                | superevents.select_pipeline_preferred_event.s()
                | _set_pipeline_preferred_events.s(superevent_id)
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == 'LOW_SIGNIF_PRELIM_SENT':
            # similar workflow as the GCN_PRELIM_SENT
            # except block by condition evaluated at the end of timeout
            _revise_and_send_second_less_significant_alert.si(
                alert, query, superevent_id,
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == superevents.EARLY_WARNING_LABEL:
            if superevents.SIGNIFICANT_LABEL in superevent['labels']:
                # stop if full BW significant event already present
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "Superevent superseded by full BW event, skipping EW."
                )
                return
            # start the EW alert pipeline; is blocked by SIGNIF_LOCKED
            # ensure superevent is locked before starting pipeline
            canvas = chain()
            if superevents.FROZEN_LABEL not in superevent['labels']:
                canvas |= gracedb.create_label.si(superevents.FROZEN_LABEL,
                                                  superevent_id)

            canvas |= (
                gracedb.get_events.si(query)
                | superevents.select_preferred_event.s()
                | _update_superevent_and_return_event_dict.s(superevent_id)
                | _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up before sending EW alert."
                )
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='earlywarning')
            )
            canvas.apply_async()

        # launch initial/retraction alert on ADVOK/ADVNO
        elif label_name == 'ADVOK':
            initial_alert((None, None, None), alert)
        elif label_name == 'ADVNO':
            retraction_alert(alert)
        elif label_name == 'ADVREQ':
            if app.conf['create_mattermost_channel']:
                _create_mattermost_channel.si(superevent_id).delay()

    # check DQV label on superevent, run check_vectors if required
    elif alert_type == 'event_added':
        # FIXME Check if this should be changed to 'update' alert_types instead
        # of 'event_added'. 'event_added' seems like it's just a new event in
        # the superevent window, not necessarily an event that should be
        # promoted to preferred event
        start = alert['data']['t_start']
        end = alert['data']['t_end']

        if 'DQV' not in gracedb.get_labels(superevent_id):
            return

        (
            detchar.check_vectors.s(
                alert['object']['preferred_event_data'],
                superevent_id,
                start,
                end
            )
            | _update_if_dqok.s(superevent_id)
        ).apply_async()

266 

267 

@igwn_alert.handler('cbc_gstlal',
                    'cbc_spiir',
                    'cbc_pycbc',
                    'cbc_mbta',
                    shared=False)
def handle_cbc_event(alert):
    """Perform annotations for CBC events that depend on pipeline-specific
    matched-filter parameter estimates.

    Notes
    -----
    Annotations are only started for alerts of type ``new``, and never for
    events belonging to the VT search. It also generates
    ``pipeline.p_astro.json`` for the pipeline/search combinations that do
    not provide that file themselves.

    The table below lists which files are created as a result of a new upload,
    and which tasks generate them.

    ============================== ==================================================
    File                           Task
    ============================== ==================================================
    ``bayestar.multiorder.fits``   :meth:`gwcelery.tasks.bayestar.localize`
    ``em_bright.json``             :meth:`gwcelery.tasks.em_bright.source_properties`
    ``pipeline.p_astro.json``      :meth:`gwcelery.tasks.p_astro.compute_p_astro`
    ============================== ==================================================

    """  # noqa: E501
    graceid = alert['uid']
    event = alert['object']
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()

    # no annotations for events used in VT analysis
    if search == superevents.VT_SEARCH_NAME.lower():
        return

    priority = 0 if superevents.should_publish(event) else 1

    # Everything below applies to freshly uploaded events only.
    if alert['alert_type'] != 'new':
        return

    # Pipelines that use the GWCelery p-astro method
    # - spiir (all searches)
    # - pycbc for EarlyWarning search
    # - periodic MDC generated by first-two-years (based on gstlal)
    # FIXME: Remove this once all pipelines compute their own p-astro
    pipelines_stock_p_astro = {('spiir', 'earlywarning'),
                               ('pycbc', 'earlywarning'),
                               ('gstlal', 'mdc')}

    # Inputs for the em_bright and p_astro calculations; masses and spins are
    # read from the first SingleInspiral row.
    instruments = superevents.get_instruments_in_ranking_statistic(event)
    single_inspiral = event['extra_attributes']['SingleInspiral'][0]
    snr = superevents.get_snr(event)
    far = event['far']
    mass1 = single_inspiral['mass1']
    mass2 = single_inspiral['mass2']
    chi1 = single_inspiral['spin1z']
    chi2 = single_inspiral['spin2z']

    em_bright_canvas = (
        em_bright.source_properties.si(mass1, mass2, chi1, chi2, snr,
                                       pipeline=pipeline, search=search)
        | gracedb.upload.s(
            'em_bright.json', graceid,
            'em bright complete', ['em_bright', 'public']
        )
        | gracedb.create_label.si('EMBRIGHT_READY', graceid)
    )
    em_bright_canvas.apply_async(priority=priority)

    # p_astro calculation for pipelines that does not provide a
    # stock p_astro (upload pipeline.p_astro.json)
    if (pipeline, search) in pipelines_stock_p_astro:
        p_astro_canvas = (
            p_astro.compute_p_astro.s(snr,
                                      far,
                                      mass1,
                                      mass2,
                                      pipeline,
                                      instruments)
            | gracedb.upload.s(
                f'{pipeline}.p_astro.json', graceid,
                'p_astro computation complete', ['p_astro', 'public']
            )
            | gracedb.create_label.si('PASTRO_READY', graceid)
        )
        p_astro_canvas.apply_async(priority=priority)

    # Start BAYESTAR for all CBC pipelines.
    bayestar_canvas = (
        gracedb.download.s('coinc.xml', graceid)
        | bayestar.localize.s(graceid)
        | gracedb.upload.s(
            'bayestar.multiorder.fits', graceid,
            'sky localization complete', ['sky_loc', 'public']
        )
        | gracedb.create_label.si('SKYMAP_READY', graceid)
    )
    bayestar_canvas.apply_async(priority=priority)

371 

372 

@igwn_alert.handler('burst_olib',
                    'burst_cwb',
                    'burst_mly',
                    shared=False)
def handle_burst_event(alert):
    """Perform pipeline-specific annotations for burst events.

    Two independent actions are taken:

    * For a ``new`` alert from the cWB pipeline with the BBH search, compute
      and upload ``em_bright.json`` and apply the ``EMBRIGHT_READY`` label.
    * For a ``log`` alert whose file name ends in ``.fits.gz``, convert the
      uploaded sky map to a ``.multiorder.fits`` file, upload it, and apply
      the ``SKYMAP_READY`` label.

    Parameters
    ----------
    alert : dict
        IGWN alert payload for the burst event.
    """
    graceid = alert['uid']
    pipeline = alert['object']['pipeline'].lower()
    search = alert['object']['search'].lower()
    priority = 0 if superevents.should_publish(alert['object']) else 1
    alert_type = alert['alert_type']

    # em_bright calculation for Burst-cWB-BBH
    if alert_type == 'new' and (pipeline, search) == ('cwb', 'bbh'):
        multiburst = alert['object']['extra_attributes']['MultiBurst']
        snr = multiburst.get('snr')
        mchirp = multiburst.get('mchirp', 0.0)
        # FIXME once ingestion of mchirp is finalised
        # In case mchirp is not there or zero, fall back to a fixed mass of
        # 30.0; otherwise use the equal-mass component mass 2**0.2 * mchirp
        # for both components.
        m12 = 30.0 if mchirp == 0.0 else 2**(0.2) * mchirp
        (
            em_bright.source_properties.si(m12, m12, 0.0, 0.0, snr)
            | gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            | gracedb.create_label.si('EMBRIGHT_READY', graceid)
        ).apply_async(priority=priority)

    if alert_type != 'log':
        return

    filename = alert['data']['filename']

    # Pipeline is uploading a flat resolution skymap file
    # Converting to a multiorder one with proper name.
    # FIXME: Remove block when CWB starts to upload skymaps
    # in multiorder format
    if not filename.endswith('.fits.gz'):
        return

    new_filename = filename.replace('.fits.gz', '.multiorder.fits')
    flatten_msg = (
        'Multi-resolution FITS file created from '
        '<a href="/api/events/{graceid}/files/'
        '{filename}">{filename}</a>').format(
            graceid=graceid, filename=filename)
    tags = ['sky_loc', 'lvem', 'public']
    (
        gracedb.download.si(filename, graceid)
        | skymaps.unflatten.s(new_filename)
        | gracedb.upload.s(
            new_filename, graceid, flatten_msg, tags)
        | gracedb.create_label.si('SKYMAP_READY', graceid)
    ).apply_async(priority=priority)

437 

438 

@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_posterior_samples(alert):
    """Generate multi-resolution and flat-resolution FITS files and skymaps
    from an uploaded HDF5 file containing posterior samples.

    Only ``log`` alerts whose file name ends in ``.posterior_samples.hdf5``
    are processed; everything else is ignored. Two canvases are launched:
    one that builds, annotates and uploads a sky map from the samples, and
    one that computes and uploads the em-bright file from the same samples.

    Parameters
    ----------
    alert : dict
        IGWN alert payload for the superevent.
    """
    if alert['alert_type'] != 'log' or \
            not alert['data']['filename'].endswith('.posterior_samples.hdf5'):
        return
    superevent_id = alert['uid']
    filename = alert['data']['filename']
    info = '{} {}'.format(alert['data']['comment'], filename)
    # Split on the LAST occurrence only: without ``maxsplit=1`` a file name
    # that contains '.posterior_samples.' more than once yields more than two
    # parts and the unpacking below raises ValueError.
    prefix, _ = filename.rsplit('.posterior_samples.', 1)
    skymap_filename = f'{prefix}.multiorder.fits'
    labels = ['pe', 'sky_loc']
    ifos = superevents.get_instruments(alert['object']['preferred_event_data'])

    # Sky map from the posterior samples: annotate it and upload it.
    (
        gracedb.download.si(filename, superevent_id)
        |
        skymaps.skymap_from_samples.s(superevent_id, ifos)
        |
        group(
            skymaps.annotate_fits.s(
                skymap_filename, superevent_id, labels
            ),

            gracedb.upload.s(
                skymap_filename, superevent_id,
                'Multiresolution FITS file generated from "{}"'.format(info),
                labels
            )
        )
    ).delay()

    # em_bright from LALInference posterior samples
    (
        gracedb.download.si(filename, superevent_id)
        |
        em_bright.em_bright_posterior_samples.s()
        |
        gracedb.upload.s(
            '{}.em_bright.json'.format(prefix), superevent_id,
            'em-bright computed from "{}"'.format(info),
            'pe'
        )
    ).delay()

487 

488 

@app.task(bind=True, shared=False)
def _create_mattermost_channel(self, superevent_id):
    """Create the Mattermost channel for a superevent.

    Launched by the superevent handler when the ADVREQ label is applied.
    Delegates to :func:`ligo.rrt_chat.channel_creation.rrt_channel_creation`,
    passing the superevent id and the configured GraceDB host.

    Channel name : O4 RRT {superevent_id}

    Parameters
    ----------
    superevent_id : str
        The superevent id
    """
    channel_creation.rrt_channel_creation(
        superevent_id, self.app.conf['gracedb_host'])

505 

506 

@app.task(shared=False)
def _set_pipeline_preferred_events(pipeline_event, superevent_id):
    """Return group for setting pipeline preferred event using
    :meth:`gracedb.add_pipeline_preferred_event`.

    Parameters
    ----------
    pipeline_event : dict
        {pipeline: event_dict} key value pairs, returned by
        :meth:`superevents.select_pipeline_preferred_event`.

    superevent_id : str
        The superevent id

    Returns
    -------
    celery.group
        Group built from one entry per value of ``pipeline_event``.
    """
    # NOTE(review): ``add_pipeline_preferred_event`` is *called* here rather
    # than turned into a signature with ``.s()``/``.si()``, so the group is
    # built from the call results, not from task signatures -- confirm this
    # is intended.
    return group(
        gracedb.add_pipeline_preferred_event(superevent_id,
                                             preferred['graceid'])
        for preferred in pipeline_event.values()
    )

526 

527 

@app.task(shared=False, ignore_result=True)
def _update_if_dqok(event, superevent_id):
    """Update `preferred_event` of `superevent_id` to `event_id` if `DQOK`
    label has been applied.

    Parameters
    ----------
    event : dict
        Event dictionary; its ``graceid`` and ``gpstime`` entries are used.
    superevent_id : str
        The superevent id
    """
    if 'DQOK' in gracedb.get_labels(superevent_id):
        event_id = event['graceid']
        gracedb.update_superevent(superevent_id,
                                  preferred_event=event_id,
                                  t_0=event["gpstime"])
        # The log text is passed as ``message``, the keyword used by every
        # other ``gracedb.upload`` call in this module (it was ``comment``).
        gracedb.upload.delay(
            None, None, superevent_id,
            message=f'DQOK applied based on new event {event_id}')

541 

542 

@gracedb.task(shared=False)
def _create_voevent(classification, *args, **kwargs):
    r"""Create a VOEvent record from an EM bright JSON file.

    The JSON documents in `classification` are decoded and merged into the
    keyword arguments, the em-bright keys are renamed to the names used by
    the GraceDB REST API, and a ``skymap_type`` is derived from
    ``skymap_filename`` unless one was given explicitly.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.source_properties` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro` or
        content of ``{gstlal,mbta}.p_astro.json`` uploaded
        by {gstlal,mbta} respectively; or None
    \*args
        Additional positional arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.
    \*\*kwargs
        Additional keyword arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.

    Returns
    -------
    str
        The filename of the newly created VOEvent.

    """
    voevent_kwargs = dict(kwargs)

    if classification is not None:
        # Merge source classification and source properties into kwargs.
        for text in classification:
            # Ignore filenames, only load dict in bytes form
            if text is None:
                continue
            voevent_kwargs.update(json.loads(text))

    # FIXME: These keys differ between em_bright.json
    # and the GraceDB REST API.
    for json_key, api_key in (('HasNS', 'ProbHasNS'),
                              ('HasRemnant', 'ProbHasRemnant')):
        if json_key in voevent_kwargs:
            voevent_kwargs[api_key] = voevent_kwargs.pop(json_key)

    skymap_filename = voevent_kwargs.get('skymap_filename')
    if skymap_filename is not None:
        # Strip the FITS extension(s) and an optional ",<version>" suffix.
        voevent_kwargs.setdefault(
            'skymap_type',
            re.sub(r'(\.multiorder)?\.fits(\..+)?(,[0-9]+)?$', '',
                   skymap_filename))

    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.create_voevent._orig_run(*args, **voevent_kwargs)

598 

599 

@app.task(shared=False)
def _create_label_and_return_filename(filename, label, graceid):
    """Queue :meth:`gracedb.create_label` and pass `filename` through.

    Parameters
    ----------
    filename : str
        Value handed back unchanged to the next task in the canvas.
    label : str
        Label to apply.
    graceid : str
        GraceDB id that receives the label.

    Returns
    -------
    str
        The unmodified `filename`.
    """
    # The label is applied asynchronously; its outcome is not awaited.
    gracedb.create_label.delay(label, graceid)
    return filename

604 

605 

@app.task(shared=False)
def _leave_log_message_and_return_event_dict(event, superevent_id,
                                             message, **kwargs):
    """Queue a log message on the superevent via :meth:`gracedb.upload`
    and hand the event dictionary through unchanged.

    Parameters
    ----------
    event : dict
        Event dictionary, returned as is.
    superevent_id : str
        The superevent id that receives the log message.
    message : str
        Text of the log message.
    **kwargs
        Extra keyword arguments forwarded to :meth:`gracedb.upload`.

    Returns
    -------
    dict
        The unmodified `event`.
    """
    # No file is attached: file contents and file name are both None.
    gracedb.upload.delay(None, None, superevent_id, message, **kwargs)
    return event

614 

615 

@gracedb.task(shared=False)
def _update_superevent_and_return_event_dict(event, superevent_id):
    """Make `event` the preferred event of the superevent through
    :meth:`gracedb.update_superevent` and hand the event dictionary through.

    Parameters
    ----------
    event : dict
        Event dictionary; its ``graceid`` becomes the preferred event and
        its ``gpstime`` becomes ``t_0`` of the superevent.
    superevent_id : str
        The superevent id

    Returns
    -------
    dict
        The unmodified `event`.
    """
    preferred_event_id = event['graceid']
    t_0 = event['gpstime']
    gracedb.update_superevent(superevent_id,
                              preferred_event=preferred_event_id,
                              t_0=t_0)
    return event

625 

626 

@gracedb.task(shared=False)
def _proceed_if_not_blocked_by(files, superevent_id, block_by):
    """Pass `files` through unless the superevent carries a blocking label.

    Parameters
    ----------
    files : tuple
        List of files
    superevent_id : str
        The superevent id corresponding to files
    block_by : set
        Set of blocking labels. E.g. `{'ADVOK', 'ADVNO'}`

    Returns
    -------
    tuple or None
        `files` if none of the `block_by` labels is present on the
        superevent; otherwise None, after queuing a log message that names
        the blocking labels.
    """
    found = block_by.intersection(gracedb.get_labels(superevent_id))
    if not found:
        return files

    gracedb.upload.delay(
        None, None, superevent_id,
        f"Blocking automated notice due to labels {found}"
    )
    return None

650 

651 

@gracedb.task(shared=False)
def _revise_and_send_second_less_significant_alert(alert, query,
                                                   superevent_id):
    """Clean up the superevent and launch the second less-significant alert.

    Nothing is done if the superevent currently carries any of the labels
    ADVREQ, ADVOK, ADVNO, the significant label, or the early-warning label.
    Otherwise the preferred event is re-selected from the events matching
    `query`, the superevent is updated, and
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert` is
    chained with ``alert_type='less-significant'``. A second canvas sets the
    pipeline preferred events.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary
    query : str
        GraceDB query passed to :meth:`gracedb.get_events`.
    superevent_id : str
        The superevent id
    """
    blocking_labels = {
        'ADVREQ', 'ADVOK', 'ADVNO',
        superevents.SIGNIFICANT_LABEL,
        superevents.EARLY_WARNING_LABEL,
    }
    current_labels = gracedb.get_labels(superevent_id)
    if not blocking_labels.isdisjoint(current_labels):
        return

    second_alert_canvas = (
        gracedb.get_events.si(query)
        | superevents.select_preferred_event.s()
        | _update_superevent_and_return_event_dict.s(superevent_id)
        | _leave_log_message_and_return_event_dict.s(
            superevent_id,
            "Superevent cleaned up before second less-significant alert"
        )
        | earlywarning_preliminary_alert.s(
            alert, alert_type='less-significant')
    )
    second_alert_canvas.delay()

    # set pipeline preferred events
    # FIXME: Ideally this should combined with the previous canvas.
    # However, incorporating that group prevents canvas from executing
    # maybe related to https://github.com/celery/celery/issues/7851
    pipeline_preferred_canvas = (
        gracedb.get_events.si(query)
        | superevents.select_pipeline_preferred_event.s()
        | _set_pipeline_preferred_events.s(superevent_id)
    )
    pipeline_preferred_canvas.delay()

691 

692 

@app.task(shared=False)
def _annotate_fits_and_return_input(input_list, superevent_id):
    """Annotate the downloaded sky map and pass the input through.

    Takes the output of the skymap, embright, p-astro download group at the
    beginning of the
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert` canvas
    and calls :meth:`~gwcelery.tasks.skymaps.annotate_fits_tuple` on its first
    entry.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones
    superevent_id : str
        The superevent id

    Returns
    -------
    list
        The unmodified `input_list`.
    """
    # Only the sky map entry (first element) is annotated.
    skymap_entry = input_list[0]
    annotation_tags = ['sky_loc', 'public']
    skymaps.annotate_fits_tuple(skymap_entry, superevent_id, annotation_tags)

    return input_list

719 

720 

@gracedb.task(shared=False)
def _unpack_args_and_send_earlywarning_preliminary_alert(input_list, alert,
                                                         alert_type):
    """Unpack the downloaded files and queue the alert task.

    Takes the output of the skymap, embright, p-astro download group at the
    beginning of the
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert` canvas
    and calls
    :meth:`gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
    with the file names, the freshly fetched superevent, and the file
    contents.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones.
        None means the alert was blocked and nothing is sent.
    alert : dict
        IGWN-Alert dictionary
    alert_type : str
        alert_type passed to
        :meth:`earlywarning_preliminary_initial_update_alert`
    """
    if input_list is None:  # alert is blocked by blocking labels
        return

    (
        (skymap_content, skymap_name),
        (em_bright_content, em_bright_name),
        (p_astro_content, p_astro_name),
    ) = input_list

    # Update to latest state after downloading files
    latest_superevent = gracedb.get_superevent(
        alert['object']['superevent_id'])

    filenames = [skymap_name, em_bright_name, p_astro_name]
    contents = [skymap_content, em_bright_content, p_astro_content]
    earlywarning_preliminary_initial_update_alert.delay(
        filenames, latest_superevent, alert_type, filecontents=contents
    )

758 

759 

@app.task(ignore_result=True, shared=False)
def earlywarning_preliminary_alert(event, alert, alert_type='preliminary',
                                   initiate_voevent=True):
    """Produce a preliminary alert by copying any sky maps.

    This task builds and launches a canvas that:

    1. Copies the sky map and, where applicable, the source properties and
       source classification files from the preferred event to the
       superevent, applying the SKYMAP_READY, EMBRIGHT_READY and
       PASTRO_READY labels.
    2. Annotates the sky map through
       :meth:`~gwcelery.tasks.orchestrator._annotate_fits_and_return_input`.
    3. If the event is publishable and `initiate_voevent` is true, checks
       for blocking labels and hands the files over to
       :meth:`~gwcelery.tasks.orchestrator._unpack_args_and_send_earlywarning_preliminary_alert`.

    When the ``only_alert_for_mdc`` configuration switch is set and the
    event's search is not ``MDC``, step 3 is replaced by a log message.

    Parameters
    ----------
    event : dict
        Preferred event dictionary.
    alert : dict
        IGWN-Alert dictionary of the superevent.
    alert_type : str
        One of ``'preliminary'``, ``'earlywarning'`` or
        ``'less-significant'``; selects the set of blocking labels.
    initiate_voevent : bool
        If false, the files are copied and annotated but no alert is sent.

    Raises
    ------
    NotImplementedError
        If the event is neither a CBC, a Burst, nor a cWB-BBH event.
    """
    priority = 0 if superevents.should_publish(event) else 1
    preferred_event_id = event['graceid']
    superevent_id = alert['uid']

    # Define alert payloads depending on group-pipeline-search
    # cbc-*-*       : p_astro/em_bright
    # burst.cwb-bbh : p_astro/em_bright
    # burst-*-*     : NO p_astro/em_bright
    alert_group = event['group'].lower()
    alert_pipeline = event['pipeline'].lower()
    alert_search = event['search'].lower()
    is_cwb_bbh = alert_pipeline == 'cwb' and alert_search == 'bbh'

    if not is_cwb_bbh and alert_group not in ('cbc', 'burst'):
        raise NotImplementedError(
            'Valid skymap required for preliminary alert'
        )

    has_classification = is_cwb_bbh or alert_group == 'cbc'
    if is_cwb_bbh or alert_group != 'cbc':
        skymap_filename = alert_pipeline + '.multiorder.fits'
    else:
        skymap_filename = 'bayestar.multiorder.fits'
    if has_classification:
        p_astro_filename = alert_pipeline + '.p_astro.json'
        em_bright_filename = 'em_bright.json'
    else:
        p_astro_filename = None
        em_bright_filename = None

    # Determine if the event should be made public.
    is_publishable = (superevents.should_publish(
                      event, significant=alert_type != 'less-significant') and
                      {'DQV', 'INJ'}.isdisjoint(
                      event['labels']))

    def copy_to_superevent(filename, message, tags, ready_label):
        """Signature that downloads `filename` from the preferred event,
        passes its contents on, and in parallel uploads it to the
        superevent and applies `ready_label`.
        """
        return (
            gracedb.download.si(filename, preferred_event_id)
            |
            group(
                identity.s(),

                gracedb.upload.s(
                    filename,
                    superevent_id,
                    message=message.format(preferred_event_id),
                    tags=tags
                )
                |
                _create_label_and_return_filename.s(ready_label,
                                                    superevent_id)
            )
        )

    skymap_copy = copy_to_superevent(
        skymap_filename, 'Localization copied from {}',
        ['sky_loc', 'public'], 'SKYMAP_READY')

    # Events without source properties / classification contribute a
    # [None, None] placeholder so the downstream unpacking keeps its shape.
    if em_bright_filename is not None:
        em_bright_copy = copy_to_superevent(
            em_bright_filename, 'Source properties copied from {}',
            ['em_bright', 'public'], 'EMBRIGHT_READY')
    else:
        em_bright_copy = identity.s([None, None])

    if p_astro_filename is not None:
        p_astro_copy = copy_to_superevent(
            p_astro_filename, 'Source classification copied from {}',
            ['p_astro', 'public'], 'PASTRO_READY')
    else:
        p_astro_copy = identity.s([None, None])

    # Download files from events and upload to superevent with relevant
    # annotations. Pass file contents down the chain so alerts task doesn't
    # need to download them again.
    # Note: this is explicitly made a chain to fix an issue described in #464.
    canvas = chain(
        group(skymap_copy, em_bright_copy, p_astro_copy)
        |
        # Need annotate skymap task in body of chord instead of header because
        # this task simply calls another task, which is to be avoided in chord
        # headers. Note that any group that chains to a task is automatically
        # upgraded to a chord.
        _annotate_fits_and_return_input.s(superevent_id)
    )

    # Switch for disabling all but MDC alerts.
    if app.conf['only_alert_for_mdc']:
        if event.get('search') != 'MDC':
            canvas |= gracedb.upload.si(
                None, None, superevent_id,
                ("Skipping alert because gwcelery has been configured to only"
                 " send alerts for MDC events."))
            canvas.apply_async(priority=priority)
            return

    # Send notice and upload GCN circular draft for online events.
    if is_publishable and initiate_voevent:
        # presence of advocate action blocks significant prelim alert
        # presence of adv action or significant event blocks EW alert
        # presence of adv action or significant event or EW event blocks
        # less significant alert
        if alert_type == 'earlywarning':
            # Launch DQR for significant early warning events after a timeout.
            # If a full BW significant trigger arrives before the end of the
            # timeout, the latter will apply the label instead, and this call
            # is a noop.
            gracedb.create_label.si(
                'DQR_REQUEST',
                superevent_id
            ).apply_async(countdown=600)

        if alert_type == 'preliminary':
            blocking_labels = {'ADVOK', 'ADVNO'}
        elif alert_type == 'earlywarning':
            blocking_labels = {superevents.SIGNIFICANT_LABEL,
                               'ADVOK', 'ADVNO'}
        elif alert_type == 'less-significant':
            blocking_labels = {superevents.EARLY_WARNING_LABEL,
                               superevents.SIGNIFICANT_LABEL,
                               'ADVOK', 'ADVNO'}
        else:
            blocking_labels = set()

        canvas |= (
            _proceed_if_not_blocked_by.s(superevent_id, blocking_labels)
            |
            _unpack_args_and_send_earlywarning_preliminary_alert.s(
                alert, alert_type
            )
        )

    canvas.apply_async(priority=priority)

929 

930 

@gracedb.task(shared=False)
def _get_pe_far_and_event(superevent):
    """Return FAR and event input to PE workflow.

    The input FAR is the lowest FAR among CBC and Burst-BBH triggers.
    The input event is the preferred event if it is a CBC trigger, otherwise
    the CBC trigger with the lowest FAR is returned.

    Parameters
    ----------
    superevent : dict
        Superevent dictionary; its ``gw_events`` and
        ``preferred_event_data`` entries are used.

    Returns
    -------
    tuple or None
        ``(far, event)``, or None if the superevent contains no CBC trigger.
    """
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    events = [
        gracedb.get_event._orig_run(gid) for gid in superevent['gw_events']
    ]
    # Keep CBC triggers and Burst triggers from the BBH search. ``search`` is
    # normalised with ``or ''`` so that an event whose ``search`` key is
    # present but None is skipped instead of raising AttributeError.
    events = [
        e for e in events if e['group'].lower() == 'cbc' or (
            e['group'].lower() == 'burst' and
            (e.get('search') or '').lower() == 'bbh'
        )
    ]
    # Ascending FAR: events[0] holds the lowest FAR of all kept triggers.
    events.sort(key=lambda e: e['far'])
    preferred_event = superevent['preferred_event_data']

    if preferred_event['group'].lower() == 'cbc':
        return events[0]['far'], preferred_event
    for e in events:
        if e['group'].lower() == 'cbc':
            return events[0]['far'], e
    return None

959 

960 

@app.task(ignore_result=True, shared=False)
def parameter_estimation(far_event, superevent_id, pe_pipeline):
    """Start parameter estimation, or log why it is not started.

    Parameter estimation is started through
    :meth:`gwcelery.tasks.inference.start_pe` only if none of the exclusion
    criteria below applies. Otherwise a log message tagged ``pe`` that
    explains the reason is uploaded to the superevent.

    Parameters
    ----------
    far_event : tuple or None
        A ``(far, event)`` pair, or ``None`` if no CBC trigger was found.
    superevent_id : str
        The superevent ID that receives the log messages.
    pe_pipeline : str
        The name of the PE pipeline; the value ``'rapidpe'`` is treated
        specially for early warning triggers.
    """
    def _explain(message):
        # Upload a message-only log entry (no file) tagged 'pe'.
        gracedb.upload.delay(
            filecontents=None, filename=None,
            graceid=superevent_id,
            message=message,
            tags='pe'
        )

    if far_event is None:
        _explain('Parameter estimation will not start since no CBC triggers'
                 ' are found.')
        return

    far, event = far_event
    search = event['search'].lower()

    far_thresholds = app.conf['significant_alert_far_threshold']['cbc']
    if search in far_thresholds:
        trials_factor = \
            app.conf['significant_alert_trials_factor']['cbc'][search]
        threshold = far_thresholds[search] / trials_factor
    else:
        # Fallback in case an event is uploaded to an unlisted search
        threshold = float('-inf')

    # The checks below are evaluated in order; the first one that applies
    # determines the message, and PE is not started.
    if far > threshold:
        _explain('Parameter estimation will not start since FAR is larger '
                 'than the PE threshold, {} Hz.'.format(threshold))
        return

    if search == 'mdc':
        _explain('Parameter estimation will not start since parameter '
                 'estimation is disabled for mock uploads.')
        return

    if event.get('offline', False):
        _explain('Parameter estimation will not start since parameter '
                 'estimation is disabled for OFFLINE events.')
        return

    if (
        app.conf['gracedb_host'] == 'gracedb-playground.ligo.org'
        and event['pipeline'] == 'MBTA'
    ):
        # FIXME: Remove this block once multiple channels can be handled on
        # one gracedb instance
        _explain('Parameter estimation is disabled for MBTA triggers '
                 'on playground as MBTA analyses live data + online '
                 'injections not O3 replay data + MDC injections')
        return

    if pe_pipeline == 'rapidpe' and search == 'earlywarning':
        # Remove this if rapidpe can ingest early warning events
        _explain('Parameter estimation by RapidPE-RIFT is disabled for '
                 'earlywarning triggers.')
        return

    inference.start_pe.delay(event, superevent_id, pe_pipeline)

1036 

1037 

@gracedb.task(shared=False)
def earlywarning_preliminary_initial_update_alert(
    filenames,
    superevent,
    alert_type,
    filecontents=None
):
    """
    Create a canvas that sends an earlywarning, preliminary, initial, or update
    notice.

    Nothing is done if the superevent carries the ``INJ`` label. Otherwise
    the canvas is assembled and launched with ``apply_async``; this function
    itself returns ``None``.

    Parameters
    ----------
    filenames : tuple
        The sky map, em_bright, and p_astro filenames, in that order. Any
        entry that is ``None`` is looked up in the superevent log.
    superevent : dict
        The superevent dictionary, typically obtained from an IGWN Alert or
        from querying GraceDB.
    alert_type : {'less-significant', 'earlywarning', 'preliminary', 'initial', 'update'}  # noqa: E501
        The alert type.
    filecontents : tuple, optional
        The sky map, em_bright, and p_astro file contents, in that order.
        May only be given for less-significant, earlywarning, and
        preliminary alerts. If given, and no combined sky map is involved,
        these contents are used directly instead of downloading the
        em_bright and p_astro files again.

    Notes
    -----
    Tasks that call this function should be decorated with
    :obj:`gwcelery.tasks.gracedb.task` rather than :obj:`gwcelery.app.task` so
    that a synchronous call to :func:`gwcelery.tasks.gracedb.get_log` is
    retried in the event of GraceDB API failures. If `EM_COINC` is in labels
    will create a RAVEN circular.

    """
    labels = superevent['labels']
    superevent_id = superevent['superevent_id']

    if 'INJ' in labels:
        return

    if filecontents:
        assert alert_type in {
            'less-significant', 'earlywarning', 'preliminary'}

    skymap_filename, em_bright_filename, p_astro_filename = filenames
    rapidpe_pastro_filename = None
    combined_skymap_filename = None
    combined_skymap_needed = False
    skymap_needed = (skymap_filename is None)
    em_bright_needed = (em_bright_filename is None)
    p_astro_needed = (p_astro_filename is None)
    raven_coinc = ('RAVEN_ALERT' in labels and bool(superevent['em_type']))
    if raven_coinc:
        ext_labels = gracedb.get_labels(superevent['em_type'])
        combined_skymap_needed = \
            {"RAVEN_ALERT", "COMBINEDSKYMAP_READY"}.issubset(set(ext_labels))

    # The log is always scanned: the RapidPE-RIFT p_astro file is looked up
    # for every alert, whether or not the other filenames were supplied.
    # Entries later in the log override earlier matches.
    for message in gracedb.get_log(superevent_id):
        t = message['tag_names']
        f = message['filename']
        v = message['file_version']
        fv = '{},{}'.format(f, v)
        if not f:
            # Log entry without an attached file
            continue
        if skymap_needed \
                and {'sky_loc', 'public'}.issubset(t) \
                and f.endswith('.multiorder.fits') \
                and 'combined' not in f:
            skymap_filename = fv
        if em_bright_needed \
                and 'em_bright' in t \
                and f.endswith('.json'):
            em_bright_filename = fv
        if p_astro_needed \
                and {'p_astro', 'public'}.issubset(t) \
                and f.endswith('.json'):
            p_astro_filename = fv
        if combined_skymap_needed \
                and {'sky_loc', 'ext_coinc'}.issubset(t) \
                and f.startswith('combined-ext.') \
                and 'fit' in f:
            combined_skymap_filename = fv
        if {'p_astro', 'public'}.issubset(t) \
                and f == 'RapidPE_RIFT.p_astro.json':
            rapidpe_pastro_filename = fv

    if combined_skymap_needed:
        # for every alert, copy combined sky map over if applicable
        # FIXME: use file inheritance once available
        ext_id = superevent['em_type']
        if combined_skymap_filename:
            # If previous sky map, increase version by 1
            combined_skymap_filename_base, v = \
                combined_skymap_filename.split(',')
            v = str(int(v) + 1)
            combined_skymap_filename = \
                combined_skymap_filename_base + ',' + v
        else:
            combined_skymap_filename_base = \
                (external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER
                 if '.multiorder.fits' in skymap_filename else
                 external_skymaps.COMBINED_SKYMAP_FILENAME_FLAT)
            combined_skymap_filename = combined_skymap_filename_base + ',0'
        message = 'Combined LVK-external sky map copied from {0}'.format(
            ext_id)
        message_png = (
            'Mollweide projection of <a href="/api/events/{se_id}/files/'
            '{filename}">{filename}</a>, copied from {ext_id}').format(
                se_id=superevent_id,
                ext_id=ext_id,
                filename=combined_skymap_filename)

        combined_skymap_canvas = group(
            gracedb.download.si(combined_skymap_filename_base, ext_id)
            |
            gracedb.upload.s(
                combined_skymap_filename_base, superevent_id,
                message, ['sky_loc', 'ext_coinc', 'public'])
            |
            gracedb.create_label.si('COMBINEDSKYMAP_READY', superevent_id),

            gracedb.download.si(external_skymaps.COMBINED_SKYMAP_FILENAME_PNG,
                                ext_id)
            |
            gracedb.upload.s(
                external_skymaps.COMBINED_SKYMAP_FILENAME_PNG, superevent_id,
                message_png, ['sky_loc', 'ext_coinc', 'public']
            )
            |
            # Pass None to download_andor_expose group
            identity.si()
        )

    # circular template not needed for less-significant alerts
    if alert_type in {'earlywarning', 'preliminary', 'initial'}:
        if raven_coinc:
            circular_task = circulars.create_emcoinc_circular.si(superevent_id)
            circular_filename = '{}-emcoinc-circular.txt'.format(alert_type)
            tags = ['em_follow', 'ext_coinc']

        else:
            circular_task = circulars.create_initial_circular.si(superevent_id)
            circular_filename = '{}-circular.txt'.format(alert_type)
            tags = ['em_follow']

        circular_canvas = (
            circular_task
            |
            gracedb.upload.s(
                circular_filename,
                superevent_id,
                'Template for {} GCN Circular'.format(alert_type),
                tags=tags)
        )

    else:
        circular_canvas = identity.si()

    # less-significant alerts have "preliminary" voevent notice type
    alert_type_voevent = 'preliminary' if alert_type == 'less-significant' \
        else alert_type
    # set the significant field in the VOEvent based on
    # less-significant/significant alert.
    # For kafka alerts the analogous field is set in alerts.py.
    # (see comment before defining kafka_alert_canvas)
    voevent_significance = 0 if alert_type == 'less-significant' else 1

    if filecontents and not combined_skymap_filename:
        skymap, em_bright, p_astro_dict = filecontents

        # check high profile and apply label if true
        if alert_type == 'preliminary':
            high_profile_canvas = rrt_utils.check_high_profile.si(
                skymap, em_bright, p_astro_dict, superevent
            )
        else:
            high_profile_canvas = identity.si()

        download_andor_expose_group = []
        if rapidpe_pastro_filename is None:
            voevent_canvas = _create_voevent.si(
                (em_bright, p_astro_dict),
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            # Placeholder only: the final canvas chains rapidpe_canvas solely
            # when a RapidPE-RIFT p_astro file was found.
            rapidpe_canvas = _update_rapidpe_pastro_shouldnt_run.s()

            # kafka alerts have a field called "significant" based on
            # https://dcc.ligo.org/LIGO-G2300151/public
            # The alert_type value passed to alerts.send is used to
            # set this field in the alert dictionary
            kafka_alert_canvas = alerts.send.si(
                (skymap, em_bright, p_astro_dict),
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )
        else:
            voevent_canvas = _create_voevent.s(
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

            kafka_alert_canvas = _check_pastro_and_send_alert.s(
                skymap,
                em_bright,
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )

            rapidpe_canvas = (
                _update_rapidpe_pastro.s(
                    em_bright=em_bright,
                    pipeline_pastro=p_astro_dict)
                |
                _upload_rapidpe_pastro_json.s(
                    superevent_id,
                    rapidpe_pastro_filename
                )
            )
    else:
        # Download em_bright and p_astro files here for voevent
        download_andor_expose_group = [
            gracedb.download.si(em_bright_filename, superevent_id) if
            em_bright_filename is not None else identity.s(None),
            gracedb.download.si(p_astro_filename, superevent_id) if
            p_astro_filename is not None else identity.s(None),
        ]
        high_profile_canvas = identity.si()

        voevent_canvas = _create_voevent.s(
            superevent_id,
            alert_type_voevent,
            Significant=voevent_significance,
            skymap_filename=skymap_filename,
            internal=False,
            open_alert=True,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

        if rapidpe_pastro_filename:
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

        rapidpe_canvas = (
            _update_rapidpe_pastro.s()
            |
            _upload_rapidpe_pastro_json.s(
                superevent_id,
                rapidpe_pastro_filename
            )
        )

        # The skymap has not been downloaded at this point, so we need to
        # download it before we can assemble the kafka alerts and send them
        kafka_alert_canvas = alerts.download_skymap_and_send_alert.s(
            superevent,
            alert_type,
            skymap_filename=skymap_filename,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

    to_expose = [skymap_filename, em_bright_filename, p_astro_filename]
    # Since PE skymap images, HTML, and gzip FITS are not made public when they
    # are uploaded, we need to expose them here.
    if (
        skymap_filename is not None and 'bilby' in skymap_filename.lower()
    ):
        prefix, _, _ = skymap_filename.partition('.multiorder.fits')
        to_expose += [f'{prefix}.html', f'{prefix}.png',
                      f'{prefix}.volume.png', f'{prefix}.fits.gz']
    download_andor_expose_group += [
        gracedb.expose.si(superevent_id),
        *(
            gracedb.create_tag.si(filename, 'public', superevent_id)
            for filename in to_expose if filename is not None
        )
    ]

    voevent_canvas |= group(
        gracedb.download.s(superevent_id)
        |
        gcn.send.s(),

        gracedb.create_tag.s('public', superevent_id)
    )

    if combined_skymap_needed:
        download_andor_expose_group += [combined_skymap_canvas]

    sent_label_canvas = identity.si()
    if alert_type == 'less-significant':
        sent_label_canvas = gracedb.create_label.si(
            'LOW_SIGNIF_PRELIM_SENT',
            superevent_id
        )
    elif alert_type == 'preliminary':
        sent_label_canvas = gracedb.create_label.si(
            'GCN_PRELIM_SENT',
            superevent_id
        )

    # NOTE: The following canvas structure was used to fix #480
    canvas = (
        group(download_andor_expose_group)
    )
    if rapidpe_pastro_filename:
        canvas |= rapidpe_canvas

    canvas |= (
        group(
            voevent_canvas
            |
            group(
                circular_canvas,

                sent_label_canvas
            ),

            kafka_alert_canvas,

            high_profile_canvas
        )
    )

    canvas.apply_async()

1384 

1385 

@app.task(shared=False)
def _update_rapidpe_pastro(input_list, em_bright=None, pipeline_pastro=None):
    """Reconcile the RapidPE-RIFT p_astro with the pipeline's p_terr.

    If the ``Terrestrial`` entry of the RapidPE-RIFT p_astro differs from the
    one in the pipeline p_astro, the RapidPE-RIFT p_astro is renormalized
    with the pipeline value.

    Parameters
    ----------
    input_list : list
        Output of the ``download_andor_expose_group`` built in
        :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.
        If `pipeline_pastro` is ``None``, the first three entries are taken
        as em_bright, pipeline p_astro, and RapidPE-RIFT p_astro. Otherwise
        only the first entry is used, as the RapidPE-RIFT p_astro.
    em_bright : optional
        The em_bright contents, passed through unchanged to the output. Only
        used when `pipeline_pastro` is given.
    pipeline_pastro : str, optional
        JSON text of the pipeline p_astro.

    Returns
    -------
    tuple
        ``(em_bright, rapidpe_pastro, rapidpe_pastro_updated)``, where
        ``rapidpe_pastro`` is JSON text and ``rapidpe_pastro_updated`` tells
        whether it was renormalized.
    """
    if pipeline_pastro is None:
        em_bright, pipeline_pastro, rapidpe_pastro, *_ = input_list
    else:
        rapidpe_pastro, *_ = input_list

    pipeline_values = json.loads(pipeline_pastro)
    rapidpe_values = json.loads(rapidpe_pastro)

    # Identical p_terr: hand the RapidPE-RIFT p_astro back untouched.
    if rapidpe_values["Terrestrial"] == pipeline_values["Terrestrial"]:
        return em_bright, rapidpe_pastro, False

    renormalized = rpe_pastro.renormalize_pastro_with_pipeline_pterr(
        rapidpe_values, pipeline_values
    )
    return em_bright, json.dumps(renormalized), True

1419 

1420 

@app.task(shared=False)
def _update_rapidpe_pastro_shouldnt_run():
    """Placeholder task that fails loudly if it is ever executed.

    Raises
    ------
    RuntimeError
        Always.
    """
    # The literals are joined by implicit concatenation, so the separating
    # space has to be spelled out at the end of the first fragment.
    raise RuntimeError(
        "The `rapidpe_canvas' was executed where it should "
        "not have been. A bug must have been introduced."
    )

1427 

1428 

@gracedb.task(shared=False)
def _upload_rapidpe_pastro_json(
    input_list,
    superevent_id,
    rapidpe_pastro_filename
):
    """Upload the RapidPE-RIFT p_astro if it has been updated.

    Parameters
    ----------
    input_list : list
        Output of ``_update_rapidpe_pastro``. The last entry is the flag
        telling whether the RapidPE-RIFT p_astro was updated; the entry at
        index 1 is the p_astro content to upload.
    superevent_id : str
        The superevent that receives the upload.
    rapidpe_pastro_filename : str
        Not used by this task.

    Returns
    -------
    list
        `input_list` without its last entry.

    Notes
    -----
    Nothing is uploaded unless the flag is exactly ``True``. The upload is
    always made under the name ``RapidPE_RIFT.p_astro.json`` with the tags
    ``pe``, ``p_astro`` and ``public``.
    """
    *passthrough, rapidpe_pastro_updated = input_list
    if rapidpe_pastro_updated is not True:
        return passthrough

    gracedb.upload(
        input_list[1],
        "RapidPE_RIFT.p_astro.json",
        superevent_id,
        "RapidPE-RIFT Pastro results",
        ("pe", "p_astro", "public")
    )
    return passthrough

1457 

1458 

@app.task(shared=False)
def _check_pastro_and_send_alert(input_classification, skymap, em_bright,
                                 superevent, alert_type, raven_coinc=False):
    """Wrapper for :meth:`~gwcelery.tasks.alerts.send` meant to take a
    potentially new p-astro as input from the preceding task.

    Parameters
    ----------
    input_classification : sequence
        A two-element sequence; only the second element, the p_astro, is
        used.
    skymap, em_bright
        Forwarded to :meth:`~gwcelery.tasks.alerts.send` together with the
        p_astro.
    superevent : dict
        The superevent dictionary.
    alert_type : str
        The alert type.
    raven_coinc : bool
        Forwarded to :meth:`~gwcelery.tasks.alerts.send`.
    """
    _unused, latest_pastro = input_classification
    classification = (skymap, em_bright, latest_pastro)
    alerts.send.delay(
        classification, superevent, alert_type, raven_coinc=raven_coinc
    )

1472 

1473 

@gracedb.task(ignore_result=True, shared=False)
def initial_alert(filenames, alert):
    """Produce an initial alert.

    Thin wrapper that forwards to
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
    with ``alert_type='initial'``, using the superevent found under the
    ``object`` key of the IGWN Alert.

    Parameters
    ----------
    filenames : tuple
        The sky map, em_bright, and p_astro filenames.
    alert : dict
        IGWN-Alert dictionary

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    superevent = alert['object']
    earlywarning_preliminary_initial_update_alert(
        filenames, superevent, 'initial'
    )

1502 

1503 

@gracedb.task(ignore_result=True, shared=False)
def update_alert(filenames, superevent_id):
    """Produce an update alert.

    Thin wrapper that fetches the superevent and forwards to
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
    with ``alert_type='update'``.

    Parameters
    ----------
    filenames : tuple
        The sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    current_superevent = gracedb.get_superevent._orig_run(superevent_id)
    earlywarning_preliminary_initial_update_alert(
        filenames, current_superevent, 'update'
    )

1533 

1534 

@app.task(ignore_result=True, shared=False)
def retraction_alert(alert):
    """Produce a retraction alert.

    Two branches are launched together: one exposes the superevent and then
    sends the retraction notices, the other drafts the retraction circular.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary. The keys ``uid`` and ``object`` are read.
    """
    superevent_id = alert['uid']

    # Create the retraction VOEvent, then send it over GCN and tag it public.
    voevent_canvas = (
        _create_voevent.si(
            None, superevent_id, 'retraction',
            internal=False,
            open_alert=True
        )
        |
        group(
            gracedb.download.s(superevent_id)
            |
            gcn.send.s(),

            gracedb.create_tag.s('public', superevent_id)
        )
    )

    kafka_alert_canvas = alerts.send.si(
        None,
        alert['object'],
        'retraction'
    )

    # Both notices are only sent after the superevent has been exposed.
    notices_canvas = (
        gracedb.expose.si(superevent_id)
        |
        group(voevent_canvas, kafka_alert_canvas)
    )

    circular_canvas = (
        circulars.create_retraction_circular.si(superevent_id)
        |
        gracedb.upload.s(
            'retraction-circular.txt',
            superevent_id,
            'Template for retraction GCN Circular',
            tags=['em_follow']
        )
    )

    group(notices_canvas, circular_canvas).apply_async()