Coverage for gwcelery/tasks/inference.py: 99%

335 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-14 05:52 +0000

1"""Source Parameter Estimation with LALInference, Bilby, and RapidPE.""" 

2import glob 

3import json 

4import os 

5import subprocess 

6import urllib 

7from shutil import which 

8 

9import numpy as np 

10import platformdirs 

11from bilby_pipe.bilbyargparser import BilbyConfigFileParser 

12from bilby_pipe.utils import convert_string_to_dict 

13from celery import group 

14from celery.exceptions import Ignore 

15 

16from .. import app 

17from ..jinja import env 

18from . import condor, gracedb 

19 

# Exit code with which rapidpe-rift-pipe signals that no GstLAL trigger is
# available; _setup_dag_for_rapidpe treats this as a graceful failure
# (task ends with Ignore() instead of an error).
_RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE = 100

RAPIDPE_GETENV = [
    "DEFAULT_SEGMENT_SERVER", "GWDATAFIND_SERVER",
    "LAL_DATA_PATH", "LD_LIBRARY_PATH", "LIBRARY_PATH",
    "NDSSERVER", "PATH", "PYTHONPATH",
]
"""Names of environment variables to include in RapidPE HTCondor submit
files."""

RAPIDPE_ENVIRONMENT = {
    "RIFT_LOWLATENCY": "True",
}
"""Names and values of environment variables to include in RapidPE HTCondor
submit files."""

35 

36 

37def _find_appropriate_cal_env(trigtime, dir_name): 

38 """Return the path to the calibration uncertainties estimated at the time 

39 before and closest to the trigger time. If there are no calibration 

40 uncertainties estimated before the trigger time, return the oldest one. The 

41 gpstimes at which the calibration uncertainties were estimated and the 

42 names of the files containing the uncertaintes are saved in 

43 [HLV]_CalEnvs.txt. 

44 

45 Parameters 

46 ---------- 

47 trigtime : float 

48 The trigger time of a target event 

49 dir_name : str 

50 The path to the directory where files containing calibration 

51 uncertainties exist 

52 

53 Return 

54 ------ 

55 path : str 

56 The path to the calibration uncertainties appropriate for a target 

57 event 

58 

59 """ 

60 filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt')) 

61 calibration_index = np.atleast_1d( 

62 np.genfromtxt(filename, dtype='object', names=['gpstime', 'filename']) 

63 ) 

64 gpstimes = calibration_index['gpstime'].astype(np.int32) 

65 candidate_gpstimes = gpstimes < trigtime 

66 if np.any(candidate_gpstimes): 

67 idx = np.argmax(gpstimes * candidate_gpstimes) 

68 appropriate_cal = calibration_index['filename'][idx] 

69 else: 

70 appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)] 

71 return os.path.join(dir_name, appropriate_cal.decode('utf-8')) 

72 

73 

def prepare_lalinference_ini(event, superevent_id):
    """Determine LALInference configurations and return ini file content

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    ini_contents : str

    """
    # Load the jinja2 template of the .ini file.
    template = env.get_template('lalinference.jinja2')

    sngl_rows = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    # Executable names to be resolved to absolute paths via ``which``.
    executables = {'datafind': 'gw_data_find',
                   'mergeNSscript': 'lalinference_nest2pos',
                   'mergeMCMCscript': 'cbcBayesMCMC2pos',
                   'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
                   'resultspage': 'cbcBayesPostProc',
                   'segfind': 'ligolw_segment_query',
                   'ligolw_print': 'ligolw_print',
                   'coherencetest': 'lalinference_coherence_test',
                   'lalinferencenest': 'lalinference_nest',
                   'lalinferencemcmc': 'lalinference_mcmc',
                   'lalinferencebambi': 'lalinference_bambi',
                   'lalinferencedatadump': 'lalinference_datadump',
                   'ligo-skymap-from-samples': 'true',
                   'ligo-skymap-plot': 'true',
                   'processareas': 'process_areas',
                   'computeroqweights': 'lalinference_compute_roq_weights',
                   'mpiwrapper': 'lalinference_mpi_wrapper',
                   'gracedb': 'gracedb',
                   'ppanalysis': 'cbcBayesPPAnalysis',
                   'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
                   'bayeswave': 'BayesWave',
                   'bayeswavepost': 'BayesWavePost'}
    # Per-detector directories holding calibration uncertainty envelopes.
    calibration_dirs = {
        'h1_calibration':
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Hanford',
        'l1_calibration':
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Livingston',
        'v1_calibration':
            '/home/cbc/pe/O3/calibrationenvelopes/Virgo',
    }
    settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': app.conf['low_latency_frame_types'],
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], superevent_id, 'lalinference'
        ),
        'paths': [{'name': name, 'path': which(executable)}
                  for name, executable in executables.items()],
        # Most conservative chirp mass / mass ratio over the single-detector
        # triggers.
        'mc': min(sngl['mchirp'] for sngl in sngl_rows),
        'q': min(sngl['mass2'] / sngl['mass1'] for sngl in sngl_rows),
        'mpirun': which('mpirun'),
    }
    for key, cal_dir in calibration_dirs.items():
        settings[key] = _find_appropriate_cal_env(trigtime, cal_dir)
    # Fill out the template and return the resultant content.
    return template.render(settings)

