Coverage for gwcelery/tasks/condor.py: 81%

97 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""Submit and monitor HTCondor jobs [1]_. 

2 

3Notes 

4----- 

5Internally, we use the XML condor log format [2]_ for easier parsing. 

6 

7References 

8---------- 

9.. [1] http://research.cs.wisc.edu/htcondor/manual/latest/condor_submit.html 

10.. [2] http://research.cs.wisc.edu/htcondor/classad/refman/node3.html 

11 

12""" 

13import os 

14import subprocess 

15import tempfile 

16 

17import lxml.etree 

18import platformdirs 

19 

20from .. import app 

21 

22 

23def _escape_arg(arg): 

24 """Escape a command line argument for an HTCondor submit file.""" 

25 arg = arg.replace('"', '""').replace("'", "''") 

26 if ' ' in arg or '\t' in arg: 

27 arg = "'" + arg + "'" 

28 return arg 

29 

30 

def _escape_args(args):
    """Render a sequence of arguments as an HTCondor ``arguments`` value.

    Each argument is escaped with :func:`_escape_arg`. The results are
    separated by single spaces, and the whole string is wrapped in double
    quotes.
    """
    escaped = [_escape_arg(arg) for arg in args]
    return '"{}"'.format(' '.join(escaped))

34 

35 

def _mklog(suffix):
    """Return a unique, currently unused path in the user's condor cache dir.

    A named temporary file is created to reserve a unique name and is
    deleted again as soon as the ``with`` block exits. Only the path is
    returned; the file itself is presumably (re)created later by HTCondor,
    which is given this path.

    Parameters
    ----------
    suffix : str
        File name suffix, e.g. ``'.log'``.
    """
    cache_dir = platformdirs.user_cache_dir('condor', ensure_exists=True)
    with tempfile.NamedTemporaryFile(dir=cache_dir, suffix=suffix) as tmp:
        path = tmp.name
    return path

41 

42 

43def _read(filename): 

44 with open(filename, 'r') as f: 

45 return f.read() 

46 

47 

48def _rm_f(*args): 

49 for arg in args: 

50 try: 

51 os.remove(arg) 

52 except OSError: 

53 pass 

54 

55 

56def _parse_classad(c): 

57 """Turn a ClassAd XML fragment into a dictionary of Python values. 

58 

59 Note that this supports only the small subset of the ClassAd XML 

60 syntax [2]_ that we need to determine if a job succeeded or failed. 

