Coverage for gwcelery/tasks/condor.py: 82%
98 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-08-16 20:59 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-08-16 20:59 +0000
1"""Submit and monitor HTCondor jobs [1]_.
3Notes
4-----
5Internally, we use the XML condor log format [2]_ for easier parsing.
7References
8----------
9.. [1] http://research.cs.wisc.edu/htcondor/manual/latest/condor_submit.html
10.. [2] http://research.cs.wisc.edu/htcondor/classad/refman/node3.html
12"""
13import os
14import subprocess
15import tempfile
16from distutils.dir_util import mkpath
18import lxml.etree
20from .. import app
23def _escape_arg(arg):
24 """Escape a command line argument for an HTCondor submit file."""
25 arg = arg.replace('"', '""').replace("'", "''")
26 if ' ' in arg or '\t' in arg:
27 arg = "'" + arg + "'"
28 return arg
31def _escape_args(args):
32 """Escape a list of command line arguments for an HTCondor submit file."""
33 return '"' + ' '.join(_escape_arg(arg) for arg in args) + '"'
36def _mklog(suffix):
37 """Create a unique path for an HTCondor log."""
38 condor_dir = os.path.expanduser('~/.cache/condor')
39 mkpath(condor_dir)
40 with tempfile.NamedTemporaryFile(dir=condor_dir, suffix=suffix) as f:
41 return f.name
44def _read(filename):
45 with open(filename, 'r') as f:
46 return f.read()
49def _rm_f(*args):
50 for arg in args:
51 try:
52 os.remove(arg)
53 except OSError:
54 pass
57def _parse_classad(c):
58 """Turn a ClassAd XML fragment into a dictionary of Python values.
60 Note that this supports only the small subset of the ClassAd XML
61 syntax [2]_ that we need to determine if a job succeeded or failed.
62 """
63 if c is not None:
64 for a in c.findall('a'):
65 key = a.attrib['n']
66 child, = a.getchildren()
67 if child.tag == 's':
68 value = str(child.text)
69 elif child.tag == 'b':
70 value = (child.attrib['v'] == 't')
71 elif child.tag == 'i':
72 value = int(child.text)
73 else:
74 # Coverage skipped below because the Python compiler optimzies
75 # away ``continue`` statements.
76 #
77 # See <https://bitbucket.org/ned/coveragepy/issues/198>.
78 continue # pragma: no cover
79 yield key, value
def _read_last_event(log):
    """Return the most recent event in an HTCondor XML log as a dictionary.

    The log is a sequence of ClassAd fragments with no enclosing root element,
    so its content is wrapped in a ``<classads>`` element before parsing. If
    the log holds no events, the result is an empty dictionary.

    FIXME: It would be more efficient in terms of I/O and file descriptors to
    use a single HTCondor log file for all jobs and use the inotify
    capabilities of ``htcondor.read_events`` to avoid unnecessary polling.
    """
    document = ''.join(('<classads>', _read(log), '</classads>'))
    root = lxml.etree.fromstring(document)
    last_classad = root.find('c[last()]')
    return dict(_parse_classad(last_classad))
def _submit(submit_file=None, **kwargs):
    """Run ``condor_submit``, appending each keyword as a submit command.

    Every keyword argument is passed as ``-append key=value``. When no submit
    file is given, ``/dev/null`` is used as the submit file and a single job
    is queued. Output is captured, and a nonzero exit status raises
    :class:`subprocess.CalledProcessError`.
    """
    command = ['condor_submit']
    for name, setting in kwargs.items():
        command.extend(('-append', '{}={}'.format(name, setting)))
    if submit_file is not None:
        command.append(submit_file)
    else:
        command.extend(('/dev/null', '-queue', '1'))
    subprocess.run(command, capture_output=True, check=True)
class JobAborted(Exception):
    """Raised if an HTCondor job was aborted (e.g. by ``condor_rm``).

    The tasks in this module raise it with the parsed ``JobAbortedEvent``
    dictionary as its only argument.
    """
class JobRunning(Exception):
    """Raised if an HTCondor job is still running.

    The tasks in this module list it in ``autoretry_for``, and raise it with
    the most recently parsed log event as its only argument.
    """
class JobFailed(subprocess.CalledProcessError):
    """Raised if an HTCondor job fails.

    Being a :class:`subprocess.CalledProcessError`, it is constructed from the
    job's return code and command, optionally followed by its captured
    standard output and standard error.
    """
