Coverage for gwcelery/tasks/gracedb.py: 80%
188 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""Communication with GraceDB."""
2import functools
3import re
5import gracedb_sdk
6from celery.utils.log import get_task_logger
7from requests.exceptions import ConnectionError, HTTPError
9from .. import app
10from ..util import PromiseProxy
# Shared GraceDB REST API client used by every task in this module. The
# client class, its positional arguments (the API URL built from
# ``app.conf.gracedb_host``) and its keyword arguments are handed to
# PromiseProxy instead of being called directly here.
# NOTE(review): presumably PromiseProxy defers construction of the client
# until it is first used -- confirm against ``..util.PromiseProxy``.
client = PromiseProxy(gracedb_sdk.Client,
                      ('https://' + app.conf.gracedb_host + '/api/',),
                      {'fail_if_noauth': True, 'cert_reload': True})
# Celery task logger named after this module.
log = get_task_logger(__name__)
class RetryableHTTPError(HTTPError):
    """Exception class for server-side HTTP errors that we should retry.

    Raised by :func:`catch_retryable_http_errors` in place of the original
    :class:`~requests.exceptions.HTTPError`, and listed in the
    ``autoretry_for`` option that :func:`task` passes to ``app.task``.
    """
def catch_retryable_http_errors(f):
    """Decorator to capture HTTP errors that we should retry.

    We retry HTTP status 500 (Internal Server Error), 502 (Bad Gateway),
    503 (Service Unavailable), and 504 (Gateway Timeout). We also retry
    client side error codes 408 (Timeout), 409 (Conflicting URL), and
    429 (Too many requests).

    Parameters
    ----------
    f : callable
        The function to wrap.

    Returns
    -------
    callable
        A wrapper that calls ``f`` and converts a retryable
        :class:`~requests.exceptions.HTTPError` into a
        :class:`RetryableHTTPError` carrying the same arguments, request,
        and response.

    Raises
    ------
    RetryableHTTPError
        If ``f`` raises an HTTP error with one of the status codes above.
    HTTPError
        Any other HTTP error is re-raised unchanged, including one that
        carries no response object.
    """
    retryable_status_codes = frozenset({408, 409, 429, 500, 502, 503, 504})

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except HTTPError as e:
            response = e.response
            # Compare against None explicitly: an HTTPError may carry no
            # response at all, and a response object for an error status
            # is itself falsy.
            if (response is not None
                    and response.status_code in retryable_status_codes):
                raise RetryableHTTPError(
                    *e.args, request=e.request, response=response)
            raise

    return wrapper
def task(*args, **kwargs):
    """Declare a Celery task with this module's retry policy.

    All positional and keyword arguments are forwarded to ``app.task``
    together with a fixed set of retry options: the task is retried
    automatically on :class:`ConnectionError`, :class:`RetryableHTTPError`,
    and :class:`TimeoutError`, with a default retry delay of 20 seconds,
    retry backoff enabled, and at most 10 retries.
    """
    retry_options = dict(
        autoretry_for=(ConnectionError, RetryableHTTPError, TimeoutError),
        default_retry_delay=20.0,
        retry_backoff=True,
        retry_kwargs={'max_retries': 10},
    )
    return app.task(*args, **kwargs, **retry_options)
52versioned_filename_regex = re.compile(
53 r'^(?P<filename>.*?)(?:,(?P<file_version>\d+))?$')
56def _parse_versioned_filename(versioned_filename):
57 match = versioned_filename_regex.fullmatch(versioned_filename)
58 filename = match['filename']
59 file_version = match['file_version']
60 if file_version is not None:
61 file_version = int(file_version)
62 return filename, file_version
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_pipeline_preferred_event(superevent_id, event_id):
    """Add an event to a superevent's pipeline preferred events in GraceDB."""
    superevent = client.superevents[superevent_id]
    superevent.pipeline_preferred_events.add(event_id)
