Coverage for gwcelery/tools/condor.py: 100%
58 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Shortcuts for HTCondor commands to manage deployment of GWCelery on LIGO
2Data Grid clusters.
4These commands apply to the GWCelery instance that is
5running in the current working directory.
6"""
7import json
8import os
9import shlex
10import subprocess
11import sys
12import time
13from importlib import resources
15import click
16import lxml.etree
18from .. import data
# Filesystem path (as a string) of the ``gwcelery.sub`` resource that lives
# in the ``data`` package; passed to ``condor_submit`` by the commands below.
SUBMIT_FILE = str(resources.files(data) / 'gwcelery.sub')
@click.group(help=__doc__)
def condor():
    # Intentionally empty: this callback exists only so that the ``condor``
    # command group is registered; subcommands attach to it through
    # ``@condor.command()``.
    return None
def get_constraints():
    """Build the ``-constraint`` arguments shared by the HTCondor commands.

    Returns
    -------
    tuple
        ``('-constraint', expression)``, where *expression* requires
        ``JobBatchName`` to equal ``"gwcelery"`` and ``Iwd`` to equal the
        current working directory.
    """
    # JSON string literals are used for quoting because their escape
    # sequences are a close match to those of HTCondor ClassAds.
    batch_name = json.dumps('gwcelery')
    working_dir = json.dumps(os.getcwd())
    return '-constraint', f'JobBatchName=={batch_name} && Iwd=={working_dir}'
def run_exec(*args):
    """Echo a command line, then replace this process with that command.

    Parameters
    ----------
    *args : str
        Program name followed by its arguments. ``args[0]`` is resolved by
        :func:`os.execvp`.

    Notes
    -----
    On success this function does not return, because :func:`os.execvp`
    replaces the current process.
    """
    # The exec* functions do not flush Python-level file buffers, so flush
    # explicitly; otherwise the echoed command is lost whenever stdout is
    # block-buffered (for example when redirected to a pipe or a file).
    print(' '.join(shlex.quote(arg) for arg in args), flush=True)
    os.execvp(args[0], args)
def running():
    """Report whether ``condor_q`` lists any job matching our constraints.

    Returns
    -------
    bool
        True if the XML printed by ``condor_q -xml`` for the jobs selected
        by :func:`get_constraints` contains at least one ``c`` element.

    Raises
    ------
    subprocess.CalledProcessError
        If ``condor_q`` exits with a nonzero status.
    """
    query = ('condor_q', '-xml', *get_constraints())
    xml_output = subprocess.check_output(query)
    document = lxml.etree.fromstring(xml_output)
    # NOTE(review): presumably every matching job is emitted as one <c>
    # element, so finding any <c> means a job exists -- confirm against the
    # condor_q XML format.
    first_match = document.find('.//c')
    return first_match is not None
@condor.command()
@click.pass_context
def submit(ctx):
    """Submit all GWCelery jobs to HTCondor (if not already running)."""
    if not running():
        # Nothing is queued for this directory: hand over to condor_submit,
        # passing the accounting group from the application configuration.
        accounting_group = ctx.obj.app.conf['condor_accounting_group']
        submit_args = (
            'condor_submit',
            f'accounting_group={accounting_group}',
            SUBMIT_FILE,
        )
        run_exec(*submit_args)
    else:
        # Refuse to submit a second set of jobs from the same directory.
        message = (
            'error: GWCelery jobs are already running in this directory.\n'
            'First remove existing jobs with "gwcelery condor rm".\n'
            'To see the status of those jobs, run "gwcelery condor q".'
        )
        print(message, file=sys.stderr)
        sys.exit(1)
@condor.command()
@click.pass_context
def resubmit(ctx):
    """Remove any running GWCelery jobs and resubmit to HTCondor."""
    if running():
        subprocess.check_call(('condor_rm', *get_constraints()))

        # Poll roughly once per second, for at most ``timeout`` seconds,
        # until the removed jobs no longer show up in the queue.
        timeout = 120
        began = time.monotonic()
        drained = False
        while time.monotonic() - began < timeout:
            if running():
                time.sleep(1)
            else:
                drained = True
                break

        if not drained:
            # The jobs were still listed when the time budget ran out.
            print('error: Could not stop all GWCelery jobs', file=sys.stderr)
            sys.exit(1)

    # Either nothing was running or everything has been removed: submit.
    group = ctx.obj.app.conf['condor_accounting_group']
    run_exec('condor_submit', f'accounting_group={group}', SUBMIT_FILE)
@condor.command()
def rm():
    """Remove all GWCelery jobs."""
    # Limit condor_rm to the jobs selected by get_constraints(), then let
    # run_exec() echo the command and exec it.
    selection = get_constraints()
    run_exec('condor_rm', *selection)
@condor.command()
def hold():
    """Put all GWCelery jobs on hold."""
    # Limit condor_hold to the jobs selected by get_constraints(), then let
    # run_exec() echo the command and exec it.
    selection = get_constraints()
    run_exec('condor_hold', *selection)
@condor.command()
def release():
    """Release all GWCelery jobs from hold status."""
    # Limit condor_release to the jobs selected by get_constraints(), then
    # let run_exec() echo the command and exec it.
    selection = get_constraints()
    run_exec('condor_release', *selection)
@condor.command()
def q():
    """Show status of all GWCelery jobs."""
    # ``-nobatch`` is passed ahead of the constraint arguments produced by
    # get_constraints(); run_exec() echoes the command and execs it.
    selection = get_constraints()
    run_exec('condor_q', '-nobatch', *selection)