Coverage for gwcelery/views.py: 83%
257 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""Flask web application views."""
2import datetime
3import json
4import os
5import platform
6import re
7import socket
8import sys
9from importlib import metadata
11import lal
12from astropy.time import Time
13from celery import group
14from flask import (flash, jsonify, make_response, redirect, render_template,
15 request, url_for)
16from requests.exceptions import HTTPError
18from . import app as celery_app
19from ._version import get_versions
20from .flask import app, cache
21from .tasks import (bayestar, circulars, core, external_skymaps, first2years,
22 first2years_external, gracedb, orchestrator, skymaps,
23 superevents)
24from .util import PromiseProxy
26# Change the application root url
27PREFIX = os.getenv('FLASK_APP_PREFIX', '')
29distributions = PromiseProxy(lambda: tuple(metadata.distributions()))
32@app.route(PREFIX + '/')
33def index():
34 """Render main page."""
35 return render_template(
36 'index.jinja2',
37 conf=celery_app.conf,
38 hostname=socket.getfqdn(),
39 detectors=sorted(lal.cached_detector_by_prefix.keys()),
40 distributions=distributions,
41 platform=platform.platform(),
42 versions=get_versions(),
43 python_version=sys.version,
44 joint_mdc_freq=celery_app.conf['joint_mdc_freq'])
47def take_n(n, iterable):
48 """Take the first `n` items of a collection."""
49 for i, item in enumerate(iterable):
50 if i >= n:
51 break
52 yield item
55# Regular expression for parsing query strings
56# that look like GraceDB superevent names.
57_typeahead_superevent_id_regex = re.compile(
58 r'(?P<prefix>[MT]?)S?(?P<date>\d{0,6})(?P<suffix>[a-z]*)',
59 re.IGNORECASE)
62@app.route(PREFIX + '/typeahead_superevent_id')
63@cache.cached(query_string=True)
64def typeahead_superevent_id():
65 """Search GraceDB for superevents by ID.
67 This involves some date parsing because GraceDB does not support directly
68 searching for superevents by ID substring.
69 """
70 max_results = 8 # maximum number of results to return
71 batch_results = 32 # batch size for results from server
73 term = request.args.get('superevent_id')
74 match = _typeahead_superevent_id_regex.fullmatch(term) if term else None
76 if match:
77 # Determine GraceDB event category from regular expression.
78 prefix = match['prefix'].upper() + 'S'
79 category = {'T': 'test', 'M': 'MDC'}.get(
80 match['prefix'].upper(), 'production')
82 # Determine start date from regular expression by padding out
83 # the partial date with missing digits defaulting to 000101.
84 date_partial = match['date']
85 date_partial_length = len(date_partial)
86 try:
87 date_start = datetime.datetime.strptime(
88 date_partial + '000101'[date_partial_length:], '%y%m%d')
89 except ValueError: # invalid date
90 return jsonify([])
92 # Determine end date from regular expression by adding a very
93 # loose upper bound on the number of days until the next
94 # digit in the date rolls over. No need to be exact here.
95 date_end = date_start + datetime.timedelta(
96 days=[36600, 3660, 366, 320, 32, 11, 1.1][date_partial_length])
98 # Determine GraceDB event suffix from regular expression.
99 suffix = match['suffix'].lower()
100 else:
101 prefix = 'S'
102 category = 'production'
103 date_end = datetime.datetime.now(datetime.timezone.utc)
104 date_start = date_end - datetime.timedelta(days=7)
105 date_partial = ''
106 date_partial_length = 0
107 suffix = ''
109 # Query GraceDB.
110 query = 'category: {} t_0: {} .. {}'.format(
111 category, Time(date_start).gps, Time(date_end).gps)
112 response = gracedb.client.superevents.search(
113 query=query, sort='superevent_id', count=batch_results)
115 # Filter superevent IDs that match the search term.
116 regex = re.compile(r'{}{}\d{{{}}}{}[a-z]*'.format(
117 prefix, date_partial, 6 - date_partial_length, suffix))
118 superevent_ids = (
119 superevent['superevent_id'] for superevent
120 in response if regex.fullmatch(superevent['superevent_id']))
122 # Return only the first few matches.
123 return jsonify(list(take_n(max_results, superevent_ids)))
126@app.route(PREFIX + '/typeahead_event_id')
127@cache.cached(query_string=True)
128def typeahead_event_id():
129 """Search GraceDB for events by ID."""
130 superevent_id = request.args.get('superevent_id').strip()
131 query_terms = [f'superevent: {superevent_id}']
132 if superevent_id.startswith('T'):
133 query_terms.append('Test')
134 elif superevent_id.startswith('M'):
135 query_terms.append('MDC')
136 query = ' '.join(query_terms)
137 try:
138 results = gracedb.get_events(query)
139 except HTTPError:
140 results = []
141 results = [dict(r, snr=superevents.get_snr(r)) for r in results
142 if superevents.is_complete(r)]
143 return jsonify(list(reversed(sorted(results, key=superevents.keyfunc))))
146def _search_by_tag_and_filename(superevent_id, filename, extension, tag):
147 try:
148 records = gracedb.get_log(superevent_id)
149 return [
150 '{},{}'.format(record['filename'], record['file_version'])
151 for record in records if tag in record['tag_names']
152 and record['filename'].startswith(filename)
153 and record['filename'].endswith(extension)]
154 except HTTPError as e:
155 # Ignore 404 errors from server
156 if e.response.status_code == 404:
157 return []
158 else:
159 raise
162@app.route(PREFIX + '/typeahead_skymap_filename')
163@cache.cached(query_string=True)
164def typeahead_skymap_filename():
165 """Search for sky maps by filename."""
166 return jsonify(_search_by_tag_and_filename(
167 request.args.get('superevent_id') or '',
168 request.args.get('filename') or '',
169 '.multiorder.fits', 'sky_loc'
170 ))
173@app.route(PREFIX + '/typeahead_em_bright_filename')
174@cache.cached(query_string=True)
175def typeahead_em_bright_filename():
176 """Search em_bright files by filename."""
177 return jsonify(_search_by_tag_and_filename(
178 request.args.get('superevent_id') or '',
179 request.args.get('filename') or '',
180 '.json', 'em_bright'
181 ))
184@app.route(PREFIX + '/typeahead_p_astro_filename')
185@cache.cached(query_string=True)
186def typeahead_p_astro_filename():
187 """Search p_astro files by filename."""
188 return jsonify(_search_by_tag_and_filename(
189 request.args.get('superevent_id') or '',
190 request.args.get('filename') or '',
191 '.json', 'p_astro'
192 ))
195@celery_app.task(shared=False, ignore_result=True)
196def _construct_igwn_alert_and_send_prelim_alert(superevent_event_list,
197 superevent_id,
198 initiate_voevent=True):
199 superevent, event = superevent_event_list
200 alert = {
201 'uid': superevent_id,
202 'object': superevent
203 }
205 orchestrator.earlywarning_preliminary_alert(
206 event,
207 alert,
208 alert_type='preliminary',
209 initiate_voevent=initiate_voevent
210 )
213@app.route(PREFIX + '/send_preliminary_gcn', methods=['POST'])
214def send_preliminary_gcn():
215 """Handle submission of preliminary alert form."""
216 keys = ('superevent_id', 'event_id')
217 superevent_id, event_id, *_ = tuple(request.form.get(key) for key in keys)
218 if superevent_id and event_id:
219 try:
220 event = gracedb.get_event(event_id)
221 except HTTPError as e:
222 flash(f'No action performed. GraceDB query for {event_id} '
223 f'returned error code {e.response.status_code}.', 'danger')
224 return redirect(url_for('index'))
226 (
227 gracedb.upload.s(
228 None, None, superevent_id,
229 'User {} queued a Preliminary alert through the dashboard.'
230 .format(request.remote_user or '(unknown)'),
231 tags=['em_follow'])
232 |
233 gracedb.update_superevent.si(
234 superevent_id, preferred_event=event_id, t_0=event['gpstime'])
235 |
236 group(
237 gracedb.get_superevent.si(superevent_id),
239 gracedb.get_event.si(event_id)
240 )
241 |
242 _construct_igwn_alert_and_send_prelim_alert.s(superevent_id)
243 ).delay()
244 flash('Queued preliminary alert for {}.'.format(superevent_id),
245 'success')
246 else:
247 flash('No alert sent. Please fill in all fields.', 'danger')
248 return redirect(url_for('index'))
251@app.route(PREFIX + '/change_preferred_event', methods=['POST'])
252def change_preferred_event():
253 """Handle submission of preliminary alert form."""
254 keys = ('superevent_id', 'event_id')
255 superevent_id, event_id, *_ = tuple(request.form.get(key) for key in keys)
256 if superevent_id and event_id:
257 try:
258 event = gracedb.get_event(event_id)
259 except HTTPError as e:
260 flash(f'No change performed. GraceDB query for {event_id} '
261 f'returned error code {e.response.status_code}.', 'danger')
262 return redirect(url_for('index'))
264 try:
265 superevent = gracedb.get_superevent(superevent_id)
266 except HTTPError as e:
267 flash(f'No change performed. GraceDB query for {superevent_id} '
268 f'returned error code {e.response.status_code}.', 'danger')
269 return redirect(url_for('index'))
270 (
271 gracedb.upload.s(
272 None, None, superevent_id,
273 celery_app.conf['views_manual_preferred_event_log_message']
274 .format(request.remote_user or '(unknown)', event_id),
275 tags=['em_follow'])
276 |
277 gracedb.update_superevent.si(
278 superevent_id, preferred_event=event_id,
279 t_0=event['gpstime'])
280 |
281 _construct_igwn_alert_and_send_prelim_alert.si(
282 [superevent, event],
283 superevent_id,
284 initiate_voevent=False
285 )
286 ).delay()
288 # Update pipeline-preferred event if the new preferred event is not
289 # already the pipeline-preferred event for the pipeline that uploaded
290 # it.
291 pipeline_pref_event = \
292 superevent['pipeline_preferred_events'].get(event['pipeline'], {})
293 if pipeline_pref_event.get('graceid', '') != event_id:
294 (
295 gracedb.upload.s(
296 None, None, superevent_id,
297 'Manual update of preferred event triggered update of '
298 f'{event["pipeline"]}-preferred event to {event_id}',
299 tags=['em_follow'])
300 |
301 gracedb.add_pipeline_preferred_event.si(
302 superevent_id, event_id)
303 ).delay()
305 flash('Changed preferred event for {}.'.format(superevent_id),
306 'success')
307 else:
308 flash('No change performed. Please fill in all fields.', 'danger')
309 return redirect(url_for('index'))
312@app.route(PREFIX + '/change_pipeline_preferred_event', methods=['POST'])
313def change_pipeline_preferred_event():
314 """Handle submission of preliminary alert form."""
315 keys = ('superevent_id', 'pipeline', 'event_id')
316 superevent_id, pipeline, event_id, *_ = tuple(request.form.get(key) for
317 key in keys)
318 if superevent_id and pipeline and event_id:
319 try:
320 event = gracedb.get_event(event_id)
321 except HTTPError as e:
322 flash(f'No change performed. GraceDB query for {event_id} '
323 f'returned error code {e.response.status_code}.', 'danger')
324 return redirect(url_for('index'))
326 # Check that specified event is from specified pipeline
327 if event['pipeline'].lower() != pipeline.lower():
328 flash(f'No change performed. {event_id} was uploaded by '
329 f'{event["pipeline"].lower()} and cannot be the '
330 f'{pipeline.lower()}-preferred event.', 'danger')
331 return redirect(url_for('index'))
333 try:
334 superevent = gracedb.get_superevent(superevent_id)
335 except HTTPError as e:
336 flash(f'No change performed. GraceDB query for {superevent_id} '
337 f'returned error code {e.response.status_code}.', 'danger')
338 return redirect(url_for('index'))
340 # Check that this pipeline's preferred event is not the
341 # superevent's preferred event
342 if superevent['preferred_event_data']['pipeline'].lower() == \
343 pipeline.lower():
344 flash(f'No change performed. User specified pipeline, '
345 f'{pipeline.lower()}, is the same pipeline that '
346 f'produced {superevent_id}\'s preferred event.', 'danger')
347 return redirect(url_for('index'))
349 (
350 gracedb.upload.s(
351 None, None, superevent_id,
352 'User {} queued a {} preferred event change to {}.'
353 .format(request.remote_user or '(unknown)', pipeline,
354 event_id),
355 tags=['em_follow'])
356 |
357 gracedb.add_pipeline_preferred_event.si(
358 superevent_id, event_id)
359 |
360 _construct_igwn_alert_and_send_prelim_alert.si(
361 [superevent, event],
362 superevent_id,
363 initiate_voevent=False
364 )
365 ).delay()
366 flash(f'Changed {pipeline.lower()} preferred event for '
367 f'{superevent_id}.', 'success')
368 else:
369 flash('No change performed. Please fill in all fields.', 'danger')
370 return redirect(url_for('index'))
373@app.route(PREFIX + '/send_update_gcn', methods=['POST'])
374def send_update_gcn():
375 """Handle submission of update alert form."""
376 keys = ('superevent_id', 'skymap_filename',
377 'em_bright_filename', 'p_astro_filename')
378 # Get filenames and map empty strings to Nones so that the orchestrator
379 # will ignore them and not include these values in the alert
380 superevent_id, *filenames = tuple(
381 request.form.get(key) if request.form.get(key) != '' else None
382 for key in keys)
383 # Check at least superevent ID and sky map are present
384 if superevent_id is not None and filenames[0] is not None:
385 (
386 gracedb.upload.s(
387 None, None, superevent_id,
388 'User {} queued an Update alert through the dashboard.'
389 .format(request.remote_user or '(unknown)'),
390 tags=['em_follow'])
391 |
392 orchestrator.update_alert.si(filenames, superevent_id)
393 ).delay()
394 flash('Queued update alert for {}.'.format(superevent_id), 'success')
395 else:
396 flash('No alert sent. Please fill in all required fields.', 'danger')
397 return redirect(url_for('index'))
400@app.route(PREFIX + '/create_medium_latency_gcn_circular', methods=['POST'])
401def create_medium_latency_gcn_circular():
402 """Handle submission of medium_latency GCN Circular form."""
403 ext_event_id = request.form.get('ext_event_id')
404 if ext_event_id:
405 response = make_response(circulars.create_medium_latency_circular(
406 ext_event_id))
407 response.headers["content-type"] = "text/plain"
408 return response
409 else:
410 flash('No circular created. Please fill in external event ID',
411 'danger')
412 return redirect(url_for('index'))
415@app.route(PREFIX + '/create_update_gcn_circular', methods=['POST'])
416def create_update_gcn_circular():
417 """Handle submission of GCN Circular form."""
418 keys = ['sky_localization', 'em_bright', 'p_astro', 'raven']
419 superevent_id = request.form.get('superevent_id')
420 updates = [key for key in keys if request.form.get(key)]
421 if superevent_id and updates:
422 response = make_response(circulars.create_update_circular(
423 superevent_id,
424 update_types=updates))
425 response.headers["content-type"] = "text/plain"
426 return response
427 else:
428 flash('No circular created. Please fill in superevent ID and at ' +
429 'least one update type.', 'danger')
430 return redirect(url_for('index'))
433@app.route(PREFIX + '/download_upload_external_skymap', methods=['POST'])
434def download_upload_external_skymap():
435 """Download sky map from URL to be uploaded to external event. Passes
436 a search field 'FromURL' which indicates to get_upload_external_skymap
437 to use the provided URL to download the sky map.
438 """
439 keys = ('ext_id', 'skymap_url')
440 ext_id, skymap_url, *_ = tuple(request.form.get(key) for key in keys)
441 if ext_id and skymap_url:
442 ext_event = {'graceid': ext_id, 'search': 'FromURL'}
443 external_skymaps.get_upload_external_skymap(
444 ext_event, skymap_link=skymap_url)
445 flash('Downloaded sky map for {}.'.format(ext_id),
446 'success')
447 else:
448 flash('No skymap uploaded. Please fill in all fields.', 'danger')
449 return redirect(url_for('index'))
452@celery_app.task(queue='exttrig',
453 shared=False)
454def _update_preferred_external_event(ext_event, superevent_id):
455 """Update preferred external event to given external event."""
456 # FIXME: Consider consolidating with raven.update_coinc_far by using
457 # a single function in superevents.py
458 if ext_event['search'] in {'GRB', 'SubGRB', 'SubGRBTargeted'}:
459 coinc_far_dict = gracedb.download(
460 'coincidence_far.json', ext_event['graceid'])
461 if not isinstance(coinc_far_dict, dict):
462 coinc_far_dict = json.loads(coinc_far_dict)
463 time_coinc_far = coinc_far_dict['temporal_coinc_far']
464 space_coinc_far = coinc_far_dict['spatiotemporal_coinc_far']
465 else:
466 time_coinc_far = None
467 space_coinc_far = None
468 gracedb.update_superevent(superevent_id, em_type=ext_event['graceid'],
469 time_coinc_far=time_coinc_far,
470 space_coinc_far=space_coinc_far)
473@app.route(PREFIX + '/apply_raven_labels', methods=['POST'])
474def apply_raven_labels():
475 """Applying RAVEN alert label and update the preferred external event
476 to the given coincidence."""
477 keys = ('superevent_id', 'ext_id', 'event_id')
478 superevent_id, ext_id, event_id, *_ = tuple(request.form.get(key)
479 for key in keys)
480 if superevent_id and ext_id and event_id:
481 (
482 gracedb.get_event.si(ext_id)
483 |
484 _update_preferred_external_event.s(superevent_id)
485 |
486 gracedb.create_label.si('RAVEN_ALERT', superevent_id)
487 |
488 gracedb.create_label.si('RAVEN_ALERT', ext_id)
489 |
490 gracedb.create_label.si('RAVEN_ALERT', event_id)
491 ).delay()
492 flash('Applied RAVEN alert label for {}.'.format(superevent_id),
493 'success')
494 else:
495 flash('No alert sent. Please fill in all fields.', 'danger')
496 return redirect(url_for('index'))
499@app.route(PREFIX + '/send_mock_event', methods=['POST'])
500def send_mock_event():
501 """Handle submission of mock alert form."""
502 first2years.upload_event.delay()
503 flash('Queued a mock event.', 'success')
504 return redirect(url_for('index'))
507@gracedb.task(shared=False)
508def _create_upload_external_event(gpstime):
509 new_time = first2years_external._offset_time(
510 gpstime, 'CBC', 'Fermi', 'GRB', 'MDC')
512 ext_event = first2years_external.create_upload_external_event(
513 new_time, 'Fermi', 'MDC')
515 return ext_event
518@app.route(PREFIX + '/send_mock_joint_event', methods=['POST'])
519def send_mock_joint_event():
520 """Handle submission of mock alert form."""
521 (
522 first2years.upload_event.si()
523 |
524 _create_upload_external_event.s().set(countdown=5)
525 ).delay()
526 flash('Queued a mock joint event.', 'success')
527 return redirect(url_for('index'))
530@app.route(PREFIX + '/create_skymap_with_disabled_detectors', methods=['POST'])
531def create_skymap_with_disabled_detectors():
532 """Create a BAYESTAR sky map with one or more disabled detectors."""
533 form = request.form.to_dict()
534 graceid = form.pop('event_id')
535 disabled_detectors = sorted(form.keys())
536 tags = ['sky_loc', 'public']
537 if graceid and disabled_detectors:
538 filename = f'bayestar.no-{"".join(disabled_detectors)}.multiorder.fits'
539 (
540 gracedb.download.s('coinc.xml', graceid)
541 |
542 bayestar.localize.s(graceid, disabled_detectors=disabled_detectors)
543 |
544 group(
545 core.identity.s(),
546 gracedb.upload.s(
547 filename, graceid,
548 'sky localization complete', tags
549 )
550 )
551 |
552 skymaps.annotate_fits_tuple.s(graceid, tags)
553 ).delay()
554 flash('Creating sky map for event ID ' + graceid +
555 ' with these disabled detectors: ' +
556 ' '.join(disabled_detectors), 'success')
557 else:
558 flash('No sky map created. Please fill in all fields.', 'danger')
559 return redirect(url_for('index'))
562@app.route(PREFIX + '/copy_sky_map_between_events', methods=['POST'])
563def copy_sky_map_between_events():
564 superevent_id = request.form['superevent_id']
565 graceid = request.form['event_id']
566 skymap_filename = request.form['skymap_filename']
567 skymap_filename_no_version, _, _ = skymap_filename.rpartition(',')
568 tags = ['sky_loc', 'public']
569 (
570 gracedb.download.s(skymap_filename, graceid)
571 |
572 group(
573 core.identity.s(),
574 gracedb.upload.s(
575 skymap_filename_no_version, superevent_id,
576 f'sky map copied from {graceid}', tags
577 )
578 )
579 |
580 skymaps.annotate_fits_tuple.s(superevent_id, tags)
581 ).delay()
582 flash(f'Copying file {skymap_filename} from {graceid} to {superevent_id}',
583 'success')
584 return redirect(url_for('index'))