147 

148 

@app.task(shared=False)
def _setup_dag_for_lalinference(coinc, rundir, event, superevent_id):
    """Create DAG for a lalinference run and return the path to DAG.

    Parameters
    ----------
    coinc : byte contents
        Byte contents of ``coinc.xml``. The PSD is expected to be embedded.
    rundir : str
        The path to a run directory where the DAG file is created.
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    # Dump coinc.xml into the run directory; lalinference_pipe reads the
    # trigger parameters and the embedded PSD from it.
    coinc_path = os.path.join(rundir, 'coinc.xml')
    with open(coinc_path, 'wb') as f:
        f.write(coinc)

    # Render the configuration, save it, and upload it to GraceDB for record.
    ini_contents = prepare_lalinference_ini(event, superevent_id)
    ini_path = os.path.join(rundir, 'online_lalinference_pe.ini')
    with open(ini_path, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(ini_path),
        graceid=superevent_id,
        message=('Automatically generated LALInference configuration file'
                 ' for this event.'),
        tags='pe')

    try:
        subprocess.run(
            ['lalinference_pipe', '--run-path', rundir,
             '--coinc', coinc_path, ini_path, '--psd', coinc_path],
            capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # Upload command line, stdout and stderr so the failure can be
        # diagnosed directly on GraceDB.
        log = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=log, filename='lalinference_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        raise

    return os.path.join(rundir, 'multidag.dag')

205 

206 

@app.task(shared=False)
def _setup_dag_for_bilby(
    coinc_bayestar, rundir, event, superevent_id, mode="production"
):
    """Create DAG for a bilby run and return the path to DAG.

    Parameters
    ----------
    coinc_bayestar : tuple
        Byte contents of ``coinc.xml`` and ``bayestar.multiorder.fits``.
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode, allowed options are "production" and "fast_test",
        default is "production".

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    Raises
    ------
    ValueError
        If the trigger chirp mass is below 0.6 Msun or ``mode`` is not one
        of the allowed options.

    Notes
    -----
    `--channel-dict o3replay` is added to bilby_pipe_gracedb arguments when the
    gracedb host is different from `gracedb.ligo.org` or
    `gracedb-test.ligo.org`. Condor queue is set to `Online_PE` if gracedb host
    is `gracedb.ligo.org`, and `Online_PE_MDC` otherwise.

    """
    # Save the event metadata in the run directory; bilby_pipe_gracedb reads
    # it via --json, and start_pe later reads it back when counting running
    # BBH jobs.
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    # Dump the coinc file (PSD source) and the BAYESTAR skymap; both are
    # handed to bilby_pipe_gracedb below.
    coinc, bayestar = coinc_bayestar
    path_to_psd = os.path.join(rundir, 'coinc.xml')
    with open(path_to_psd, 'wb') as f:
        f.write(coinc)
    path_to_bayestar = os.path.join(rundir, 'bayestar.multiorder.fits')
    with open(path_to_bayestar, 'wb') as f:
        f.write(bayestar)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'bilby', mode
    )

    path_to_settings = os.path.join(rundir, 'settings.json')
    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json,
                 '--psd-file', path_to_psd, '--skymap-file', path_to_bayestar,
                 '--settings', path_to_settings]
    settings = {'summarypages_arguments': {'gracedb': event['graceid'],
                                           'no_ligo_skymap': True},
                'accounting_user': 'soichiro.morisaki',
                'tukey_roll_off': 1.0}
    # Non-production hosts run in the MDC condor queue; the production
    # accounting tag is only set on the production host.
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        settings['queue'] = 'Online_PE_MDC'
    else:
        settings['queue'] = 'Online_PE'
        settings['accounting'] = 'ligo.prod.o4.cbc.pe.bilby'
    # FIXME: using live data for gracedb-test events should be reconsidered
    # when we have a better idea to differentiate MDC and real events.
    if app.conf['gracedb_host'] not in [
        'gracedb.ligo.org', 'gracedb-test.ligo.org'
    ]:
        setup_arg += ['--channel-dict', 'o3replay']

    # Pick likelihood/ROQ settings based on the chirp mass reported by the
    # search pipeline.
    trigger_chirp_mass = event['extra_attributes']['CoincInspiral']['mchirp']
    if trigger_chirp_mass < 0.6:
        raise ValueError(
            "No bilby settings available for trigger chirp mass of"
            f" {trigger_chirp_mass}Msun."
        )
    if mode == 'production':
        settings.update(
            {
                'sampler_kwargs': {'naccept': 60, 'nlive': 500,
                                   'npool': 24, 'sample': 'acceptance-walk'},
                'n_parallel': 3,
                'request_cpus': 24,
                'spline_calibration_nodes': 10,
                'request_memory_generation': 8.0
            }
        )
        # use low-spin IMRPhenomD below chirp mass of m1=3Msun, m2=1Msun
        # assuming binary neutron star
        if trigger_chirp_mass < 1.465:
            likelihood_mode = 'lowspin_phenomd_fhigh1024_roq'
            settings['sampler_kwargs']['naccept'] = 10
        # use IMRPhenomPv2 with mass ratio upper bound of 8 below chirp mass of
        # m1=8Msun, m2=1Msun
        elif trigger_chirp_mass < 2.243:
            likelihood_mode = 'phenompv2_bns_roq'
        # use IMRPhenomPv2 with mass ratio upper bound of 20 in chirp-mass
        # range where IMRPhenomXPHM ROQ bases are not available
        elif trigger_chirp_mass < 12:
            likelihood_mode = 'low_q_phenompv2_roq'
        else:
            likelihood_mode = 'phenomxphm_roq'
            # NOTE(review): generation memory is *higher* (50 GB) for
            # 12 <= mchirp <= 16 than for mchirp > 16 (36 GB) — presumably
            # longer signals need larger ROQ bases; confirm these numbers
            # are intentional.
            if trigger_chirp_mass > 16:
                settings['request_memory_generation'] = 36.0
            else:
                settings['request_memory_generation'] = 50.0
            if trigger_chirp_mass > 25:
                settings['request_memory'] = 16.0
            elif trigger_chirp_mass > 16:
                settings['request_memory'] = 24.0
            else:
                settings['request_memory'] = 36.0
        setup_arg += ['--cbc-likelihood-mode', likelihood_mode]
    elif mode == 'fast_test':
        setup_arg += ["--sampler-kwargs", "FastTest"]
        if trigger_chirp_mass < 3.9:
            setup_arg += ['--cbc-likelihood-mode', 'phenompv2_bns_roq']
            settings['request_memory_generation'] = 8.0
    else:
        raise ValueError(f"mode: {mode} not recognized.")

    with open(path_to_settings, 'w') as f:
        json.dump(settings, f, indent=2)

    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # Upload the command line, stdout and stderr for debugging.
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='bilby_dag.log',
            graceid=superevent_id,
            message=f'Failed to prepare DAG for {mode}-mode bilby', tags='pe'
        )
        raise
    else:
        # Uploads bilby ini file to GraceDB
        with open(os.path.join(rundir, 'bilby_config.ini'), 'r') as f:
            ini_contents = f.read()
        if mode == 'production':
            filename = 'bilby_config.ini'
        else:
            filename = f'bilby_{mode}_config.ini'
        gracedb.upload.delay(
            ini_contents, filename=filename, graceid=superevent_id,
            message=(f'Automatically generated {mode}-mode Bilby configuration'
                     ' file for this event.'),
            tags='pe')

    # bilby_pipe writes the top-level DAG under submit/; exactly one match is
    # expected (unpacking raises otherwise).
    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))
    return path_to_dag

360 

361 

@app.task(shared=False)
def _setup_dag_for_rapidpe(rundir, superevent_id, event):
    """Create DAG for a rapidpe run and return the path to DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    superevent_id : str
        The GraceDB ID of a target superevent
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose coincident SNR is used to determine
        analysis settings.

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    gracedb_host = app.conf['gracedb_host']

    settings = app.conf['rapidpe_settings']
    # Flag passed to the ini template for triggers with coincident SNR at or
    # above 37.5 (template-side effect not visible here).
    trigger_snr = event['extra_attributes']['CoincInspiral']['snr']
    high_snr_trigger = trigger_snr >= 37.5

    # dump ini file
    ini_template = env.get_template('rapidpe.jinja2')
    ini_contents = ini_template.render(
        {'rundir': rundir,
         'webdir': os.path.join(
             app.conf['pe_results_path'], superevent_id, 'rapidpe'
         ),
         'gracedb_url': f'https://{gracedb_host}/api',
         'superevent_id': superevent_id,
         'run_mode': settings['run_mode'],
         'frame_data_types': app.conf['low_latency_frame_types'],
         'accounting_group': settings['accounting_group'],
         'use_cprofile': settings['use_cprofile'],
         'gracedb_host': gracedb_host,
         'high_snr_trigger': high_snr_trigger,
         'getenv': RAPIDPE_GETENV,
         'environment': RAPIDPE_ENVIRONMENT})
    path_to_ini = os.path.join(rundir, 'rapidpe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    # Upload the rendered configuration to GraceDB for record keeping.
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated RapidPE-RIFT configuration file'
                 ' for this event.'),
        tags='pe')

    # set up dag
    try:
        subprocess.run(['rapidpe-rift-pipe', path_to_ini],
                       capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # Upload command line, stdout and stderr for debugging.
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr

        message = 'Failed to prepare DAG for Rapid PE'

        # A missing GstLAL trigger is an expected condition, not a pipeline
        # bug: report it but do not log the task as failed.
        fail_gracefully = False
        if e.returncode == _RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE:
            fail_gracefully = True
            message += ": no GstLAL trigger available"

        gracedb.upload.delay(
            filecontents=contents, filename='rapidpe_dag.log',
            graceid=superevent_id,
            message=message, tags='pe'
        )

        if fail_gracefully:
            # Ends task but without logging as a failure
            raise Ignore()
        else:
            # Ends task with the unhandled error logged
            raise

    # return path to dag
    dag = os.path.join(rundir, "event_all_iterations.dag")
    return dag

443 

444 

@app.task(shared=False)
def _condor_no_submit(path_to_dag, include_env=None):
    """Run 'condor_submit_dag -no_submit' and return the path to .sub file.

    Parameters
    ----------
    path_to_dag : str
        The path to the .dag file to materialize as a condor submit file.
    include_env : list of str, optional
        Environment variable names forwarded via ``-include_env``.

    """
    cmd = ['condor_submit_dag']
    if include_env is not None:
        cmd.extend(['-include_env', ','.join(include_env)])
    cmd.extend(['-no_submit', path_to_dag])
    subprocess.run(cmd, capture_output=True, check=True)
    # condor_submit_dag writes the submit file next to the DAG with this
    # fixed suffix.
    return f'{path_to_dag}.condor.sub'

456 

457 

def dag_prepare_task(rundir, event, superevent_id, pe_pipeline, **kwargs):
    """Return a canvas of tasks to prepare DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    """
    # Environment variables `condor_submit_dag` should be made aware of
    # (only used for rapidpe).
    include_env = None

    if pe_pipeline == 'lalinference':
        download = gracedb.download.si('coinc.xml', event['graceid'])
        canvas = download | _setup_dag_for_lalinference.s(
            rundir, event, superevent_id)
    elif pe_pipeline == 'bilby':
        # The two downloads are independent; run them as a group feeding
        # the DAG-setup task.
        downloads = group(
            gracedb.download.si('coinc.xml', event['graceid']),
            gracedb.download.si('bayestar.multiorder.fits',
                                event['graceid']),
        )
        canvas = downloads | _setup_dag_for_bilby.s(
            rundir, event, superevent_id, kwargs['bilby_mode'])
    elif pe_pipeline == 'rapidpe':
        include_env = RAPIDPE_GETENV
        canvas = _setup_dag_for_rapidpe.s(rundir, superevent_id, event)
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    # Always finish by converting the DAG into a condor submit file.
    return canvas | _condor_no_submit.s(include_env=include_env)

503 

504 

505def _find_paths_from_name(directory, name): 

506 """Return the paths of files or directories with given name under the 

507 specfied directory 

508 

509 Parameters 

510 ---------- 

511 directory : string 

512 Name of directory under which the target file or directory is searched 

513 for. 

514 name : string 

515 Name of target files or directories 

516 

517 Returns 

518 ------- 

519 paths : generator 

520 Paths to the target files or directories 

521 

522 """ 

523 return glob.iglob(os.path.join(directory, '**', name), recursive=True) 

524 

525 

@app.task(ignore_result=True, shared=False)
def _clean_up_bilby(rundir):
    """Remove large data products produced by bilby

    Parameters
    ----------
    rundir : str

    """
    # ROQ weights and the data-dump pickle are the largest artifacts under
    # the run directory's data/ subdirectory.
    patterns = (
        "data/*_generation_roq_weights.hdf5",
        "data/*_generation_data_dump.pickle",
    )
    for pattern in patterns:
        for path in glob.glob(os.path.join(rundir, pattern)):
            os.remove(path)

543 

544 

@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, analysis):
    """Upload notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    analysis : str
        Analysis name used as a label in uploaded messages

    Notes
    -----
    Some large bilby data products are cleaned up after the notification if the
    gracedb host is different from `gracedb.ligo.org`.

    """
    # Start the canvas with a message describing how the condor job ended.
    if isinstance(exc, condor.JobRunning):
        # The job exceeded its allowed run time: remove it from the condor
        # queue before notifying.
        subprocess.run(['condor_rm', str(exc.args[0]['Cluster'])])
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id, tags='pe',
            message=f'The {analysis} condor job was aborted by gwcelery, '
                    'due to its long run time.'
        )
    elif isinstance(exc, condor.JobAborted):
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id, tags='pe',
            message=f'The {analysis} condor job was aborted.'
        )
    else:
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id, tags='pe',
            message=f'The {analysis} condor job failed.'
        )

    # Attach log files found under the run directory; rapidpe has its own
    # specific error-log names, other analyses upload all condor logs.
    if analysis == "rapidpe":
        to_upload = [
            'event_all_iterations.dag.lib.err',
            'marginalize_extrinsic_parameters_iteration_*.dag.lib.err'
        ]
    else:
        to_upload = ['*.log', '*.err', '*.out']
    for filename in to_upload:
        tasks = []
        for path in _find_paths_from_name(rundir, filename):
            with open(path, 'rb') as f:
                contents = f.read()
            # Skip empty files to avoid useless uploads.
            if contents:
                # put .log suffix in log file names so that users can directly
                # read the contents instead of downloading them when they click
                # file names
                tasks.append(gracedb.upload.si(
                    filecontents=contents,
                    filename=os.path.basename(path) + '.log',
                    graceid=superevent_id,
                    message=f'A log file for {analysis} condor job.',
                    tags='pe'
                ))
        canvas |= group(tasks)

    # Off the production host, reclaim disk from the failed bilby run.
    if "bilby" in analysis and app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    canvas.delay()

