Coverage for gwcelery/tools/condor.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""Shortcuts for HTCondor commands to manage deployment of GWCelery on LIGO 

2Data Grid clusters. 

3 

4These commands apply to the GWCelery instance that is 

5running in the current working directory. 

6""" 

7import json 

8import os 

9import shlex 

10import subprocess 

11import sys 

12import time 

13from importlib import resources 

14 

15import click 

16import lxml.etree 

17 

18from .. import data 

19 

# String form of the location of the 'gwcelery.sub' resource shipped in the
# ``data`` package; it is handed to ``condor_submit`` by the submit commands.
SUBMIT_FILE = str(resources.files(data) / 'gwcelery.sub')

21 

22 

@click.group(help=__doc__)
def condor():
    # Intentionally empty: this function exists only so that the
    # subcommands below can be registered on the command group.
    pass

28 

29 

def get_constraints():
    """Build the HTCondor arguments that select this instance's jobs.

    Returns a 2-tuple: the ``-constraint`` flag followed by an expression
    matching jobs whose ``JobBatchName`` is ``gwcelery`` and whose ``Iwd``
    equals the current working directory.
    """
    # JSON string literal escape sequences are a close match to HTCondor
    # ClassAds, so json.dumps is used to quote both values.
    batch_name = json.dumps('gwcelery')
    workdir = json.dumps(os.getcwd())
    expression = f'JobBatchName=={batch_name} && Iwd=={workdir}'
    return '-constraint', expression

35 

36 

def run_exec(*args):
    """Echo a command line, then replace this process with that command.

    The arguments are printed to standard output as a single shell-quoted
    line and then passed to :func:`os.execvp`, with ``args[0]`` as the
    program to look up on ``PATH``. On success this function never
    returns, because the current process image is replaced.
    """
    # exec* replaces the process immediately without flushing Python's
    # buffered streams, so flush explicitly or the echoed command is lost
    # whenever stdout is not a terminal (e.g. a pipe or a log file).
    print(' '.join(shlex.quote(arg) for arg in args), flush=True)
    os.execvp(args[0], args)

40 

41 

def running():
    """Report whether ``condor_q`` lists any job for this GWCelery instance.

    Runs ``condor_q -xml`` restricted by :func:`get_constraints` and returns
    ``True`` if the parsed output contains at least one ``<c>`` element,
    ``False`` otherwise.
    """
    query = ('condor_q', '-xml', *get_constraints())
    xml_output = subprocess.check_output(query)
    root = lxml.etree.fromstring(xml_output)
    # Presumably each matching job is emitted as one <c> element, so a
    # single hit is enough -- verify against the condor_q XML format.
    first_match = root.find('.//c')
    return first_match is not None

47 

48 

@condor.command()
@click.pass_context
def submit(ctx):
    """Submit all GWCelery jobs to HTCondor (if not already running)."""
    # Guard clause: refuse to submit while jobs from this directory are
    # still queued, and exit with a non-zero status.
    if running():
        message = (
            'error: GWCelery jobs are already running in this directory.\n'
            'First remove existing jobs with "gwcelery condor rm".\n'
            'To see the status of those jobs, run "gwcelery condor q".'
        )
        print(message, file=sys.stderr)
        sys.exit(1)

    # The accounting group comes from the application configuration that
    # is carried on the click context object.
    group = ctx.obj.app.conf['condor_accounting_group']
    run_exec('condor_submit', f'accounting_group={group}', SUBMIT_FILE)

64 

65 

@condor.command()
@click.pass_context
def resubmit(ctx):
    """Remove any running GWCelery jobs and resubmit to HTCondor."""
    if running():
        removal = ('condor_rm', *get_constraints())
        subprocess.check_call(removal)

        # Poll once per second, for up to two minutes, until the queue no
        # longer lists any of this instance's jobs.
        grace_period = 120
        began = time.monotonic()
        stopped = False
        while time.monotonic() - began < grace_period:
            if running():
                time.sleep(1)
            else:
                stopped = True
                break

        # Give up rather than submit on top of jobs that refuse to go away.
        if not stopped:
            print('error: Could not stop all GWCelery jobs', file=sys.stderr)
            sys.exit(1)

    group = ctx.obj.app.conf['condor_accounting_group']
    run_exec('condor_submit', f'accounting_group={group}', SUBMIT_FILE)

84 

85 

@condor.command()
def rm():
    """Remove all GWCelery jobs."""
    # Hand over to condor_rm, limited to the jobs picked out by
    # get_constraints().
    argv = ('condor_rm', *get_constraints())
    run_exec(*argv)

90 

91 

@condor.command()
def hold():
    """Put all GWCelery jobs on hold."""
    # Hand over to condor_hold, limited to the jobs picked out by
    # get_constraints().
    argv = ('condor_hold', *get_constraints())
    run_exec(*argv)

96 

97 

@condor.command()
def release():
    """Release all GWCelery jobs from hold status."""
    # Hand over to condor_release, limited to the jobs picked out by
    # get_constraints().
    argv = ('condor_release', *get_constraints())
    run_exec(*argv)

102 

103 

@condor.command()
def q():
    """Show status of all GWCelery jobs."""
    # Hand over to condor_q with -nobatch, limited to the jobs picked out
    # by get_constraints().
    argv = ('condor_q', '-nobatch', *get_constraints())
    run_exec(*argv)