Coverage for gwcelery/tasks/gracedb.py: 81%
181 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Communication with GraceDB."""
2import functools
3import re
5import gracedb_sdk
6from celery.utils.log import get_task_logger
7from requests.exceptions import ConnectionError, HTTPError
9from .. import app
10from ..util import PromiseProxy
# Shared GraceDB REST client used by every task in this module. The client
# class and its constructor arguments are handed to PromiseProxy rather than
# being called here; presumably this defers building the client until first
# use -- TODO confirm against ``..util.PromiseProxy``.
client = PromiseProxy(gracedb_sdk.Client,
                      ('https://' + app.conf.gracedb_host + '/api/',),
                      {'fail_if_noauth': True, 'cert_reload': True})

# Celery task logger named after this module.
log = get_task_logger(__name__)
class RetryableHTTPError(HTTPError):
    """HTTP error that is worth retrying.

    Raised by :func:`catch_retryable_http_errors` in place of a plain
    :class:`~requests.exceptions.HTTPError` when the response status code is
    one of the codes that decorator treats as transient.
    """
def catch_retryable_http_errors(f):
    """Decorator to capture HTTP errors that we should retry.

    We retry the server-side HTTP status codes 500 (Internal Server Error),
    502 (Bad Gateway), 503 (Service Unavailable), and 504 (Gateway Timeout).
    We also retry the client-side status codes 408 (Timeout),
    409 (Conflicting URL), and 429 (Too many requests).

    Parameters
    ----------
    f : callable
        The function to wrap.

    Returns
    -------
    callable
        A wrapper that calls ``f`` and re-raises any
        :class:`~requests.exceptions.HTTPError` carrying one of the status
        codes above as :class:`RetryableHTTPError`. All other exceptions
        propagate unchanged.
    """
    retryable_status_codes = frozenset({408, 409, 429, 500, 502, 503, 504})

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except HTTPError as e:
            response = e.response
            # An HTTPError constructed without a response has
            # ``response is None``; there is no status code to inspect, so
            # re-raise it untouched instead of failing with AttributeError.
            if (response is not None
                    and response.status_code in retryable_status_codes):
                raise RetryableHTTPError(
                    *e.args, request=e.request, response=response)
            raise

    return wrapper
def task(*args, **kwargs):
    """Declare a Celery task with this module's automatic retry policy.

    All positional and keyword arguments are forwarded to ``app.task``,
    together with the retry settings below: automatic retry on
    ``ConnectionError``, :class:`RetryableHTTPError`, and ``TimeoutError``,
    a default retry delay of 20 seconds with backoff enabled, and at most
    10 retries.
    """
    retry_policy = dict(
        autoretry_for=(ConnectionError, RetryableHTTPError, TimeoutError),
        default_retry_delay=20.0,
        retry_backoff=True,
        retry_kwargs=dict(max_retries=10),
    )
    return app.task(*args, **kwargs, **retry_policy)
52versioned_filename_regex = re.compile(
53 r'^(?P<filename>.*?)(?:,(?P<file_version>\d+))?$')
56def _parse_versioned_filename(versioned_filename):
57 match = versioned_filename_regex.fullmatch(versioned_filename)
58 filename = match['filename']
59 file_version = match['file_version']
60 if file_version is not None:
61 file_version = int(file_version)
62 return filename, file_version
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_pipeline_preferred_event(superevent_id, event_id):
    """Add an event to a superevent's pipeline-preferred events in GraceDB."""
    superevent = client.superevents[superevent_id]
    superevent.pipeline_preferred_events.add(event_id)