618 

619 

def _upload_tasks_lalinference(rundir, superevent_id):
    """Return canvas of tasks to upload LALInference results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The work-flow for uploading LALInference results

    """
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'lalinference'
    )

    # posterior samples: exactly one file is expected; tuple-unpacking raises
    # if zero or several files match, failing the task loudly.
    path, = glob.glob(
        os.path.join(rundir, '**', 'posterior*.hdf5'), recursive=True)
    with open(path, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), 'LALInference.posterior_samples.hdf5',
            superevent_id, 'LALInference posterior samples', 'pe')

    # plots
    tasks = []
    for filename, message in [
        ('extrinsic.png', 'LALInference corner plot for extrinsic parameters'),
        ('intrinsic.png', 'LALInference corner plot for intrinsic parameters')
    ]:
        # Here it is not required that only a single png file exists, so that
        # posterior samples are uploaded whatever. This applies for the other
        # files.
        for path in _find_paths_from_name(pe_results_path, filename):
            with open(path, 'rb') as f:
                # BUGFIX: interpolate the plot file name into the upload
                # name; previously the f-string contained no placeholder,
                # so both corner plots were uploaded under one broken name.
                tasks.append(gracedb.upload.si(
                    f.read(), f'LALInference.{filename}', superevent_id,
                    message, 'pe'
                ))
    canvas |= group(tasks)

    # psd
    tasks = []
    for path in _find_paths_from_name(rundir, 'glitch_median_PSD_forLI_*.dat'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'Bayeswave PSD used for LALInference PE', 'pe'
            ))
    canvas |= group(tasks)

    # dag
    tasks = []
    for path in _find_paths_from_name(rundir, 'lalinference*.dag'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'LALInference DAG', 'pe'
            ))
    canvas |= group(tasks)

    # link to results page
    tasks = []
    for path in _find_paths_from_name(pe_results_path, 'posplots.html'):
        baseurl = urllib.parse.urljoin(
            app.conf['pe_results_url'],
            os.path.relpath(path, app.conf['pe_results_path'])
        )
        tasks.append(gracedb.upload.si(
            None, None, superevent_id,
            'Online lalinference parameter estimation finished. '
            f'<a href={baseurl}>results</a>'
        ))
    canvas |= group(tasks)

    return canvas

