Coverage for gwcelery/views.py: 83%

257 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""Flask web application views.""" 

2import datetime 

3import json 

4import os 

5import platform 

6import re 

7import socket 

8import sys 

9from importlib import metadata 

10 

11import lal 

12from astropy.time import Time 

13from celery import group 

14from flask import (flash, jsonify, make_response, redirect, render_template, 

15 request, url_for) 

16from requests.exceptions import HTTPError 

17 

18from . import app as celery_app 

19from ._version import get_versions 

20from .flask import app, cache 

21from .tasks import (bayestar, circulars, core, external_skymaps, first2years, 

22 first2years_external, gracedb, orchestrator, skymaps, 

23 superevents) 

24from .util import PromiseProxy 

25 

# URL prefix under which every route of the web application is mounted;
# taken from the FLASK_APP_PREFIX environment variable (empty if unset).
PREFIX = os.environ.get('FLASK_APP_PREFIX', '')

# Installed distributions, wrapped in a PromiseProxy.
# NOTE(review): presumably PromiseProxy defers the call until first use;
# confirm against gwcelery.util.
distributions = PromiseProxy(lambda: tuple(metadata.distributions()))

30 

31 

@app.route(PREFIX + '/')
def index():
    """Render the main page.

    The ``index.jinja2`` template receives the Celery configuration, the
    fully qualified host name, the sorted LAL detector prefixes, the
    installed distributions, and platform/version information.
    """
    template_context = dict(
        conf=celery_app.conf,
        hostname=socket.getfqdn(),
        detectors=sorted(lal.cached_detector_by_prefix.keys()),
        distributions=distributions,
        platform=platform.platform(),
        versions=get_versions(),
        python_version=sys.version,
        joint_mdc_freq=celery_app.conf['joint_mdc_freq'],
    )
    return render_template('index.jinja2', **template_context)

45 

46 

def take_n(n, iterable):
    """Lazily yield at most the first `n` items of `iterable`.

    Iteration stops as soon as an item at position `n` or later is seen,
    so nothing is yielded when `n` is zero or negative.
    """
    for position, element in enumerate(iterable):
        if position < n:
            yield element
        else:
            return

53 

54 

55# Regular expression for parsing query strings 

56# that look like GraceDB superevent names. 

57_typeahead_superevent_id_regex = re.compile( 

58 r'(?P<prefix>[MT]?)S?(?P<date>\d{0,6})(?P<suffix>[a-z]*)', 

59 re.IGNORECASE) 

60 

61 

@app.route(PREFIX + '/typeahead_superevent_id')
@cache.cached(query_string=True)
def typeahead_superevent_id():
    """Search GraceDB for superevents by ID.

    This involves some date parsing because GraceDB does not support directly
    searching for superevents by ID substring.

    The ``superevent_id`` query argument is parsed as an optional ``M``/``T``
    prefix, an optional ``S``, up to six date digits (``YYMMDD``) and an
    optional letter suffix. If the argument is missing, empty, or does not
    parse, production superevents from the last 7 days are searched instead.

    Returns
    -------
    flask.Response
        JSON list of at most 8 matching superevent IDs, or an empty JSON
        list if the partial date cannot be completed to a valid date.
    """
    max_results = 8  # maximum number of results to return
    batch_results = 32  # batch size for results from server

    term = request.args.get('superevent_id')
    match = _typeahead_superevent_id_regex.fullmatch(term) if term else None

    if match:
        # Determine GraceDB event category from regular expression.
        prefix = match['prefix'].upper() + 'S'
        category = {'T': 'test', 'M': 'MDC'}.get(
            match['prefix'].upper(), 'production')

        # Determine start date from regular expression by padding out
        # the partial date with missing digits defaulting to 000101.
        date_partial = match['date']
        date_partial_length = len(date_partial)
        padding = '000101'[date_partial_length:]
        # If the last digit typed is a non-zero tens digit of the month or
        # of the day, the earliest valid completion ends in 0, not 1:
        # '191' must start at 191001 (October), not 191101, and '19103'
        # must start at 191030, not 191031.
        if date_partial_length in (3, 5) and date_partial[-1] != '0':
            padding = '0' + padding[1:]
        try:
            date_start = datetime.datetime.strptime(
                date_partial + padding, '%y%m%d')
        except ValueError:  # invalid date
            return jsonify([])

        # Determine end date from regular expression by adding a very
        # loose upper bound on the number of days until the next
        # digit in the date rolls over. No need to be exact here.
        date_end = date_start + datetime.timedelta(
            days=[36600, 3660, 366, 320, 32, 11, 1.1][date_partial_length])

        # Determine GraceDB event suffix from regular expression.
        suffix = match['suffix'].lower()
    else:
        # No usable search term: fall back to the last week of production
        # superevents.
        prefix = 'S'
        category = 'production'
        date_end = datetime.datetime.now(datetime.timezone.utc)
        date_start = date_end - datetime.timedelta(days=7)
        date_partial = ''
        date_partial_length = 0
        suffix = ''

    # Query GraceDB.
    query = 'category: {} t_0: {} .. {}'.format(
        category, Time(date_start).gps, Time(date_end).gps)
    response = gracedb.client.superevents.search(
        query=query, sort='superevent_id', count=batch_results)

    # Filter superevent IDs that match the search term.
    regex = re.compile(r'{}{}\d{{{}}}{}[a-z]*'.format(
        prefix, date_partial, 6 - date_partial_length, suffix))
    superevent_ids = (
        superevent['superevent_id'] for superevent
        in response if regex.fullmatch(superevent['superevent_id']))

    # Return only the first few matches.
    return jsonify(list(take_n(max_results, superevent_ids)))

124 

125 

@app.route(PREFIX + '/typeahead_event_id')
@cache.cached(query_string=True)
def typeahead_event_id():
    """Search GraceDB for events by ID.

    Reads the ``superevent_id`` query argument and queries GraceDB for the
    events of that superevent, adding the ``Test`` or ``MDC`` search term
    when the ID starts with ``T`` or ``M``. An HTTP error from GraceDB is
    treated as "no results".

    Returns
    -------
    flask.Response
        JSON list of the events for which ``superevents.is_complete`` is
        true, each with an added ``snr`` entry, in the reverse of the
        order given by ``superevents.keyfunc``.
    """
    # A missing query argument is treated like an empty one, as in the
    # other typeahead views, rather than failing on None.strip().
    superevent_id = (request.args.get('superevent_id') or '').strip()
    query_terms = [f'superevent: {superevent_id}']
    if superevent_id.startswith('T'):
        query_terms.append('Test')
    elif superevent_id.startswith('M'):
        query_terms.append('MDC')
    query = ' '.join(query_terms)
    try:
        results = gracedb.get_events(query)
    except HTTPError:
        results = []
    results = [dict(r, snr=superevents.get_snr(r)) for r in results
               if superevents.is_complete(r)]
    # Sort ascending and then reverse (rather than sorting with
    # reverse=True) so that the order of tied events is unchanged.
    return jsonify(list(reversed(sorted(results, key=superevents.keyfunc))))

144 

145 

def _search_by_tag_and_filename(superevent_id, filename, extension, tag):
    """List log files of a superevent matching a tag and filename pattern.

    Returns ``'<filename>,<file_version>'`` strings for every GraceDB log
    record that carries `tag` and whose filename starts with `filename`
    and ends with `extension`. A 404 from the server yields an empty list;
    any other HTTP error is re-raised.
    """
    try:
        matches = []
        for entry in gracedb.get_log(superevent_id):
            if tag not in entry['tag_names']:
                continue
            name = entry['filename']
            if name.startswith(filename) and name.endswith(extension):
                matches.append(
                    '{},{}'.format(name, entry['file_version']))
        return matches
    except HTTPError as e:
        # Only a 404 from the server is ignored.
        if e.response.status_code != 404:
            raise
        return []

160 

161 

@app.route(PREFIX + '/typeahead_skymap_filename')
@cache.cached(query_string=True)
def typeahead_skymap_filename():
    """Search for sky maps by filename.

    Looks for ``sky_loc``-tagged ``.multiorder.fits`` files whose names
    start with the ``filename`` query argument.
    """
    superevent_id = request.args.get('superevent_id') or ''
    filename_start = request.args.get('filename') or ''
    found = _search_by_tag_and_filename(
        superevent_id, filename_start, '.multiorder.fits', 'sky_loc')
    return jsonify(found)

171 

172 

@app.route(PREFIX + '/typeahead_em_bright_filename')
@cache.cached(query_string=True)
def typeahead_em_bright_filename():
    """Search em_bright files by filename.

    Looks for ``em_bright``-tagged ``.json`` files whose names start with
    the ``filename`` query argument.
    """
    superevent_id = request.args.get('superevent_id') or ''
    filename_start = request.args.get('filename') or ''
    found = _search_by_tag_and_filename(
        superevent_id, filename_start, '.json', 'em_bright')
    return jsonify(found)

182 

183 

@app.route(PREFIX + '/typeahead_p_astro_filename')
@cache.cached(query_string=True)
def typeahead_p_astro_filename():
    """Search p_astro files by filename.

    Looks for ``p_astro``-tagged ``.json`` files whose names start with
    the ``filename`` query argument.
    """
    superevent_id = request.args.get('superevent_id') or ''
    filename_start = request.args.get('filename') or ''
    found = _search_by_tag_and_filename(
        superevent_id, filename_start, '.json', 'p_astro')
    return jsonify(found)

193 

194 

@celery_app.task(shared=False, ignore_result=True)
def _construct_igwn_alert_and_send_prelim_alert(superevent_event_list,
                                                superevent_id,
                                                initiate_voevent=True):
    """Build an alert dictionary and run the preliminary alert workflow.

    Parameters
    ----------
    superevent_event_list : sequence
        Two items: the superevent followed by the event.
    superevent_id : str
        Stored as the ``uid`` of the constructed alert.
    initiate_voevent : bool
        Forwarded unchanged to
        ``orchestrator.earlywarning_preliminary_alert``.
    """
    superevent, event = superevent_event_list
    alert = dict(uid=superevent_id, object=superevent)
    orchestrator.earlywarning_preliminary_alert(
        event, alert,
        alert_type='preliminary',
        initiate_voevent=initiate_voevent)

211 

212 

@app.route(PREFIX + '/send_preliminary_gcn', methods=['POST'])
def send_preliminary_gcn():
    """Handle submission of preliminary alert form.

    Requires the ``superevent_id`` and ``event_id`` form fields. On
    success, queues a chain that logs the request, updates the
    superevent's preferred event, re-fetches both records, and sends the
    preliminary alert. Always redirects back to the index page.
    """
    superevent_id = request.form.get('superevent_id')
    event_id = request.form.get('event_id')

    if not (superevent_id and event_id):
        flash('No alert sent. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    try:
        event = gracedb.get_event(event_id)
    except HTTPError as e:
        flash(f'No action performed. GraceDB query for {event_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    log_request = gracedb.upload.s(
        None, None, superevent_id,
        'User {} queued a Preliminary alert through the dashboard.'
        .format(request.remote_user or '(unknown)'),
        tags=['em_follow'])
    set_preferred = gracedb.update_superevent.si(
        superevent_id, preferred_event=event_id, t_0=event['gpstime'])
    # Fetch fresh copies of both records once the update has been applied.
    refetch = group(
        gracedb.get_superevent.si(superevent_id),
        gracedb.get_event.si(event_id))
    send_alert = _construct_igwn_alert_and_send_prelim_alert.s(superevent_id)

    (log_request | set_preferred | refetch | send_alert).delay()

    flash('Queued preliminary alert for {}.'.format(superevent_id),
          'success')
    return redirect(url_for('index'))

249 

250 

@app.route(PREFIX + '/change_preferred_event', methods=['POST'])
def change_preferred_event():
    """Handle submission of the change preferred event form.

    Requires the ``superevent_id`` and ``event_id`` form fields. Fetches
    both records from GraceDB, queues the preferred event update followed
    by the preliminary alert workflow with ``initiate_voevent=False``,
    and, if needed, also queues an update of the pipeline-preferred event
    for the pipeline that uploaded the event. Always redirects back to the
    index page.
    """
    superevent_id = request.form.get('superevent_id')
    event_id = request.form.get('event_id')

    if not (superevent_id and event_id):
        flash('No change performed. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    try:
        event = gracedb.get_event(event_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {event_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    try:
        superevent = gracedb.get_superevent(superevent_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {superevent_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    log_change = gracedb.upload.s(
        None, None, superevent_id,
        celery_app.conf['views_manual_preferred_event_log_message']
        .format(request.remote_user or '(unknown)', event_id),
        tags=['em_follow'])
    set_preferred = gracedb.update_superevent.si(
        superevent_id, preferred_event=event_id,
        t_0=event['gpstime'])
    run_alert = _construct_igwn_alert_and_send_prelim_alert.si(
        [superevent, event],
        superevent_id,
        initiate_voevent=False)
    (log_change | set_preferred | run_alert).delay()

    # Update pipeline-preferred event if the new preferred event is not
    # already the pipeline-preferred event for the pipeline that uploaded
    # it.
    current_pipeline_pref = \
        superevent['pipeline_preferred_events'].get(event['pipeline'], {})
    if current_pipeline_pref.get('graceid', '') != event_id:
        log_pipeline_change = gracedb.upload.s(
            None, None, superevent_id,
            'Manual update of preferred event triggered update of '
            f'{event["pipeline"]}-preferred event to {event_id}',
            tags=['em_follow'])
        set_pipeline_preferred = gracedb.add_pipeline_preferred_event.si(
            superevent_id, event_id)
        (log_pipeline_change | set_pipeline_preferred).delay()

    flash('Changed preferred event for {}.'.format(superevent_id),
          'success')
    return redirect(url_for('index'))

310 

311 

@app.route(PREFIX + '/change_pipeline_preferred_event', methods=['POST'])
def change_pipeline_preferred_event():
    """Handle submission of the change pipeline-preferred event form.

    Requires the ``superevent_id``, ``pipeline`` and ``event_id`` form
    fields. The change is refused if the event was not uploaded by the
    given pipeline, or if that pipeline also produced the superevent's
    preferred event. Otherwise queues the pipeline-preferred event update
    followed by the preliminary alert workflow with
    ``initiate_voevent=False``. Always redirects back to the index page.
    """
    superevent_id = request.form.get('superevent_id')
    pipeline = request.form.get('pipeline')
    event_id = request.form.get('event_id')

    if not (superevent_id and pipeline and event_id):
        flash('No change performed. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    try:
        event = gracedb.get_event(event_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {event_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    # Check that specified event is from specified pipeline
    if event['pipeline'].lower() != pipeline.lower():
        flash(f'No change performed. {event_id} was uploaded by '
              f'{event["pipeline"].lower()} and cannot be the '
              f'{pipeline.lower()}-preferred event.', 'danger')
        return redirect(url_for('index'))

    try:
        superevent = gracedb.get_superevent(superevent_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {superevent_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    # Check that this pipeline's preferred event is not the
    # superevent's preferred event
    preferred_pipeline = superevent['preferred_event_data']['pipeline']
    if preferred_pipeline.lower() == pipeline.lower():
        flash(f'No change performed. User specified pipeline, '
              f'{pipeline.lower()}, is the same pipeline that '
              f'produced {superevent_id}\'s preferred event.', 'danger')
        return redirect(url_for('index'))

    log_change = gracedb.upload.s(
        None, None, superevent_id,
        'User {} queued a {} preferred event change to {}.'
        .format(request.remote_user or '(unknown)', pipeline,
                event_id),
        tags=['em_follow'])
    set_pipeline_preferred = gracedb.add_pipeline_preferred_event.si(
        superevent_id, event_id)
    run_alert = _construct_igwn_alert_and_send_prelim_alert.si(
        [superevent, event],
        superevent_id,
        initiate_voevent=False)
    (log_change | set_pipeline_preferred | run_alert).delay()

    flash(f'Changed {pipeline.lower()} preferred event for '
          f'{superevent_id}.', 'success')
    return redirect(url_for('index'))

371 

372 

@app.route(PREFIX + '/send_update_gcn', methods=['POST'])
def send_update_gcn():
    """Handle submission of update alert form.

    The superevent ID and the sky map filename are required; the em_bright
    and p_astro filenames are optional. Always redirects back to the index
    page.
    """
    keys = ('superevent_id', 'skymap_filename',
            'em_bright_filename', 'p_astro_filename')
    # Empty strings become None so that the orchestrator will ignore them
    # and not include these values in the alert.
    superevent_id, *filenames = [
        request.form.get(key) or None for key in keys]

    # Check at least superevent ID and sky map are present
    if superevent_id is None or filenames[0] is None:
        flash('No alert sent. Please fill in all required fields.', 'danger')
        return redirect(url_for('index'))

    log_request = gracedb.upload.s(
        None, None, superevent_id,
        'User {} queued an Update alert through the dashboard.'
        .format(request.remote_user or '(unknown)'),
        tags=['em_follow'])
    send_alert = orchestrator.update_alert.si(filenames, superevent_id)
    (log_request | send_alert).delay()

    flash('Queued update alert for {}.'.format(superevent_id), 'success')
    return redirect(url_for('index'))

398 

399 

@app.route(PREFIX + '/create_medium_latency_gcn_circular', methods=['POST'])
def create_medium_latency_gcn_circular():
    """Handle submission of medium_latency GCN Circular form.

    Returns the circular created for the ``ext_event_id`` form field as a
    plain-text response, or redirects to the index page with an error
    message when that field is empty.
    """
    ext_event_id = request.form.get('ext_event_id')
    if not ext_event_id:
        flash('No circular created. Please fill in external event ID',
              'danger')
        return redirect(url_for('index'))

    circular_text = circulars.create_medium_latency_circular(ext_event_id)
    response = make_response(circular_text)
    response.headers["content-type"] = "text/plain"
    return response

413 

414 

@app.route(PREFIX + '/create_update_gcn_circular', methods=['POST'])
def create_update_gcn_circular():
    """Handle submission of GCN Circular form.

    Returns the update circular as a plain-text response when a superevent
    ID and at least one update type are submitted; otherwise redirects to
    the index page with an error message.
    """
    superevent_id = request.form.get('superevent_id')
    # Update types that were filled in (or checked) in the form.
    updates = []
    for key in ['sky_localization', 'em_bright', 'p_astro', 'raven']:
        if request.form.get(key):
            updates.append(key)

    if not (superevent_id and updates):
        flash('No circular created. Please fill in superevent ID and at ' +
              'least one update type.', 'danger')
        return redirect(url_for('index'))

    circular_text = circulars.create_update_circular(
        superevent_id,
        update_types=updates)
    response = make_response(circular_text)
    response.headers["content-type"] = "text/plain"
    return response

431 

432 

@app.route(PREFIX + '/download_upload_external_skymap', methods=['POST'])
def download_upload_external_skymap():
    """Download sky map from URL to be uploaded to external event.

    Passes a search field 'FromURL' which indicates to
    get_upload_external_skymap to use the provided URL to download the
    sky map. Requires the ``ext_id`` and ``skymap_url`` form fields and
    always redirects back to the index page.
    """
    ext_id = request.form.get('ext_id')
    skymap_url = request.form.get('skymap_url')

    if not (ext_id and skymap_url):
        flash('No skymap uploaded. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    ext_event = dict(graceid=ext_id, search='FromURL')
    external_skymaps.get_upload_external_skymap(
        ext_event, skymap_link=skymap_url)
    flash('Downloaded sky map for {}.'.format(ext_id),
          'success')
    return redirect(url_for('index'))

450 

451 

@celery_app.task(queue='exttrig',
                 shared=False)
def _update_preferred_external_event(ext_event, superevent_id):
    """Update preferred external event to given external event.

    For GRB-type searches the coincidence false alarm rates are read from
    the external event's ``coincidence_far.json``; for every other search
    they are set to None.
    """
    # FIXME: Consider consolidating with raven.update_coinc_far by using
    # a single function in superevents.py
    ext_id = ext_event['graceid']
    time_coinc_far = space_coinc_far = None

    if ext_event['search'] in {'GRB', 'SubGRB', 'SubGRBTargeted'}:
        coinc_far_dict = gracedb.download('coincidence_far.json', ext_id)
        # The download may be either an already parsed dict or raw JSON.
        if not isinstance(coinc_far_dict, dict):
            coinc_far_dict = json.loads(coinc_far_dict)
        time_coinc_far = coinc_far_dict['temporal_coinc_far']
        space_coinc_far = coinc_far_dict['spatiotemporal_coinc_far']

    gracedb.update_superevent(superevent_id, em_type=ext_id,
                              time_coinc_far=time_coinc_far,
                              space_coinc_far=space_coinc_far)

471 

472 

@app.route(PREFIX + '/apply_raven_labels', methods=['POST'])
def apply_raven_labels():
    """Apply the RAVEN alert label and update the preferred external event
    to the given coincidence.

    Requires the ``superevent_id``, ``ext_id`` and ``event_id`` form
    fields. Always redirects back to the index page.
    """
    superevent_id = request.form.get('superevent_id')
    ext_id = request.form.get('ext_id')
    event_id = request.form.get('event_id')

    if not (superevent_id and ext_id and event_id):
        flash('No alert sent. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    fetch_ext_event = gracedb.get_event.si(ext_id)
    set_preferred_ext = _update_preferred_external_event.s(superevent_id)
    label_superevent = gracedb.create_label.si('RAVEN_ALERT', superevent_id)
    label_ext_event = gracedb.create_label.si('RAVEN_ALERT', ext_id)
    label_event = gracedb.create_label.si('RAVEN_ALERT', event_id)
    (
        fetch_ext_event
        | set_preferred_ext
        | label_superevent
        | label_ext_event
        | label_event
    ).delay()

    flash('Applied RAVEN alert label for {}.'.format(superevent_id),
          'success')
    return redirect(url_for('index'))

497 

498 

@app.route(PREFIX + '/send_mock_event', methods=['POST'])
def send_mock_event():
    """Handle submission of mock alert form.

    Queues ``first2years.upload_event`` and redirects to the index page.
    """
    first2years.upload_event.delay()
    flash('Queued a mock event.', 'success')
    landing_page = url_for('index')
    return redirect(landing_page)

505 

506 

@gracedb.task(shared=False)
def _create_upload_external_event(gpstime):
    """Create and upload a mock Fermi MDC external event.

    The event time is `gpstime` after it has been passed through
    ``first2years_external._offset_time``. Returns whatever
    ``first2years_external.create_upload_external_event`` returns.
    """
    shifted_time = first2years_external._offset_time(
        gpstime, 'CBC', 'Fermi', 'GRB', 'MDC')
    return first2years_external.create_upload_external_event(
        shifted_time, 'Fermi', 'MDC')

516 

517 

@app.route(PREFIX + '/send_mock_joint_event', methods=['POST'])
def send_mock_joint_event():
    """Handle submission of mock joint alert form.

    Queues a mock event upload followed, with a 5 second countdown, by the
    creation of an accompanying mock external event, then redirects to the
    index page.
    """
    upload_mock = first2years.upload_event.si()
    upload_external = _create_upload_external_event.s().set(countdown=5)
    (upload_mock | upload_external).delay()
    flash('Queued a mock joint event.', 'success')
    return redirect(url_for('index'))

528 

529 

@app.route(PREFIX + '/create_skymap_with_disabled_detectors', methods=['POST'])
def create_skymap_with_disabled_detectors():
    """Create a BAYESTAR sky map with one or more disabled detectors.

    The ``event_id`` form field names the event; every other submitted
    form field name is treated as a detector to disable. If the event ID
    or the detector list is missing, nothing is queued and an error
    message is shown. Always redirects back to the index page.
    """
    form = request.form.to_dict()
    # Default to None so that a request without an event_id field reaches
    # the "fill in all fields" message below instead of raising KeyError.
    graceid = form.pop('event_id', None)
    disabled_detectors = sorted(form.keys())
    tags = ['sky_loc', 'public']
    if graceid and disabled_detectors:
        filename = f'bayestar.no-{"".join(disabled_detectors)}.multiorder.fits'
        (
            gracedb.download.s('coinc.xml', graceid)
            |
            bayestar.localize.s(graceid, disabled_detectors=disabled_detectors)
            |
            # Pass the sky map through unchanged alongside its upload so
            # that both results reach the annotation step.
            group(
                core.identity.s(),
                gracedb.upload.s(
                    filename, graceid,
                    'sky localization complete', tags
                )
            )
            |
            skymaps.annotate_fits_tuple.s(graceid, tags)
        ).delay()
        flash('Creating sky map for event ID ' + graceid +
              ' with these disabled detectors: ' +
              ' '.join(disabled_detectors), 'success')
    else:
        flash('No sky map created. Please fill in all fields.', 'danger')
    return redirect(url_for('index'))

560 

561 

@app.route(PREFIX + '/copy_sky_map_between_events', methods=['POST'])
def copy_sky_map_between_events():
    """Copy a sky map from an event to a superevent.

    Reads the ``superevent_id``, ``event_id`` and ``skymap_filename`` form
    fields. The file is downloaded from the event, uploaded to the
    superevent under the same name without the trailing ``,version``
    part, and then annotated. Redirects back to the index page.
    """
    superevent_id = request.form['superevent_id']
    graceid = request.form['event_id']
    skymap_filename = request.form['skymap_filename']
    # Strip a trailing ',<version>' if there is one. Without a comma,
    # rpartition() returns an empty head, so fall back to the full name
    # rather than uploading under an empty filename.
    head, comma, _ = skymap_filename.rpartition(',')
    skymap_filename_no_version = head if comma else skymap_filename
    tags = ['sky_loc', 'public']
    (
        gracedb.download.s(skymap_filename, graceid)
        |
        # Pass the sky map through unchanged alongside its upload so that
        # both results reach the annotation step.
        group(
            core.identity.s(),
            gracedb.upload.s(
                skymap_filename_no_version, superevent_id,
                f'sky map copied from {graceid}', tags
            )
        )
        |
        skymaps.annotate_fits_tuple.s(superevent_id, tags)
    ).delay()
    flash(f'Copying file {skymap_filename} from {graceid} to {superevent_id}',
          'success')
    return redirect(url_for('index'))