# Task options passed to ``app.task`` when declaring the ``submit`` task.
submit_kwargs = {
    'bind': True,
    'autoretry_for': (JobRunning,),
    'ignore_result': True,
    'shared': False,
}
@app.task(
    **submit_kwargs,
    **app.conf['condor_retry_kwargs']
)
def submit(self, submit_file, log=None):
    """Submit a job using HTCondor.

    On the first invocation (``log`` is None) the job is submitted with a
    freshly created log path and the task is retried with that path. Later
    invocations read the last event from the log to decide whether the job
    has finished.

    Parameters
    ----------
    submit_file : str
        Path of the submit file.
    log: str
        Used internally to track job state. Caller should not set.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates abnormally, or terminates and returns a nonzero
        exit code.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    :class:`subprocess.CalledProcessError`
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> submit.s('example.sub',
    ...          accounting_group='ligo.dev.o3.cbc.explore.test')
    """
    if log is None:
        log = _mklog('.log')
        try:
            _submit(submit_file, log_xml='true', log=log)
        except subprocess.CalledProcessError:
            # Nothing will ever be written to the log; do not leave it behind.
            _rm_f(log)
            raise
        self.retry((submit_file,), dict(log=log))
    else:
        event = _read_last_event(log)
        event_type = event.get('MyType')
        if event_type == 'JobTerminatedEvent':
            _rm_f(log)
            if not event['TerminatedNormally']:
                # An abnormal termination is a failure too, as it is in
                # check_output; it must not be reported as success.
                returncode = event.get('ReturnValue')
                if returncode is None:
                    # NOTE(review): HTCondor is expected to report the signal
                    # in 'TerminatedBySignal' for such jobs -- confirm.
                    # Negated to follow the CalledProcessError convention.
                    signum = event.get('TerminatedBySignal')
                    if signum is not None:
                        returncode = -signum
                raise JobFailed(returncode, (submit_file,))
            if event['ReturnValue'] != 0:
                raise JobFailed(event['ReturnValue'], (submit_file,))
        elif event_type == 'JobAbortedEvent':
            _rm_f(log)
            raise JobAborted(event)
        else:
            raise JobRunning(event)
@app.task(bind=True, autoretry_for=(JobRunning,), default_retry_delay=1,
          max_retries=None, retry_backoff=True, shared=False)
def check_output(self, args, log=None, error=None, output=None, **kwargs):
    """Call a process using HTCondor.

    Call an external process using HTCondor, in a manner patterned after
    :meth:`subprocess.check_output`. If successful, returns its output on
    stdout. On failure, raise an exception.

    On the first invocation (``log`` is None) the job is submitted with
    freshly created log, stderr, and stdout paths, and the task is retried
    with those paths. Later invocations read the last event from the log to
    decide whether the job has finished.

    Parameters
    ----------
    args : list
        Command line arguments, as if passed to :func:`subprocess.check_call`.
    log, error, output : str
        Used internally to track job state. Caller should not set.
    **kwargs
        Extra submit description file commands. See the documentation for
        ``condor_submit`` for possible values.

    Returns
    -------
    str
        Captured output from command.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates abnormally, or terminates and returns a nonzero
        exit code.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    :class:`subprocess.CalledProcessError`
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> check_output.s(['sleep', '10'],
    ...                accounting_group='ligo.dev.o3.cbc.explore.test')

    """
    # FIXME: Refactor to reuse common code from this task and
    # gwcelery.tasks.condor.submit.

    if log is None:
        log = _mklog('.log')
        error = _mklog('.err')
        output = _mklog('.out')
        kwargs = dict(kwargs,
                      universe='vanilla',
                      executable='/usr/bin/env',
                      getenv='true',
                      log_xml='true',
                      arguments=_escape_args(args),
                      log=log, error=error, output=output)
        try:
            _submit(**kwargs)
        except subprocess.CalledProcessError:
            # The job never started; do not leave its files behind.
            _rm_f(log, error, output)
            raise
        self.retry((args,), kwargs)
    else:
        event = _read_last_event(log)
        event_type = event.get('MyType')
        if event_type == 'JobTerminatedEvent':
            # Capture the streams before deleting the files that hold them.
            captured_error = _read(error)
            captured_output = _read(output)
            _rm_f(log, error, output)
            returncode = event.get('ReturnValue')
            if event['TerminatedNormally'] and returncode == 0:
                return captured_output
            if returncode is None:
                # Without a return value, indexing the event would raise a
                # KeyError that hides the real failure.
                # NOTE(review): HTCondor is expected to report the signal in
                # 'TerminatedBySignal' for such jobs -- confirm. Negated to
                # follow the CalledProcessError convention.
                signum = event.get('TerminatedBySignal')
                if signum is not None:
                    returncode = -signum
            raise JobFailed(returncode, args,
                            captured_output,
                            captured_error)
        elif event_type == 'JobAbortedEvent':
            _rm_f(log, error, output)
            raise JobAborted(event)
        else:
            raise JobRunning(event)