700 

701 

def _upload_tasks_bilby(rundir, superevent_id, mode):
    """Return canvas of tasks to upload Bilby results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode

    Returns
    -------
    tasks : canvas
        The work-flow for uploading Bilby results

    Notes
    -----
    Some large bilby data products are cleaned up after the posterior file is
    uploaded if the gracedb host is different from `gracedb.ligo.org`.

    """
    # convert bilby sample file into one compatible with ligo-skymap
    samples_dir = os.path.join(rundir, 'final_result')
    if mode == 'production':
        samples_filename = 'Bilby.posterior_samples.hdf5'
    else:
        samples_filename = f'Bilby.{mode}.posterior_samples.hdf5'
    out_samples = os.path.join(samples_dir, samples_filename)
    # Exactly one merged result file is expected; unpacking raises otherwise.
    in_samples, = glob.glob(os.path.join(samples_dir, '*result.hdf5'))
    subprocess.run(
        ['bilby_pipe_to_ligo_skymap_samples', in_samples, '--out', out_samples]
    )

    with open(out_samples, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), samples_filename,
            superevent_id, f'{mode}-mode Bilby posterior samples', 'pe')

    # Off the production host, reclaim disk once the posterior is uploaded.
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    # pesummary: gather arguments from the completed bilby_pipe configuration.
    pesummary_kwargs = {}
    path_to_ini, = glob.glob(os.path.join(rundir, "*_complete.ini"))
    pesummary_kwargs["config"] = path_to_ini
    config_parser = BilbyConfigFileParser()
    with open(path_to_ini, "r") as f:
        config_content, _, _, _ = config_parser.parse(f)
    # These dicts can be None when no PSD/calibration files were passed to
    # bilby_pipe; _pesummary_task skips None values.
    pesummary_kwargs["psd"] = convert_string_to_dict(
        config_content["psd-dict"]
    )
    pesummary_kwargs["calibration"] = convert_string_to_dict(
        config_content["spline-calibration-envelope-dict"]
    )
    pesummary_kwargs["approximant"] = config_content["waveform-approximant"]
    pesummary_kwargs["f_low"] = config_content["minimum-frequency"]
    pesummary_kwargs["f_ref"] = config_content["reference-frequency"]
    pesummary_kwargs["label"] = "online"

    webdir = os.path.join(config_content["webdir"], 'pesummary')
    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(
            os.path.join(webdir, 'home.html'),
            app.conf['pe_results_path']
        )
    )
    # Run the posterior upload in parallel with the pesummary condor job,
    # which is followed by the results-page announcement.
    canvas = group(
        canvas,
        _pesummary_task(webdir, in_samples, **pesummary_kwargs)
        |
        gracedb.upload.si(
            None, None, superevent_id,
            f'PESummary page for {mode}-mode Bilby is available '
            f'<a href={url}>here</a>',
            'pe'
        )
    )

    return canvas