@task(shared=False)
@catch_retryable_http_errors
def create_event(filecontents, search, pipeline, group, labels=()):
    """Create an event in GraceDB.

    The payload is uploaded under the fixed file name ``initial.data``.
    Returns whatever ``client.events.create`` returns.
    """
    return client.events.create(
        group=group, pipeline=pipeline, search=search,
        filename='initial.data', filecontents=filecontents,
        labels=labels)
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_label(label, graceid):
    """Create a label in GraceDB.

    An HTTP error whose response body is exactly one of the known
    "nothing to change" messages is swallowed so that the task stays
    idempotent; every other HTTP error propagates.
    """
    # Response bodies that mean the label is effectively already in place.
    benign_responses = frozenset((
        b'"The \'ADVREQ\' label cannot be applied to request a signoff '
        b'because a related signoff already exists."',
        b'"The fields superevent, name must make a unique set."',
    ))
    try:
        client.events[graceid].labels.create(label)
    except HTTPError as e:
        if e.response.content in benign_responses:
            return
        raise
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_label_with_log(log_message, label, tags, superevent_id):
    """Create a label with a log for a superevent in GraceDB.

    Parameters
    ----------
    log_message : str
        Comment text of the new log entry.
    label : str
        Label to apply along with the log entry.
    tags : list
        Tags to attach to the log entry.
    superevent_id : str
        The superevent to annotate.
    """
    # HTTP errors propagate unchanged: the previous ``except HTTPError:
    # raise`` handler did nothing but re-raise, so it has been dropped.
    client.superevents[superevent_id].logs.create(
        comment=log_message,
        label=label,
        tags=tags
    )
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_label(label, graceid):
    """Remove a label in GraceDB.

    A 404 response is not treated as a failure: the label is already
    absent, which is the outcome we wanted.
    """
    event_labels = client.events[graceid].labels
    try:
        event_labels.delete(label)
    except HTTPError as e:
        if e.response.status_code == 404:
            return
        raise
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_signoff(status, comment, signoff_type, graceid):
    """Create a signoff in GraceDB.

    An HTTP error whose response body reports that the signoff already
    exists is swallowed to preserve idempotency; every other HTTP error
    propagates.
    """
    duplicate_marker = (
        b'The fields superevent, instrument must make a unique set')
    try:
        client.superevents[graceid].signoff(signoff_type, status, comment)
    except HTTPError as e:
        if duplicate_marker in e.response.content:
            return
        raise
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_tag(filename, tag, graceid):
    """Create a tag in GraceDB.

    The tag is applied to the last log entry of the event whose file name
    (and file version, if one is given) matches ``filename``.

    Parameters
    ----------
    filename : str
        File name, optionally with a ``,N`` version suffix.
    tag : str
        The tag to apply.
    graceid : str
        The event whose log is searched.

    Raises
    ------
    ValueError
        If no log entry of the event matches the requested file.
    """
    filename, file_version = _parse_versioned_filename(filename)
    # ``file_version`` is only looked up on an entry when a version was
    # actually requested.
    matching_entries = [
        entry for entry in get_log(graceid)
        if entry['filename'] == filename
        and (file_version is None or entry['file_version'] == file_version)
    ]
    if not matching_entries:
        raise ValueError(
            'No log entry of {} matches file {!r} (version {!r})'.format(
                graceid, filename, file_version))
    log_number = matching_entries[-1]['N']
    try:
        client.events[graceid].logs[log_number].tags.create(tag)
    except HTTPError as e:
        # If the tag is already applied, then ignore the exception and
        # return successfully to preserve idempotency.
        message = b'"Tag is already applied to this log message"'
        if e.response.content != message:
            raise
@task(shared=False)
@catch_retryable_http_errors
def create_voevent(graceid, voevent_type, **kwargs):
    """Create a VOEvent.

    Additional keyword arguments are forwarded to the GraceDB client's
    ``voevents.create`` call.

    Returns
    -------
    str
        The filename of the new VOEvent.

    """
    voevents = client.events[graceid].voevents
    created = voevents.create(voevent_type=voevent_type, **kwargs)
    return created['filename']