61 """ 

62 if c is not None: 

63 for a in c.findall('a'): 

64 key = a.attrib['n'] 

65 child, = a.getchildren() 

66 if child.tag == 's': 

67 value = str(child.text) 

68 elif child.tag == 'b': 

69 value = (child.attrib['v'] == 't') 

70 elif child.tag == 'i': 

71 value = int(child.text) 

72 else: 

73 # Coverage skipped below because the Python compiler optimzies 

74 # away ``continue`` statements. 

75 # 

76 # See <https://bitbucket.org/ned/coveragepy/issues/198>. 

77 continue # pragma: no cover 

78 yield key, value 

79 

80 

def _read_last_event(log):
    """Return the most recent event recorded in an XML HTCondor log.

    The log text is wrapped in a ``<classads>`` root element before it is
    parsed. The last ``<c>`` fragment is then decoded with
    :func:`_parse_classad`. If there are no events yet, an empty dictionary
    is returned.

    FIXME: It would be more efficient in terms of I/O and file descriptors to
    use a single HTCondor log file for all jobs and use the inotify
    capabilities of ``htcondor.read_events`` to avoid unnecessary polling.
    """
    document = '<classads>{}</classads>'.format(_read(log))
    root = lxml.etree.fromstring(document)
    newest = root.find('c[last()]')
    return dict(_parse_classad(newest))

90 

91 

def _submit(submit_file=None, **kwargs):
    """Run ``condor_submit``, appending each keyword as a submit command.

    Parameters
    ----------
    submit_file : str, optional
        Path of a submit description file. If omitted, ``/dev/null`` is used
        as the submit file and ``-queue 1`` is passed. The job is then
        described entirely by ``kwargs``.
    **kwargs
        Submit commands, each passed as ``-append key=value``.

    Raises
    ------
    subprocess.CalledProcessError
        If ``condor_submit`` exits with a nonzero status.
    """
    cmd = ['condor_submit']
    for key, value in kwargs.items():
        cmd.extend(('-append', '{}={}'.format(key, value)))
    if submit_file is not None:
        cmd.append(submit_file)
    else:
        cmd.extend(('/dev/null', '-queue', '1'))
    subprocess.run(cmd, capture_output=True, check=True)

101 

102 

class JobAborted(Exception):
    """An HTCondor job was aborted before finishing (e.g. via ``condor_rm``).

    The tasks in this module raise it with the job's last log event as the
    sole argument.
    """

105 

106 

class JobRunning(Exception):
    """An HTCondor job has not finished yet.

    The tasks in this module list it in ``autoretry_for``, so raising it
    re-queues the task to poll the job again.
    """

109 

110 

class JobFailed(subprocess.CalledProcessError):
    """An HTCondor job terminated unsuccessfully.

    Subclasses :class:`subprocess.CalledProcessError`, so it carries
    ``returncode``, ``cmd`` and, when available, ``output``/``stderr``.
    """

113 

114 

# Celery task options for :func:`submit`. Listing ``JobRunning`` in
# ``autoretry_for`` makes the task re-queue itself until the job is complete.
submit_kwargs = {
    'bind': True,
    'autoretry_for': (JobRunning,),
    'ignore_result': True,
    'shared': False,
}

119 

120 

@app.task(
    **submit_kwargs,
    **app.conf['condor_retry_kwargs']
)
def submit(self, submit_file, log=None, **kwargs):
    """Submit a job using HTCondor.

    The task runs in two phases.

    * On the first call (``log`` is None), it submits the job with an XML
      user log. It then calls ``self.retry`` with only the submit file and
      the log path.
    * On every later call, it inspects the last event in that log. It then
      returns, raises an error, or raises :class:`JobRunning` so that the
      task is re-queued.

    Parameters
    ----------
    submit_file : str
        Path of the submit file.
    log : str
        Used internally to track job state. Caller should not set.
    **kwargs
        Extra submit description file commands, passed to ``condor_submit``
        as ``-append key=value``. They are used only when the job is first
        submitted.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates with a nonzero exit code, or is killed by a
        signal. In the signal case the return code is the negated signal
        number, following the :mod:`subprocess` convention.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    subprocess.CalledProcessError
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> submit.s('example.sub',
    ...          accounting_group='ligo.dev.o3.cbc.explore.test')
    """
    if log is None:
        log = _mklog('.log')
        try:
            _submit(submit_file, log_xml='true', log=log, **kwargs)
        except subprocess.CalledProcessError:
            _rm_f(log)
            raise
        # Re-queue with the log path so the next run polls the job.
        # Celery's Task.retry raises, so execution stops here.
        self.retry((submit_file,), dict(log=log))
    else:
        event = _read_last_event(log)
        if event.get('MyType') == 'JobTerminatedEvent':
            _rm_f(log)
            if event['TerminatedNormally']:
                returncode = event['ReturnValue']
            else:
                # Abnormal exit: HTCondor records TerminatedBySignal instead
                # of ReturnValue. Previously this case was reported as success.
                returncode = -event['TerminatedBySignal']
            if returncode != 0:
                raise JobFailed(returncode, (submit_file,))
        elif event.get('MyType') == 'JobAbortedEvent':
            _rm_f(log)
            raise JobAborted(event)
        else:
            raise JobRunning(event)

169 

170 

@app.task(bind=True, autoretry_for=(JobRunning,), default_retry_delay=1,
          max_retries=None, retry_backoff=True, shared=False)
def check_output(self, args, log=None, error=None, output=None, **kwargs):
    """Call a process using HTCondor.

    Call an external process using HTCondor, in a manner patterned after
    :meth:`subprocess.check_output`. If successful, returns its output on
    stdout. On failure, raise an exception.

    On the first call (``log`` is None) the job is submitted. The task then
    calls ``self.retry`` with the log, error and output paths and the full
    submit commands. Later calls poll the log until the job ends.

    Parameters
    ----------
    args : list
        Command line arguments, as if passed to :func:`subprocess.check_call`.
    log, error, output : str
        Used internally to track job state. Caller should not set.
    **kwargs
        Extra submit description file commands. See the documentation for
        ``condor_submit`` for possible values.

    Returns
    -------
    str
        Captured output from command.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates with a nonzero exit code, or is killed by a
        signal. In the signal case the return code is the negated signal
        number, following the :mod:`subprocess` convention.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    subprocess.CalledProcessError
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> check_output.s(['sleep', '10'],
    ...                accounting_group='ligo.dev.o3.cbc.explore.test')

    """
    # FIXME: Refactor to reuse common code from this task and
    # gwcelery.tasks.condor.submit.

    if log is None:
        log = _mklog('.log')
        error = _mklog('.err')
        output = _mklog('.out')
        kwargs = dict(kwargs,
                      universe='vanilla',
                      executable='/usr/bin/env',
                      getenv='true',
                      log_xml='true',
                      arguments=_escape_args(args),
                      log=log, error=error, output=output)
        try:
            _submit(**kwargs)
        except subprocess.CalledProcessError:
            _rm_f(log, error, output)
            raise
        # Re-queue with the file paths (bound to log/error/output on the next
        # run). Celery's Task.retry raises, so execution stops here.
        self.retry((args,), kwargs)
    else:
        event = _read_last_event(log)
        if event.get('MyType') == 'JobTerminatedEvent':
            captured_error = _read(error)
            captured_output = _read(output)
            _rm_f(log, error, output)
            if event['TerminatedNormally']:
                returncode = event['ReturnValue']
            else:
                # Abnormal exit: HTCondor records TerminatedBySignal instead
                # of ReturnValue. Reading ReturnValue here used to raise
                # KeyError.
                returncode = -event['TerminatedBySignal']
            if returncode == 0:
                return captured_output
            raise JobFailed(returncode, args,
                            captured_output,
                            captured_error)
        elif event.get('MyType') == 'JobAbortedEvent':
            _rm_f(log, error, output)
            raise JobAborted(event)
        else:
            raise JobRunning(event)