784 

785 

def _upload_tasks_rapidpe(rundir, superevent_id):
    """Return canvas of tasks to upload RapidPE-RIFT results.

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The work-flow for uploading RapidPE-RIFT results

    """
    summary_dir = os.path.join(rundir, "summary")

    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.join(superevent_id, 'rapidpe', 'summarypage.html')
    )
    canvas = gracedb.upload.si(
        None, None, superevent_id,
        f'Summary page for RapidPE-RIFT is available <a href={url}>here</a>',
        ('pe',))

    # (source file name, upload name, description, GraceDB tags)
    upload_specs = [
        (
            "p_astro.json", "RapidPE_RIFT.p_astro.json",
            "RapidPE-RIFT Pastro results",
            ("pe", "p_astro", "public"),
        ),
    ]
    subtasks = []
    for src_basename, dst_filename, description, tags in upload_specs:
        src_path = os.path.join(summary_dir, src_basename)
        # Missing products are skipped silently; uploads are best-effort.
        if not os.path.isfile(src_path):
            continue
        with open(src_path, "rb") as f:
            payload = f.read()
        subtasks.append(
            gracedb.upload.si(
                payload, dst_filename, superevent_id, description, tags))

    return canvas | group(subtasks)

817 

818 

@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, superevent_id, pe_pipeline, **kwargs):
    """Upload PE results

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exits
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    # Build the pipeline-specific upload work-flow.
    if pe_pipeline == 'lalinference':
        upload_canvas = _upload_tasks_lalinference(rundir, superevent_id)
    elif pe_pipeline == 'bilby':
        upload_canvas = _upload_tasks_bilby(
            rundir, superevent_id, kwargs['bilby_mode'])
    elif pe_pipeline == 'rapidpe':
        upload_canvas = _upload_tasks_rapidpe(rundir, superevent_id)
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    upload_canvas.delay()

    # NOTE: check if this should include rapidpe as well
    if pe_pipeline == 'bilby':
        gracedb.create_label.delay('PE_READY', superevent_id)

849 

850 

def _pesummary_task(webdir, samples, **pesummary_kwargs):
    """Return a celery task to submit a pesummary condor job.

    Parameters
    ----------
    webdir : str
        output directory
    samples : str
        path to posterior sample file
    **pesummary_kwargs
        Extra arguments of summarypages

    Returns
    -------
    celery task

    Notes
    -----
    `--disable_interactive --disable_expert` are added and `--redshift_method
    exact --evolve_spins_forwards` are not added to `summarypages` arguments
    when the gracedb host is different from `gracedb.ligo.org`. Condor queue is
    set to `Online_PE` if gracedb host is `gracedb.ligo.org`, and
    `Online_PE_MDC` otherwise.

    """
    args = ["summarypages", "--webdir", webdir, "--samples", samples,
            "--gw", "--no_ligo_skymap", "--multi_process", "6"]
    for key, value in pesummary_kwargs.items():
        if key in ("psd", "calibration"):
            # these values can be none if calibration envelopes
            # or PSD files aren't passed to bilby_pipe
            # see https://git.ligo.org/emfollow/gwcelery/-/issues/852
            # and related issues
            if value is None:
                continue
            args.append(f"--{key}")
            args.extend(f'{ifo}:{value[ifo]}' for ifo in value)
        else:
            args.extend([f"--{key}", value])

    condor_kwargs = dict(
        request_memory=16000, request_disk=5000, request_cpus=6,
        accounting_group_user='soichiro.morisaki'
    )
    production = app.conf['gracedb_host'] == 'gracedb.ligo.org'
    if production:
        condor_kwargs['accounting_group'] = 'ligo.prod.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE =?= True))'
        condor_kwargs['+Online_PE'] = True
        args += ["--redshift_method", "exact", "--evolve_spins_forwards"]
    else:
        condor_kwargs['accounting_group'] = 'ligo.dev.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE_MDC =?= True))'
        condor_kwargs['+Online_PE_MDC'] = True
        args += ["--disable_interactive", "--disable_expert"]
    return condor.check_output.si(args, **condor_kwargs)

908 

909 

# Modified version of condor.submit task with retry kwargs overridden with
# RapidPE-specific settings (``rapidpe_condor_retry_kwargs`` from the app
# configuration). Same calling convention as ``condor.submit``.
submit_rapidpe = app.task(
    **condor.submit_kwargs,
    **app.conf['rapidpe_condor_retry_kwargs'],
)(condor.submit.run)

916 

917 

@app.task(ignore_result=True, shared=False)
def start_pe(event, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    # make an event directory
    pipeline_dir = platformdirs.user_cache_dir(pe_pipeline, ensure_exists=True)
    event_dir = os.path.join(pipeline_dir, superevent_id)

    if pe_pipeline == 'bilby':
        # Throttle BBH (mchirp >= 12) bilby runs on the playground host only.
        if (
            app.conf['gracedb_host'] == 'gracedb-playground.ligo.org' and
            event['extra_attributes']['CoincInspiral']['mchirp'] >= 12
        ):
            # Count the number of BBH jobs and do not start a run if it exceeds
            # 5 so that we do not use up disk space. We assume that the job is
            # running if a data dump pickle file exists under the run
            # directory, which is the largest file produced by PE and removed
            # when the run completes.
            number_of_bbh_running = 0
            for p in glob.glob(
                os.path.join(
                    pipeline_dir,
                    "*/*/data/*_generation_data_dump.pickle"
                )
            ):
                # event.json is written next to data/ by
                # _setup_dag_for_bilby; read it back to get the chirp mass
                # of the running job.
                path_to_ev = os.path.join(os.path.dirname(p), "../event.json")
                if os.path.exists(path_to_ev):
                    with open(path_to_ev, "r") as f:
                        ev = json.load(f)
                    mc = ev['extra_attributes']['CoincInspiral']['mchirp']
                    if mc >= 12:
                        number_of_bbh_running += 1
            if number_of_bbh_running > 5:
                gracedb.upload.delay(
                    filecontents=None, filename=None, graceid=superevent_id,
                    message='Parameter estimation will not start to save disk '
                            f'space (There are {number_of_bbh_running} BBH '
                            'jobs running).',
                    tags='pe'
                )
                return
        # One run per mode; currently only "production" is started.
        modes = ["production"]
        rundirs = [os.path.join(event_dir, m) for m in modes]
        kwargs_list = [{'bilby_mode': m} for m in modes]
        analyses = [f'{m}-mode bilby' for m in modes]
        condor_submit_task = condor.submit
    elif pe_pipeline == 'rapidpe':
        rundirs = [event_dir]
        kwargs_list = [{'event_pipeline': event["pipeline"]}]
        analyses = [pe_pipeline]
        # RapidPE uses the submit task with pipeline-specific retry settings.
        condor_submit_task = submit_rapidpe
    else:
        rundirs = [event_dir]
        kwargs_list = [{}]
        analyses = [pe_pipeline]
        condor_submit_task = condor.submit

    for rundir, kwargs, analysis in zip(rundirs, kwargs_list, analyses):
        os.makedirs(rundir, exist_ok=True)

        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            message=(f'Starting {analysis} parameter estimation '
                     f'for {event["graceid"]}'),
            tags='pe'
        )

        # prepare DAG -> submit to condor (with error notification) ->
        # upload results when the DAG finishes.
        (
            dag_prepare_task(
                rundir, event, superevent_id, pe_pipeline, **kwargs
            )
            |
            condor_submit_task.s().on_error(
                job_error_notification.s(superevent_id, rundir, analysis)
            )
            |
            dag_finished.si(rundir, superevent_id, pe_pipeline, **kwargs)
        ).delay()