@task(shared=False)
@catch_retryable_http_errors
def download(filename, graceid):
    """Download a file from GraceDB and return its contents."""
    remote_file = client.events[graceid].files[filename]
    with remote_file.get() as stream:
        contents = stream.read()
    return contents
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def expose(graceid):
    """Expose an event to the public.

    Notes
    -----
    If :obj:`~gwcelery.conf.expose_to_public` is False, then this becomes a
    no-op.

    """
    if not app.conf['expose_to_public']:
        return
    client.superevents[graceid].expose()
@task(shared=False)
@catch_retryable_http_errors
def get_events(query, **kwargs):
    """Get the events matching a query from GraceDB, as a list."""
    matches = client.events.search(query=query, **kwargs)
    return list(matches)
@task(shared=False)
@catch_retryable_http_errors
def get_event(graceid):
    """Retrieve an event from GraceDB."""
    event = client.events[graceid]
    return event.get()
@task(shared=False)
@catch_retryable_http_errors
def get_group(graceid):
    """Retrieve the group field of an event from GraceDB."""
    event_record = client.events[graceid].get()
    return event_record['group']
@task(shared=False)
@catch_retryable_http_errors
def get_search(graceid):
    """Retrieve the search field of an event from GraceDB."""
    event_record = client.events[graceid].get()
    return event_record['search']
@task(shared=False)
@catch_retryable_http_errors
def get_labels(graceid):
    """Get the set of names of all labels for an event in GraceDB."""
    label_names = set()
    for row in client.events[graceid].labels.get():
        label_names.add(row['name'])
    return label_names
@task(shared=False)
@catch_retryable_http_errors
def get_log(graceid):
    """Get all log messages for an event in GraceDB."""
    event_logs = client.events[graceid].logs
    return event_logs.get()
@task(shared=False)
@catch_retryable_http_errors
def get_superevent(graceid):
    """Retrieve a superevent from GraceDB."""
    superevent = client.superevents[graceid]
    return superevent.get()
@task(shared=False)
@catch_retryable_http_errors
def replace_event(graceid, payload):
    """Update an existing event in GraceDB with new file contents.

    Returns whatever ``client.events.update`` returns.
    """
    updated = client.events.update(graceid, filecontents=payload)
    return updated
@task(shared=False)
@catch_retryable_http_errors
def upload(filecontents, filename, graceid, message, tags=()):
    """Upload a file to GraceDB.

    The file is attached to a new log entry of the event.

    Returns
    -------
    str
        The versioned file name, ``'<filename>,<file_version>'``, built
        from the fields of the response.
    """
    event_logs = client.events[graceid].logs
    entry = event_logs.create(
        comment=message, filename=filename,
        filecontents=filecontents, tags=tags)
    return f"{entry['filename']},{entry['file_version']}"