@task(shared=False)
@catch_retryable_http_errors
def create_event(filecontents, search, pipeline, group, labels=()):
    """Create an event in GraceDB.

    The contents are uploaded under the file name ``'initial.data'``.

    Returns
    -------
    The response returned by ``client.events.create``.
    """
    return client.events.create(
        group=group,
        pipeline=pipeline,
        filename='initial.data',
        search=search,
        filecontents=filecontents,
        labels=labels,
    )
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_label(label, graceid):
    """Apply a label to an event in GraceDB.

    An HTTP error whose response body says that the label (or a related
    signoff) already exists is swallowed, so that applying the same label
    twice succeeds.
    """
    # Exact response bodies that mean "nothing to do" rather than a failure.
    already_applied = {
        b'"The \'ADVREQ\' label cannot be applied to request a signoff '
        b'because a related signoff already exists."',

        b'"The fields superevent, name must make a unique set."'
    }
    try:
        client.events[graceid].labels.create(label)
    except HTTPError as err:
        if err.response.content in already_applied:
            return
        raise
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_label(label, graceid):
    """Remove a label from an event in GraceDB.

    A 404 response is not treated as a failure: the label is already absent,
    which is the desired end state.
    """
    try:
        client.events[graceid].labels.delete(label)
    except HTTPError as err:
        if err.response.status_code == 404:
            return
        raise
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_signoff(status, comment, signoff_type, graceid):
    """Create a signoff on a superevent in GraceDB.

    An HTTP error whose response body reports that the signoff already
    exists is swallowed, so that repeating the request succeeds.
    """
    duplicate_marker = (
        b'The fields superevent, instrument must make a unique set')
    try:
        client.superevents[graceid].signoff(signoff_type, status, comment)
    except HTTPError as err:
        if duplicate_marker in err.response.content:
            return
        raise
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_tag(filename, tag, graceid):
    """Tag the log entry that carries a given file in GraceDB.

    ``filename`` may carry a ``,N`` version suffix. If it does, the last log
    entry matching both the file name and that version is tagged; otherwise
    the last log entry with that file name is tagged. An HTTP error whose
    response body says the tag is already applied is swallowed.
    """
    base_name, version = _parse_versioned_filename(filename)
    entries = get_log(graceid)
    if version is None:
        matching = [item for item in entries
                    if item['filename'] == base_name]
    else:
        matching = [item for item in entries
                    if item['filename'] == base_name
                    and item['file_version'] == version]
    # Keep only the last match; unpacking raises ValueError if none matched.
    *_, last_entry = matching
    log_number = last_entry['N']

    already_tagged = b'"Tag is already applied to this log message"'
    try:
        client.events[graceid].logs[log_number].tags.create(tag)
    except HTTPError as err:
        if err.response.content == already_tagged:
            return
        raise
@task(shared=False)
@catch_retryable_http_errors
def create_voevent(graceid, voevent_type, **kwargs):
    """Create a VOEvent for an event in GraceDB.

    Extra keyword arguments are forwarded to the GraceDB client's VOEvent
    ``create`` call.

    Returns
    -------
    str
        The filename of the new VOEvent.

    """
    voevents = client.events[graceid].voevents
    created = voevents.create(voevent_type=voevent_type, **kwargs)
    return created['filename']
@task(shared=False)
@catch_retryable_http_errors
def download(filename, graceid):
    """Download a file attached to an event in GraceDB and return its contents."""
    remote_file = client.events[graceid].files[filename]
    with remote_file.get() as stream:
        contents = stream.read()
    return contents
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def expose(graceid):
    """Expose a superevent to the public.

    Notes
    -----
    If :obj:`~gwcelery.conf.expose_to_public` is False, then this becomes a
    no-op.

    """
    if not app.conf['expose_to_public']:
        return
    client.superevents[graceid].expose()
@task(shared=False)
@catch_retryable_http_errors
def get_events(query, **kwargs):
    """Search GraceDB for events and return the matches as a list.

    Extra keyword arguments are forwarded to ``client.events.search``.
    """
    matches = client.events.search(query=query, **kwargs)
    return list(matches)
@task(shared=False)
@catch_retryable_http_errors
def get_event(graceid):
    """Fetch a single event from GraceDB by its ID."""
    event = client.events[graceid]
    return event.get()
@task(shared=False)
@catch_retryable_http_errors
def get_group(graceid):
    """Retrieve the group field of an event from GraceDB.

    Parameters
    ----------
    graceid : str
        ID of the event to look up.

    Returns
    -------
    The value stored under the ``'group'`` key of the event.
    """
    return client.events[graceid].get()['group']
@task(shared=False)
@catch_retryable_http_errors
def get_search(graceid):
    """Fetch an event from GraceDB and return its ``'search'`` field."""
    event = client.events[graceid].get()
    return event['search']
@task(shared=False)
@catch_retryable_http_errors
def get_labels(graceid):
    """Return the set of label names applied to an event in GraceDB."""
    names = set()
    for row in client.events[graceid].labels.get():
        names.add(row['name'])
    return names
@task(shared=False)
@catch_retryable_http_errors
def get_log(graceid):
    """Fetch the log messages of an event from GraceDB."""
    logs = client.events[graceid].logs
    return logs.get()
@task(shared=False)
@catch_retryable_http_errors
def get_superevent(graceid):
    """Fetch a single superevent from GraceDB by its ID."""
    superevent = client.superevents[graceid]
    return superevent.get()
@task(shared=False)
@catch_retryable_http_errors
def replace_event(graceid, payload):
    """Replace the contents of an existing event in GraceDB.

    Parameters
    ----------
    graceid : str
        ID of the event to update.
    payload
        New file contents for the event, passed to ``client.events.update``
        as ``filecontents``.

    Returns
    -------
    The response returned by ``client.events.update``.
    """
    return client.events.update(graceid, filecontents=payload)
@task(shared=False)
@catch_retryable_http_errors
def upload(filecontents, filename, graceid, message, tags=()):
    """Upload a file to an event in GraceDB as a new log entry.

    Returns
    -------
    str
        The versioned file name, formed from the ``'filename'`` and
        ``'file_version'`` fields of the response joined by a comma.
    """
    logs = client.events[graceid].logs
    entry = logs.create(comment=message, filename=filename,
                        filecontents=filecontents, tags=tags)
    stored_name = entry['filename']
    stored_version = entry['file_version']
    return '{},{}'.format(stored_name, stored_version)
