Coverage for gwcelery/tasks/condor.py: 81%
97 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""Submit and monitor HTCondor jobs [1]_.
3Notes
4-----
5Internally, we use the XML condor log format [2]_ for easier parsing.
7References
8----------
9.. [1] http://research.cs.wisc.edu/htcondor/manual/latest/condor_submit.html
10.. [2] http://research.cs.wisc.edu/htcondor/classad/refman/node3.html
12"""
13import os
14import subprocess
15import tempfile
17import lxml.etree
18import platformdirs
20from .. import app
23def _escape_arg(arg):
24 """Escape a command line argument for an HTCondor submit file."""
25 arg = arg.replace('"', '""').replace("'", "''")
26 if ' ' in arg or '\t' in arg:
27 arg = "'" + arg + "'"
28 return arg
31def _escape_args(args):
32 """Escape a list of command line arguments for an HTCondor submit file."""
33 return '"' + ' '.join(_escape_arg(arg) for arg in args) + '"'
36def _mklog(suffix):
37 """Create a unique path for an HTCondor log."""
38 condor_dir = platformdirs.user_cache_dir('condor', ensure_exists=True)
39 with tempfile.NamedTemporaryFile(dir=condor_dir, suffix=suffix) as f:
40 return f.name
43def _read(filename):
44 with open(filename, 'r') as f:
45 return f.read()
48def _rm_f(*args):
49 for arg in args:
50 try:
51 os.remove(arg)
52 except OSError:
53 pass
56def _parse_classad(c):
57 """Turn a ClassAd XML fragment into a dictionary of Python values.
59 Note that this supports only the small subset of the ClassAd XML
60 syntax [2]_ that we need to determine if a job succeeded or failed.
61 """
62 if c is not None:
63 for a in c.findall('a'):
64 key = a.attrib['n']
65 child, = a.getchildren()
66 if child.tag == 's':
67 value = str(child.text)
68 elif child.tag == 'b':
69 value = (child.attrib['v'] == 't')
70 elif child.tag == 'i':
71 value = int(child.text)
72 else:
73 # Coverage skipped below because the Python compiler optimzies
74 # away ``continue`` statements.
75 #
76 # See <https://bitbucket.org/ned/coveragepy/issues/198>.
77 continue # pragma: no cover
78 yield key, value
81def _read_last_event(log):
82 """Get the last event from an HTCondor log file.
84 FIXME: It would be more efficient in terms of I/O and file desciptors to
85 use a single HTCondor log file for all jobs and use the inotify
86 capabilities of ``htcondor.read_events`` to avoid unnecessary polling.
87 """
88 tree = lxml.etree.fromstring('<classads>' + _read(log) + '</classads>')
89 return dict(_parse_classad(tree.find('c[last()]')))
92def _submit(submit_file=None, **kwargs):
93 args = ['condor_submit']
94 for key, value in kwargs.items():
95 args += ['-append', '{}={}'.format(key, value)]
96 if submit_file is None:
97 args += ['/dev/null', '-queue', '1']
98 else:
99 args += [submit_file]
100 subprocess.run(args, capture_output=True, check=True)
103class JobAborted(Exception):
104 """Raised if an HTCondor job was aborted (e.g. by ``condor_rm``)."""
107class JobRunning(Exception):
108 """Raised if an HTCondor job is still running."""
111class JobFailed(subprocess.CalledProcessError):
112 """Raised if an HTCondor job fails."""
115submit_kwargs = dict(
116 bind=True, autoretry_for=(JobRunning,),
117 ignore_result=True, shared=False,
118)
121@app.task(
122 **submit_kwargs,
123 **app.conf['condor_retry_kwargs']
124)
125def submit(self, submit_file, log=None):
126 """Submit a job using HTCondor.
128 Parameters
129 ----------
130 submit_file : str
131 Path of the submit file.
132 log: str
133 Used internally to track job state. Caller should not set.
135 Raises
136 ------
137 :class:`JobAborted`
138 If the job was aborted (e.g. by running ``condor_rm``).
139 :class:`JobFailed`
140 If the job terminates and returns a nonzero exit code.
141 :class:`JobRunning`
142 If the job is still running. Causes the task to be re-queued until the
143 job is complete.
145 Example
146 -------
147 >>> submit.s('example.sub',
148 ... accounting_group='ligo.dev.o3.cbc.explore.test')
149 """
150 if log is None:
151 log = _mklog('.log')
152 try:
153 _submit(submit_file, log_xml='true', log=log)
154 except subprocess.CalledProcessError:
155 _rm_f(log)
156 raise
157 self.retry((submit_file,), dict(log=log))
158 else:
159 event = _read_last_event(log)
160 if event.get('MyType') == 'JobTerminatedEvent':
161 _rm_f(log)
162 if event['TerminatedNormally'] and event['ReturnValue'] != 0:
163 raise JobFailed(event['ReturnValue'], (submit_file,))
164 elif event.get('MyType') == 'JobAbortedEvent':
165 _rm_f(log)
166 raise JobAborted(event)
167 else:
168 raise JobRunning(event)
171@app.task(bind=True, autoretry_for=(JobRunning,), default_retry_delay=1,
172 max_retries=None, retry_backoff=True, shared=False)
173def check_output(self, args, log=None, error=None, output=None, **kwargs):
174 """Call a process using HTCondor.
176 Call an external process using HTCondor, in a manner patterned after
177 :meth:`subprocess.check_output`. If successful, returns its output on
178 stdout. On failure, raise an exception.
180 Parameters
181 ----------
182 args : list
183 Command line arguments, as if passed to :func:`subprocess.check_call`.
184 log, error, output : str
185 Used internally to track job state. Caller should not set.
186 **kwargs
187 Extra submit description file commands. See the documentation for
188 ``condor_submit`` for possible values.
190 Returns
191 -------
192 str
193 Captured output from command.
195 Raises
196 ------
197 :class:`JobAborted`
198 If the job was aborted (e.g. by running ``condor_rm``).
199 :class:`JobFailed`
200 If the job terminates and returns a nonzero exit code.
201 :class:`JobRunning`
202 If the job is still running. Causes the task to be re-queued until the
203 job is complete.
205 Example
206 -------
207 >>> check_output.s(['sleep', '10'],
208 ... accounting_group='ligo.dev.o3.cbc.explore.test')
210 """
211 # FIXME: Refactor to reuse common code from this task and
212 # gwcelery.tasks.condor.submit.
214 if log is None:
215 log = _mklog('.log')
216 error = _mklog('.err')
217 output = _mklog('.out')
218 kwargs = dict(kwargs,
219 universe='vanilla',
220 executable='/usr/bin/env',
221 getenv='true',
222 log_xml='true',
223 arguments=_escape_args(args),
224 log=log, error=error, output=output)
225 try:
226 _submit(**kwargs)
227 except subprocess.CalledProcessError:
228 _rm_f(log, error, output)
229 raise
230 self.retry((args,), kwargs)
231 else:
232 event = _read_last_event(log)
233 if event.get('MyType') == 'JobTerminatedEvent':
234 captured_error = _read(error)
235 captured_output = _read(output)
236 _rm_f(log, error, output)
237 if event['TerminatedNormally'] and event['ReturnValue'] == 0:
238 return captured_output
239 else:
240 raise JobFailed(event['ReturnValue'], args,
241 captured_output,
242 captured_error)
243 elif event.get('MyType') == 'JobAbortedEvent':
244 _rm_f(log, error, output)
245 raise JobAborted(event)
246 else:
247 raise JobRunning(event)