Coverage for gwcelery/tasks/inference.py: 99% (335 statements)
1"""Source Parameter Estimation with LALInference, Bilby, and RapidPE."""
2import glob
3import json
4import os
5import subprocess
6import urllib
7from shutil import which
9import numpy as np
10import platformdirs
11from bilby_pipe.bilbyargparser import BilbyConfigFileParser
12from bilby_pipe.utils import convert_string_to_dict
13from celery import group
14from celery.exceptions import Ignore
16from .. import app
17from ..jinja import env
18from . import condor, gracedb

_RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE = 100

RAPIDPE_GETENV = [
    "DEFAULT_SEGMENT_SERVER", "GWDATAFIND_SERVER",
    "LAL_DATA_PATH", "LD_LIBRARY_PATH", "LIBRARY_PATH",
    "NDSSERVER", "PATH", "PYTHONPATH",
]
"""Names of environment variables to include in RapidPE HTCondor submit
files."""

RAPIDPE_ENVIRONMENT = {
    "RIFT_LOWLATENCY": "True",
}
"""Names and values of environment variables to include in RapidPE HTCondor
submit files."""
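
# In the rendered HTCondor submit files, these are expected to appear roughly
# as the following lines (a sketch; the exact rendering is up to the
# rapidpe.jinja2 template):
#
#     getenv = DEFAULT_SEGMENT_SERVER, GWDATAFIND_SERVER, LAL_DATA_PATH, ...
#     environment = "RIFT_LOWLATENCY=True"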


def _find_appropriate_cal_env(trigtime, dir_name):
    """Return the path to the calibration uncertainties estimated at the time
    before and closest to the trigger time. If there are no calibration
    uncertainties estimated before the trigger time, return the oldest one.
    The gpstimes at which the calibration uncertainties were estimated and
    the names of the files containing the uncertainties are saved in
    [HLV]_CalEnvs.txt.

    Parameters
    ----------
    trigtime : float
        The trigger time of a target event
    dir_name : str
        The path to the directory where files containing calibration
        uncertainties exist

    Returns
    -------
    path : str
        The path to the calibration uncertainties appropriate for a target
        event

    """
    filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt'))
    calibration_index = np.atleast_1d(
        np.genfromtxt(filename, dtype='object', names=['gpstime', 'filename'])
    )
    gpstimes = calibration_index['gpstime'].astype(np.int32)
    candidate_gpstimes = gpstimes < trigtime
    if np.any(candidate_gpstimes):
        idx = np.argmax(gpstimes * candidate_gpstimes)
        appropriate_cal = calibration_index['filename'][idx]
    else:
        appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)]
    return os.path.join(dir_name, appropriate_cal.decode('utf-8'))
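
# A minimal usage sketch (hypothetical directory and GPS times, not part of
# this module): suppose /path/to/envelopes/H_CalEnvs.txt contains
#
#     1126259462 H_CalEnv_early.txt
#     1187008882 H_CalEnv_late.txt
#
# A trigger after the second estimate resolves to the later file, while a
# trigger before both falls back to the oldest one:
#
#     >>> _find_appropriate_cal_env(1250000000.0, '/path/to/envelopes')
#     '/path/to/envelopes/H_CalEnv_late.txt'
#     >>> _find_appropriate_cal_env(1000000000.0, '/path/to/envelopes')
#     '/path/to/envelopes/H_CalEnv_early.txt'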


def prepare_lalinference_ini(event, superevent_id):
    """Determine LALInference configurations and return ini file content.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    ini_contents : str

    """
    # Get template of .ini file
    ini_template = env.get_template('lalinference.jinja2')

    # fill out the ini template and return the resultant content
    singleinspiraltable = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    executables = {'datafind': 'gw_data_find',
                   'mergeNSscript': 'lalinference_nest2pos',
                   'mergeMCMCscript': 'cbcBayesMCMC2pos',
                   'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
                   'resultspage': 'cbcBayesPostProc',
                   'segfind': 'ligolw_segment_query',
                   'ligolw_print': 'ligolw_print',
                   'coherencetest': 'lalinference_coherence_test',
                   'lalinferencenest': 'lalinference_nest',
                   'lalinferencemcmc': 'lalinference_mcmc',
                   'lalinferencebambi': 'lalinference_bambi',
                   'lalinferencedatadump': 'lalinference_datadump',
                   'ligo-skymap-from-samples': 'true',
                   'ligo-skymap-plot': 'true',
                   'processareas': 'process_areas',
                   'computeroqweights': 'lalinference_compute_roq_weights',
                   'mpiwrapper': 'lalinference_mpi_wrapper',
                   'gracedb': 'gracedb',
                   'ppanalysis': 'cbcBayesPPAnalysis',
                   'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
                   'bayeswave': 'BayesWave',
                   'bayeswavepost': 'BayesWavePost'}
    ini_settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': app.conf['low_latency_frame_types'],
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], superevent_id, 'lalinference'
        ),
        'paths': [{'name': name, 'path': which(executable)}
                  for name, executable in executables.items()],
        'h1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Hanford'
        ),
        'l1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Livingston'
        ),
        'v1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/Virgo'
        ),
        'mc': min([sngl['mchirp'] for sngl in singleinspiraltable]),
        'q': min([sngl['mass2'] / sngl['mass1']
                  for sngl in singleinspiraltable]),
        'mpirun': which('mpirun')
    }
    return ini_template.render(ini_settings)


@app.task(shared=False)
def _setup_dag_for_lalinference(coinc, rundir, event, superevent_id):
    """Create DAG for a lalinference run and return the path to DAG.

    Parameters
    ----------
    coinc : byte contents
        Byte contents of ``coinc.xml``. The PSD is expected to be embedded.
    rundir : str
        The path to a run directory where the DAG file is created.
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    # write down coinc.xml in the run directory
    path_to_coinc = os.path.join(rundir, 'coinc.xml')
    with open(path_to_coinc, 'wb') as f:
        f.write(coinc)

    # write down and upload ini file
    ini_contents = prepare_lalinference_ini(event, superevent_id)
    path_to_ini = os.path.join(rundir, 'online_lalinference_pe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated LALInference configuration file'
                 ' for this event.'),
        tags='pe')

    try:
        subprocess.run(
            ['lalinference_pipe', '--run-path', rundir,
             '--coinc', path_to_coinc, path_to_ini, '--psd', path_to_coinc],
            capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='lalinference_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        raise

    return os.path.join(rundir, 'multidag.dag')


@app.task(shared=False)
def _setup_dag_for_bilby(
    coinc_bayestar, rundir, event, superevent_id, mode="production"
):
    """Create DAG for a bilby run and return the path to DAG.

    Parameters
    ----------
    coinc_bayestar : tuple
        Byte contents of ``coinc.xml`` and ``bayestar.multiorder.fits``.
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode, allowed options are "production" and "fast_test",
        default is "production".

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    Notes
    -----
    `--channel-dict o3replay` is added to bilby_pipe_gracedb arguments when
    the gracedb host is different from `gracedb.ligo.org` or
    `gracedb-test.ligo.org`. Condor queue is set to `Online_PE` if gracedb
    host is `gracedb.ligo.org`, and `Online_PE_MDC` otherwise.

    """
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    coinc, bayestar = coinc_bayestar
    path_to_psd = os.path.join(rundir, 'coinc.xml')
    with open(path_to_psd, 'wb') as f:
        f.write(coinc)
    path_to_bayestar = os.path.join(rundir, 'bayestar.multiorder.fits')
    with open(path_to_bayestar, 'wb') as f:
        f.write(bayestar)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'bilby', mode
    )

    path_to_settings = os.path.join(rundir, 'settings.json')
    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json,
                 '--psd-file', path_to_psd, '--skymap-file', path_to_bayestar,
                 '--settings', path_to_settings]
    settings = {'summarypages_arguments': {'gracedb': event['graceid'],
                                           'no_ligo_skymap': True},
                'accounting_user': 'soichiro.morisaki',
                'tukey_roll_off': 1.0}
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        settings['queue'] = 'Online_PE_MDC'
    else:
        settings['queue'] = 'Online_PE'
        settings['accounting'] = 'ligo.prod.o4.cbc.pe.bilby'
    # FIXME: using live data for gracedb-test events should be reconsidered
    # when we have a better idea to differentiate MDC and real events.
    if app.conf['gracedb_host'] not in [
        'gracedb.ligo.org', 'gracedb-test.ligo.org'
    ]:
        setup_arg += ['--channel-dict', 'o3replay']

    trigger_chirp_mass = event['extra_attributes']['CoincInspiral']['mchirp']
    if trigger_chirp_mass < 0.6:
        raise ValueError(
            "No bilby settings available for trigger chirp mass of"
            f" {trigger_chirp_mass}Msun."
        )
    if mode == 'production':
        settings.update(
            {
                'sampler_kwargs': {'naccept': 60, 'nlive': 500,
                                   'npool': 24, 'sample': 'acceptance-walk'},
                'n_parallel': 3,
                'request_cpus': 24,
                'spline_calibration_nodes': 10,
                'request_memory_generation': 8.0
            }
        )
        # use low-spin IMRPhenomD below the chirp mass of an m1=3Msun,
        # m2=1Msun binary, assuming a binary neutron star
        if trigger_chirp_mass < 1.465:
            likelihood_mode = 'lowspin_phenomd_fhigh1024_roq'
            settings['sampler_kwargs']['naccept'] = 10
        # use IMRPhenomPv2 with mass ratio upper bound of 8 below the chirp
        # mass of an m1=8Msun, m2=1Msun binary
        elif trigger_chirp_mass < 2.243:
            likelihood_mode = 'phenompv2_bns_roq'
        # use IMRPhenomPv2 with mass ratio upper bound of 20 in the
        # chirp-mass range where IMRPhenomXPHM ROQ bases are not available
        elif trigger_chirp_mass < 12:
            likelihood_mode = 'low_q_phenompv2_roq'
        else:
            likelihood_mode = 'phenomxphm_roq'
            if trigger_chirp_mass > 16:
                settings['request_memory_generation'] = 36.0
            else:
                settings['request_memory_generation'] = 50.0
            if trigger_chirp_mass > 25:
                settings['request_memory'] = 16.0
            elif trigger_chirp_mass > 16:
                settings['request_memory'] = 24.0
            else:
                settings['request_memory'] = 36.0
        setup_arg += ['--cbc-likelihood-mode', likelihood_mode]
    elif mode == 'fast_test':
        setup_arg += ["--sampler-kwargs", "FastTest"]
        if trigger_chirp_mass < 3.9:
            setup_arg += ['--cbc-likelihood-mode', 'phenompv2_bns_roq']
            settings['request_memory_generation'] = 8.0
    else:
        raise ValueError(f"mode: {mode} not recognized.")

    with open(path_to_settings, 'w') as f:
        json.dump(settings, f, indent=2)

    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='bilby_dag.log',
            graceid=superevent_id,
            message=f'Failed to prepare DAG for {mode}-mode bilby', tags='pe'
        )
        raise
    else:
        # Uploads bilby ini file to GraceDB
        with open(os.path.join(rundir, 'bilby_config.ini'), 'r') as f:
            ini_contents = f.read()
        if mode == 'production':
            filename = 'bilby_config.ini'
        else:
            filename = f'bilby_{mode}_config.ini'
        gracedb.upload.delay(
            ini_contents, filename=filename, graceid=superevent_id,
            message=(f'Automatically generated {mode}-mode Bilby'
                     ' configuration file for this event.'),
            tags='pe')

    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))
    return path_to_dag
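
# Illustrative sketch (a hypothetical helper, not used by the pipeline) that
# collapses the production-mode chirp-mass branching above into one lookup:
def _example_bilby_likelihood_mode(trigger_chirp_mass):
    """Return the ROQ likelihood mode _setup_dag_for_bilby would select."""
    if trigger_chirp_mass < 0.6:
        raise ValueError("No bilby settings available below 0.6 Msun.")
    elif trigger_chirp_mass < 1.465:
        # low-spin IMRPhenomD for BNS-like systems, f_high = 1024 Hz
        return 'lowspin_phenomd_fhigh1024_roq'
    elif trigger_chirp_mass < 2.243:
        # IMRPhenomPv2 with mass-ratio upper bound of 8
        return 'phenompv2_bns_roq'
    elif trigger_chirp_mass < 12:
        # IMRPhenomPv2 with mass-ratio upper bound of 20
        return 'low_q_phenompv2_roq'
    else:
        # IMRPhenomXPHM ROQ bases are available from here up
        return 'phenomxphm_roq'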


@app.task(shared=False)
def _setup_dag_for_rapidpe(rundir, superevent_id, event):
    """Create DAG for a rapidpe run and return the path to DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    superevent_id : str
        The GraceDB ID of a target superevent
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose SNR is used to determine analysis
        settings.

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    gracedb_host = app.conf['gracedb_host']

    settings = app.conf['rapidpe_settings']
    trigger_snr = event['extra_attributes']['CoincInspiral']['snr']
    high_snr_trigger = trigger_snr >= 37.5

    # dump ini file
    ini_template = env.get_template('rapidpe.jinja2')
    ini_contents = ini_template.render(
        {'rundir': rundir,
         'webdir': os.path.join(
             app.conf['pe_results_path'], superevent_id, 'rapidpe'
         ),
         'gracedb_url': f'https://{gracedb_host}/api',
         'superevent_id': superevent_id,
         'run_mode': settings['run_mode'],
         'frame_data_types': app.conf['low_latency_frame_types'],
         'accounting_group': settings['accounting_group'],
         'use_cprofile': settings['use_cprofile'],
         'gracedb_host': gracedb_host,
         'high_snr_trigger': high_snr_trigger,
         'getenv': RAPIDPE_GETENV,
         'environment': RAPIDPE_ENVIRONMENT})
    path_to_ini = os.path.join(rundir, 'rapidpe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated RapidPE-RIFT configuration file'
                 ' for this event.'),
        tags='pe')

    # set up dag
    try:
        subprocess.run(['rapidpe-rift-pipe', path_to_ini],
                       capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr

        message = 'Failed to prepare DAG for Rapid PE'

        fail_gracefully = False
        if e.returncode == _RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE:
            fail_gracefully = True
            message += ": no GstLAL trigger available"

        gracedb.upload.delay(
            filecontents=contents, filename='rapidpe_dag.log',
            graceid=superevent_id,
            message=message, tags='pe'
        )

        if fail_gracefully:
            # Ends task but without logging as a failure
            raise Ignore()
        else:
            # Ends task with the unhandled error logged
            raise

    # return path to dag
    dag = os.path.join(rundir, "event_all_iterations.dag")
    return dag


@app.task(shared=False)
def _condor_no_submit(path_to_dag, include_env=None):
    """Run 'condor_submit_dag -no_submit' and return the path to .sub file."""
    args = ['condor_submit_dag']

    if include_env is not None:
        args += ['-include_env', ','.join(include_env)]

    args += ['-no_submit', path_to_dag]
    subprocess.run(args, capture_output=True, check=True)
    return '{}.condor.sub'.format(path_to_dag)
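
# For example (a sketch with a hypothetical run directory), for a RapidPE DAG
# prepared with include_env=RAPIDPE_GETENV the command assembled above is
# roughly:
#
#     condor_submit_dag -include_env DEFAULT_SEGMENT_SERVER,GWDATAFIND_SERVER,... \
#         -no_submit /run/dir/event_all_iterations.dag
#
# and the returned path is '/run/dir/event_all_iterations.dag.condor.sub'.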


def dag_prepare_task(rundir, event, superevent_id, pe_pipeline, **kwargs):
    """Return a canvas of tasks to prepare DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    """
    # List of environment variables `condor_submit_dag` should be aware of.
    include_env = None

    if pe_pipeline == 'lalinference':
        canvas = gracedb.download.si('coinc.xml', event['graceid']) | \
            _setup_dag_for_lalinference.s(rundir, event, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = group(
            gracedb.download.si('coinc.xml', event['graceid']),
            gracedb.download.si('bayestar.multiorder.fits', event['graceid'])
        ) | _setup_dag_for_bilby.s(
            rundir, event, superevent_id, kwargs['bilby_mode']
        )
    elif pe_pipeline == 'rapidpe':
        canvas = _setup_dag_for_rapidpe.s(rundir, superevent_id, event)
        include_env = RAPIDPE_GETENV
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas |= _condor_no_submit.s(include_env=include_env)

    return canvas
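
# Schematically (a sketch, not executed here), the bilby canvas built above
# chains a parallel download group into DAG generation and the no-submit step:
#
#     group(download('coinc.xml'), download('bayestar.multiorder.fits'))
#         | _setup_dag_for_bilby.s(rundir, event, superevent_id, bilby_mode)
#         | _condor_no_submit.s(include_env=None)
#
# The group's results arrive as the first positional argument
# (coinc_bayestar) of _setup_dag_for_bilby, and the DAG path it returns is
# passed on to _condor_no_submit.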


def _find_paths_from_name(directory, name):
    """Return the paths of files or directories with given name under the
    specified directory.

    Parameters
    ----------
    directory : string
        Name of directory under which the target file or directory is
        searched for.
    name : string
        Name of target files or directories

    Returns
    -------
    paths : generator
        Paths to the target files or directories

    """
    return glob.iglob(os.path.join(directory, '**', name), recursive=True)
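
# For example (hypothetical layout): if rundir contains
# submit/dag_bilby.submit, the recursive glob finds it at any depth:
#
#     >>> list(_find_paths_from_name(rundir, 'dag*.submit'))
#     ['<rundir>/submit/dag_bilby.submit']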


@app.task(ignore_result=True, shared=False)
def _clean_up_bilby(rundir):
    """Remove large data products produced by bilby

    Parameters
    ----------
    rundir : str

    """
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_roq_weights.hdf5")
    ):
        os.remove(p)
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_data_dump.pickle")
    ):
        os.remove(p)


@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, analysis):
    """Upload notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    analysis : str
        Analysis name used as a label in uploaded messages

    Notes
    -----
    Some large bilby data products are cleaned up after the notification if
    the gracedb host is different from `gracedb.ligo.org`.

    """
    if isinstance(exc, condor.JobRunning):
        subprocess.run(['condor_rm', str(exc.args[0]['Cluster'])])
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted by gwcelery, '
            'due to its long run time.'
        )
    elif isinstance(exc, condor.JobAborted):
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted.'
        )
    else:
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job failed.'
        )

    if analysis == "rapidpe":
        to_upload = [
            'event_all_iterations.dag.lib.err',
            'marginalize_extrinsic_parameters_iteration_*.dag.lib.err'
        ]
    else:
        to_upload = ['*.log', '*.err', '*.out']
    for filename in to_upload:
        tasks = []
        for path in _find_paths_from_name(rundir, filename):
            with open(path, 'rb') as f:
                contents = f.read()
            if contents:
                # put .log suffix in log file names so that users can
                # directly read the contents instead of downloading them
                # when they click file names
                tasks.append(gracedb.upload.si(
                    filecontents=contents,
                    filename=os.path.basename(path) + '.log',
                    graceid=superevent_id,
                    message=f'A log file for {analysis} condor job.',
                    tags='pe'
                ))
        canvas |= group(tasks)

    if "bilby" in analysis and app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    canvas.delay()


def _upload_tasks_lalinference(rundir, superevent_id):
    """Return canvas of tasks to upload LALInference results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The workflow for uploading LALInference results

    """
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'lalinference'
    )

    # posterior samples
    path, = glob.glob(
        os.path.join(rundir, '**', 'posterior*.hdf5'), recursive=True)
    with open(path, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), 'LALInference.posterior_samples.hdf5',
            superevent_id, 'LALInference posterior samples', 'pe')

    # plots
    tasks = []
    for filename, message in [
        ('extrinsic.png', 'LALInference corner plot for extrinsic parameters'),
        ('intrinsic.png', 'LALInference corner plot for intrinsic parameters')
    ]:
        # Here we do not require that only a single png file exists, so that
        # the posterior samples are uploaded regardless of the outcome. The
        # same applies to the other files below.
        for path in _find_paths_from_name(pe_results_path, filename):
            with open(path, 'rb') as f:
                tasks.append(gracedb.upload.si(
                    f.read(), f'LALInference.{filename}', superevent_id,
                    message, 'pe'
                ))
    canvas |= group(tasks)

    # psd
    tasks = []
    for path in _find_paths_from_name(rundir, 'glitch_median_PSD_forLI_*.dat'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'Bayeswave PSD used for LALInference PE', 'pe'
            ))
    canvas |= group(tasks)

    # dag
    tasks = []
    for path in _find_paths_from_name(rundir, 'lalinference*.dag'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'LALInference DAG', 'pe'
            ))
    canvas |= group(tasks)

    # link to results page
    tasks = []
    for path in _find_paths_from_name(pe_results_path, 'posplots.html'):
        baseurl = urllib.parse.urljoin(
            app.conf['pe_results_url'],
            os.path.relpath(path, app.conf['pe_results_path'])
        )
        tasks.append(gracedb.upload.si(
            None, None, superevent_id,
            'Online lalinference parameter estimation finished. '
            f'<a href={baseurl}>results</a>'
        ))
    canvas |= group(tasks)

    return canvas


def _upload_tasks_bilby(rundir, superevent_id, mode):
    """Return canvas of tasks to upload Bilby results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode

    Returns
    -------
    tasks : canvas
        The workflow for uploading Bilby results

    Notes
    -----
    Some large bilby data products are cleaned up after the posterior file is
    uploaded if the gracedb host is different from `gracedb.ligo.org`.

    """
    # convert bilby sample file into one compatible with ligo-skymap
    samples_dir = os.path.join(rundir, 'final_result')
    if mode == 'production':
        samples_filename = 'Bilby.posterior_samples.hdf5'
    else:
        samples_filename = f'Bilby.{mode}.posterior_samples.hdf5'
    out_samples = os.path.join(samples_dir, samples_filename)
    in_samples, = glob.glob(os.path.join(samples_dir, '*result.hdf5'))
    subprocess.run(
        ['bilby_pipe_to_ligo_skymap_samples', in_samples, '--out', out_samples]
    )

    with open(out_samples, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), samples_filename,
            superevent_id, f'{mode}-mode Bilby posterior samples', 'pe')

    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    # pesummary
    pesummary_kwargs = {}
    path_to_ini, = glob.glob(os.path.join(rundir, "*_complete.ini"))
    pesummary_kwargs["config"] = path_to_ini
    config_parser = BilbyConfigFileParser()
    with open(path_to_ini, "r") as f:
        config_content, _, _, _ = config_parser.parse(f)
    pesummary_kwargs["psd"] = convert_string_to_dict(
        config_content["psd-dict"]
    )
    pesummary_kwargs["calibration"] = convert_string_to_dict(
        config_content["spline-calibration-envelope-dict"]
    )
    pesummary_kwargs["approximant"] = config_content["waveform-approximant"]
    pesummary_kwargs["f_low"] = config_content["minimum-frequency"]
    pesummary_kwargs["f_ref"] = config_content["reference-frequency"]
    pesummary_kwargs["label"] = "online"

    webdir = os.path.join(config_content["webdir"], 'pesummary')
    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(
            os.path.join(webdir, 'home.html'),
            app.conf['pe_results_path']
        )
    )
    canvas = group(
        canvas,
        _pesummary_task(webdir, in_samples, **pesummary_kwargs)
        |
        gracedb.upload.si(
            None, None, superevent_id,
            f'PESummary page for {mode}-mode Bilby is available '
            f'<a href={url}>here</a>',
            'pe'
        )
    )

    return canvas


def _upload_tasks_rapidpe(rundir, superevent_id):
    """Return canvas of tasks to upload RapidPE-RIFT results."""
    summary_path = os.path.join(rundir, "summary")

    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.join(superevent_id, 'rapidpe', 'summarypage.html')
    )
    canvas = gracedb.upload.si(
        None, None, superevent_id,
        f'Summary page for RapidPE-RIFT is available <a href={url}>here</a>',
        ('pe',))
    to_upload = [
        (
            "p_astro.json", "RapidPE_RIFT.p_astro.json",
            "RapidPE-RIFT Pastro results",
            ("pe", "p_astro", "public"),
        ),
    ]
    tasks = []
    for src_basename, dst_filename, description, tags in to_upload:
        src_filename = os.path.join(summary_path, src_basename)
        if os.path.isfile(src_filename):
            with open(src_filename, "rb") as f:
                tasks.append(
                    gracedb.upload.si(
                        f.read(), dst_filename,
                        superevent_id, description, tags))

    canvas |= group(tasks)

    return canvas


@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, superevent_id, pe_pipeline, **kwargs):
    """Upload PE results

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    if pe_pipeline == 'lalinference':
        canvas = _upload_tasks_lalinference(rundir, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = _upload_tasks_bilby(
            rundir, superevent_id, kwargs['bilby_mode'])
    elif pe_pipeline == 'rapidpe':
        canvas = _upload_tasks_rapidpe(rundir, superevent_id)
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas.delay()

    # NOTE: check if this should include rapidpe as well
    if pe_pipeline == 'bilby':
        gracedb.create_label.delay('PE_READY', superevent_id)


def _pesummary_task(webdir, samples, **pesummary_kwargs):
    """Return a celery task to submit a pesummary condor job.

    Parameters
    ----------
    webdir : str
        output directory
    samples : str
        path to posterior sample file
    **pesummary_kwargs
        Extra arguments of summarypages

    Returns
    -------
    celery task

    Notes
    -----
    `--disable_interactive --disable_expert` are added and `--redshift_method
    exact --evolve_spins_forwards` are not added to `summarypages` arguments
    when the gracedb host is different from `gracedb.ligo.org`. Condor queue
    is set to `Online_PE` if gracedb host is `gracedb.ligo.org`, and
    `Online_PE_MDC` otherwise.

    """
    args = [
        "summarypages", "--webdir", webdir, "--samples", samples, "--gw",
        "--no_ligo_skymap", "--multi_process", "6"
    ]
    for key in pesummary_kwargs:
        if key in ["psd", "calibration"]:
            # these values can be none if calibration envelopes
            # or PSD files aren't passed to bilby_pipe
            # see https://git.ligo.org/emfollow/gwcelery/-/issues/852
            # and related issues
            if pesummary_kwargs[key] is None:
                continue
            args += [f"--{key}"]
            for ifo in pesummary_kwargs[key]:
                args += [f'{ifo}:{pesummary_kwargs[key][ifo]}']
        else:
            args += [f"--{key}", pesummary_kwargs[key]]
    condor_kwargs = dict(
        request_memory=16000, request_disk=5000, request_cpus=6,
        accounting_group_user='soichiro.morisaki'
    )
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        condor_kwargs['accounting_group'] = 'ligo.dev.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE_MDC =?= True))'
        condor_kwargs['+Online_PE_MDC'] = True
        args += ["--disable_interactive", "--disable_expert"]
    else:
        condor_kwargs['accounting_group'] = 'ligo.prod.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE =?= True))'
        condor_kwargs['+Online_PE'] = True
        args += ["--redshift_method", "exact", "--evolve_spins_forwards"]
    return condor.check_output.si(args, **condor_kwargs)
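
# For example (a sketch with hypothetical placeholder values in angle
# brackets), on a host other than gracedb.ligo.org the assembled command
# looks roughly like:
#
#     summarypages --webdir <webdir> --samples <samples> --gw
#         --no_ligo_skymap --multi_process 6 --config <rundir>/<label>_complete.ini
#         --psd H1:<H1_psd.dat> L1:<L1_psd.dat>
#         --calibration H1:<H1_cal.txt> L1:<L1_cal.txt>
#         --approximant <approximant> --f_low <f_low> --f_ref <f_ref>
#         --label online --disable_interactive --disable_expert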


# Modified version of condor.submit task with retry kwargs overridden with
# RapidPE-specific settings.
submit_rapidpe = app.task(
    **condor.submit_kwargs,
    **app.conf['rapidpe_condor_retry_kwargs'],
)(condor.submit.run)
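
# Functionally this is the same as re-decorating the underlying run() method
# (a sketch; submit_rapidpe behaves like condor.submit but retries according
# to the rapidpe_condor_retry_kwargs configuration):
#
#     @app.task(**condor.submit_kwargs,
#               **app.conf['rapidpe_condor_retry_kwargs'])
#     def submit_rapidpe(...):
#         ...  # same body as condor.submit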


@app.task(ignore_result=True, shared=False)
def start_pe(event, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    # make an event directory
    pipeline_dir = platformdirs.user_cache_dir(pe_pipeline, ensure_exists=True)
    event_dir = os.path.join(pipeline_dir, superevent_id)

    if pe_pipeline == 'bilby':
        if (
            app.conf['gracedb_host'] == 'gracedb-playground.ligo.org' and
            event['extra_attributes']['CoincInspiral']['mchirp'] >= 12
        ):
            # Count the number of BBH jobs and do not start a run if it
            # exceeds 5 so that we do not use up disk space. We assume that
            # the job is running if a data dump pickle file exists under the
            # run directory, which is the largest file produced by PE and
            # removed when the run completes.
            number_of_bbh_running = 0
            for p in glob.glob(
                os.path.join(
                    pipeline_dir,
                    "*/*/data/*_generation_data_dump.pickle"
                )
            ):
                path_to_ev = os.path.join(os.path.dirname(p), "../event.json")
                if os.path.exists(path_to_ev):
                    with open(path_to_ev, "r") as f:
                        ev = json.load(f)
                    mc = ev['extra_attributes']['CoincInspiral']['mchirp']
                    if mc >= 12:
                        number_of_bbh_running += 1
            if number_of_bbh_running > 5:
                gracedb.upload.delay(
                    filecontents=None, filename=None, graceid=superevent_id,
                    message='Parameter estimation will not start to save disk '
                            f'space (There are {number_of_bbh_running} BBH '
                            'jobs running).',
                    tags='pe'
                )
                return
        modes = ["production"]
        rundirs = [os.path.join(event_dir, m) for m in modes]
        kwargs_list = [{'bilby_mode': m} for m in modes]
        analyses = [f'{m}-mode bilby' for m in modes]
        condor_submit_task = condor.submit
    elif pe_pipeline == 'rapidpe':
        rundirs = [event_dir]
        kwargs_list = [{'event_pipeline': event["pipeline"]}]
        analyses = [pe_pipeline]
        condor_submit_task = submit_rapidpe
    else:
        rundirs = [event_dir]
        kwargs_list = [{}]
        analyses = [pe_pipeline]
        condor_submit_task = condor.submit

    for rundir, kwargs, analysis in zip(rundirs, kwargs_list, analyses):
        os.makedirs(rundir, exist_ok=True)

        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            message=(f'Starting {analysis} parameter estimation '
                     f'for {event["graceid"]}'),
            tags='pe'
        )

        (
            dag_prepare_task(
                rundir, event, superevent_id, pe_pipeline, **kwargs
            )
            |
            condor_submit_task.s().on_error(
                job_error_notification.s(superevent_id, rundir, analysis)
            )
            |
            dag_finished.si(rundir, superevent_id, pe_pipeline, **kwargs)
        ).delay()