# Declared with the module's ``task`` helper, like every other task here, so
# that the RetryableHTTPError raised by ``catch_retryable_http_errors`` is
# actually retried; a bare ``app.task`` has no ``autoretry_for`` configured.
@task(shared=False)
@catch_retryable_http_errors
def get_superevents(query, **kwargs):
    """List matching superevents in gracedb.

    Parameters
    ----------
    query : str
        Search query passed to ``client.superevents.search``.
    **kwargs
        Additional keyword arguments passed to
        ``client.superevents.search``.

    Returns
    -------
    superevents : list
        The list of the superevents.

    """
    return list(client.superevents.search(query=query, **kwargs))
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_pipeline_preferred_event(superevent_id, event_id):
    """Remove an event from a superevent's pipeline-preferred events."""
    superevent = client.superevents[superevent_id]
    superevent.pipeline_preferred_events.remove(event_id)
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def update_superevent(superevent_id, t_start=None,
                      t_end=None, t_0=None, preferred_event=None,
                      em_type=None, time_coinc_far=None,
                      space_coinc_far=None):
    """
    Update superevent information in GraceDB.

    A 400 response whose body says the request would not modify the
    superevent is swallowed, so that repeating an update succeeds.

    Parameters
    ----------
    superevent_id : str
        superevent uid
    t_start : float
        start of superevent time window, unchanged if None
    t_end : float
        end of superevent time window, unchanged if None
    t_0 : float
        superevent t_0, unchanged if None
    preferred_event : str
        uid of the preferred event, unchanged if None
    em_type, time_coinc_far, space_coinc_far
        forwarded unchanged to ``client.superevents.update``

    """
    unchanged_body = b'"Request would not modify the superevent"'
    fields = dict(
        t_start=t_start, t_end=t_end, t_0=t_0,
        preferred_event=preferred_event, em_type=em_type,
        time_coinc_far=time_coinc_far, space_coinc_far=space_coinc_far)
    try:
        client.superevents.update(superevent_id, **fields)
    except HTTPError as err:
        reply = err.response
        if reply.status_code != 400 or reply.content != unchanged_body:
            raise
@task(shared=False)
@catch_retryable_http_errors
def create_superevent(graceid, t0, t_start, t_end):
    """Create a new superevent in GraceDB with `graceid` as preferred event.

    Parameters
    ----------
    graceid : str
        graceid with which superevent is created.
    t0 : float
        ``t_0`` parameter of superevent
    t_start : float
        ``t_start`` parameter of superevent
    t_end : float
        ``t_end`` parameter of superevent

    Returns
    -------
    The ``'superevent_id'`` field of the response, or None if GraceDB
    answered with a 400 error saying the event is already assigned to a
    superevent.

    """
    already_assigned = b'is already assigned to a Superevent'
    try:
        created = client.superevents.create(
            t_start=t_start, t_0=t0, t_end=t_end, preferred_event=graceid)
        return created['superevent_id']
    except HTTPError as err:
        reply = err.response
        if reply.status_code != 400 or already_assigned not in reply.content:
            raise
        return None
@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_event_to_superevent(superevent_id, graceid):
    """Add an event to a superevent in GraceDB.

    A 400 response saying the event is already assigned to a superevent is
    swallowed.
    """
    already_assigned = b'is already assigned to a Superevent'
    try:
        client.superevents[superevent_id].add(graceid)
    except HTTPError as err:
        reply = err.response
        if reply.status_code != 400 or already_assigned not in reply.content:
            raise
@task(shared=False)
@catch_retryable_http_errors
def get_superevent_file_list(superevent_id):
    """Fetch the file list of a superevent from GraceDB."""
    files = client.superevents[superevent_id].files
    return files.get()
@task(shared=False)
@catch_retryable_http_errors
def get_latest_file(superevent_id, filename):
    """Get the latest versioned file name matching a given file name.

    Parameters
    ----------
    superevent_id : str
        superevent uid
    filename : str
        The filebase of a file name
        e.g. 'bayestar.multiorder.fits' for 'bayestar.multiorder.fits,0'

    Returns
    -------
    str or None
        The versioned filename for the inquired file, or None if no entry in
        the superevent's file list contains ``filename``.
    """
    # Get file list for superevent
    file_list = get_superevent_file_list(superevent_id)
    # Keep every key that includes the requested file name
    keys = [key for key in file_list if filename in key]
    if not keys:
        return None

    def version_order(key):
        # Compare versions as integers: as plain strings 'x,9' would sort
        # above 'x,10'. A key without a version suffix ranks below every
        # versioned key of the same file.
        base, version = _parse_versioned_filename(key)
        return base, -1 if version is None else version

    return max(keys, key=version_order)