# Use the module's ``task`` helper rather than plain ``app.task`` so that the
# RetryableHTTPError raised by ``catch_retryable_http_errors`` is actually
# retried, as it is for every other task in this module.
@task(shared=False)
@catch_retryable_http_errors
def get_superevents(query, **kwargs):
    """List matching superevents in gracedb.

    Parameters
    ----------
    query : str
        The query passed to the GraceDB client's superevent search.
    **kwargs
        Additional keyword arguments passed to the superevent search.

    Returns
    -------
    superevents : list
        The list of the superevents.

    """
    return list(client.superevents.search(query=query, **kwargs))
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_pipeline_preferred_event(superevent_id, event_id):
    """Remove an event from a superevent's pipeline preferred events."""
    superevent = client.superevents[superevent_id]
    superevent.pipeline_preferred_events.remove(event_id)
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def update_superevent(superevent_id, t_start=None,
                      t_end=None, t_0=None, preferred_event=None,
                      em_type=None, time_coinc_far=None,
                      space_coinc_far=None):
    """
    Update superevent information. Wrapper around the GraceDB client's
    ``superevents.update``.

    Parameters
    ----------
    superevent_id : str
        superevent uid
    t_start : float
        start of superevent time window, unchanged if None
    t_end : float
        end of superevent time window, unchanged if None
    t_0 : float
        superevent t_0, unchanged if None
    preferred_event : str
        uid of the preferred event, unchanged if None
    em_type
        forwarded unchanged to the client as ``em_type``
    time_coinc_far
        forwarded unchanged to the client as ``time_coinc_far``
    space_coinc_far
        forwarded unchanged to the client as ``space_coinc_far``

    """
    changes = dict(
        t_start=t_start, t_end=t_end, t_0=t_0,
        preferred_event=preferred_event, em_type=em_type,
        time_coinc_far=time_coinc_far, space_coinc_far=space_coinc_far)
    try:
        client.superevents.update(superevent_id, **changes)
    except HTTPError as e:
        # A 400 error saying that nothing would change is not a failure:
        # return successfully to preserve idempotency.
        response = e.response
        nothing_to_change = (
            response.status_code == 400
            and response.content
            == b'"Request would not modify the superevent"')
        if nothing_to_change:
            return
        raise
@task(shared=False)
@catch_retryable_http_errors
def create_superevent(graceid, t0, t_start, t_end):
    """Create new superevent in GraceDB with `graceid`

    Parameters
    ----------
    graceid : str
        graceid with which superevent is created.
    t0 : float
        ``t_0`` parameter of superevent
    t_start : float
        ``t_start`` parameter of superevent
    t_end : float
        ``t_end`` parameter of superevent

    Returns
    -------
    str or None
        The ``superevent_id`` field of the response, or None if GraceDB
        answered with a 400 error saying that the event is already
        assigned to a superevent.

    """
    try:
        created = client.superevents.create(
            t_start=t_start, t_0=t0, t_end=t_end, preferred_event=graceid)
    except HTTPError as e:
        response = e.response
        already_assigned = (
            response.status_code == 400
            and b'is already assigned to a Superevent' in response.content)
        if not already_assigned:
            raise
        return None
    return created['superevent_id']
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_event_to_superevent(superevent_id, graceid):
    """Add an event to a superevent in GraceDB.

    A 400 error saying that the event is already assigned to a superevent
    is swallowed; every other HTTP error propagates.
    """
    superevent = client.superevents[superevent_id]
    try:
        superevent.add(graceid)
    except HTTPError as e:
        response = e.response
        already_assigned = (
            response.status_code == 400
            and b'is already assigned to a Superevent' in response.content)
        if already_assigned:
            return
        raise
@task(shared=False)
@catch_retryable_http_errors
def get_superevent_file_list(superevent_id):
    """Get superevent file list from GraceDB."""
    superevent_files = client.superevents[superevent_id].files
    return superevent_files.get()
@task(shared=False)
@catch_retryable_http_errors
def get_latest_file(superevent_id, filename):
    """Get the latest file provided a file name

    Parameters
    ----------
    superevent_id : str
        superevent uid

    filename : str
        The filebase of a file name
        e.g. 'bayestar.multiorder.fits' for 'bayestar.multiorder.fits,0'

    Returns
    -------
    str or None
        The versioned filename for the inquired file, or None if no file
        of the superevent contains ``filename`` in its name.
    """
    # Get file list for superevent
    file_list = get_superevent_file_list(superevent_id)
    # Keep every key that includes the requested file name.
    candidates = [key for key in file_list if filename in key]
    if not candidates:
        return None

    def version_order(key):
        # Order by base name first, then by *numeric* version, so that
        # 'name,10' ranks above 'name,9' (a plain string comparison would
        # rank it below). Unversioned keys rank below every version.
        base, version = _parse_versioned_filename(key)
        return base, -1 if version is None else version

    return max(candidates, key=version_order)
410 return None