Coverage for gwcelery/tasks/gracedb.py: 80%

188 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""Communication with GraceDB.""" 

2import functools 

3import re 

4 

5import gracedb_sdk 

6from celery.utils.log import get_task_logger 

7from requests.exceptions import ConnectionError, HTTPError 

8 

9from .. import app 

10from ..util import PromiseProxy 

11 

# GraceDB REST client for the configured host. Construction of the
# gracedb_sdk.Client is wrapped in a PromiseProxy (presumably deferred until
# first use -- see ..util.PromiseProxy). Note that the API URL itself is
# assembled from app.conf.gracedb_host when this module is imported.
client = PromiseProxy(
    gracedb_sdk.Client,
    ('https://' + app.conf.gracedb_host + '/api/',),
    dict(fail_if_noauth=True, cert_reload=True),
)

# Task-aware logger for this module.
log = get_task_logger(__name__)

17 

18 

class RetryableHTTPError(HTTPError):
    """HTTP error that is considered transient and therefore worth retrying.

    :func:`catch_retryable_http_errors` raises this in place of the original
    :class:`requests.exceptions.HTTPError` for selected status codes, and
    tasks created with :func:`task` list it in ``autoretry_for``.
    """

21 

22 

def catch_retryable_http_errors(f):
    """Decorator to capture server-side errors that we should retry.

    We retry HTTP status 500 (Internal Server Error), 502 (Bad Gateway),
    503 (Service Unavailable), and 504 (Gateway Timeout). We also retry
    client side error codes 408 (Request Timeout), 409 (Conflict), and
    429 (Too Many Requests).

    An :class:`~requests.exceptions.HTTPError` carrying one of those status
    codes is re-raised as :class:`RetryableHTTPError`, chained to the
    original error. Tasks created with :func:`task` retry that error
    automatically. Every other exception propagates unchanged.

    Parameters
    ----------
    f : callable
        The function to wrap.

    Returns
    -------
    callable
        A wrapper around ``f`` carrying ``f``'s metadata
        (via :func:`functools.wraps`).

    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except HTTPError as e:
            # An HTTPError can be raised without a response attached. There
            # is then no status code to classify, so re-raise it unchanged
            # instead of masking it with an AttributeError here.
            if (e.response is not None
                    and e.response.status_code
                    in {408, 409, 429, 500, 502, 503, 504}):
                raise RetryableHTTPError(
                    *e.args, request=e.request, response=e.response) from e
            raise

    return wrapper

42 

43 

def task(*args, **kwargs):
    """Create a Celery task that retries transient GraceDB failures.

    All positional and keyword arguments are forwarded to :meth:`app.task`.
    The task is configured to retry on :class:`ConnectionError`,
    :class:`RetryableHTTPError` and :class:`TimeoutError`, with
    ``default_retry_delay=20.0``, ``retry_backoff=True`` and at most 10
    retries. Passing any of those options explicitly is an error, because
    they would be supplied twice.
    """
    retry_options = {
        'autoretry_for': (ConnectionError, RetryableHTTPError, TimeoutError),
        'default_retry_delay': 20.0,
        'retry_backoff': True,
        'retry_kwargs': {'max_retries': 10},
    }
    return app.task(*args, **kwargs, **retry_options)

50 

51 

52versioned_filename_regex = re.compile( 

53 r'^(?P<filename>.*?)(?:,(?P<file_version>\d+))?$') 

54 

55 

56def _parse_versioned_filename(versioned_filename): 

57 match = versioned_filename_regex.fullmatch(versioned_filename) 

58 filename = match['filename'] 

59 file_version = match['file_version'] 

60 if file_version is not None: 

61 file_version = int(file_version) 

62 return filename, file_version 

63 

64 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_pipeline_preferred_event(superevent_id, event_id):
    """Add ``event_id`` to the pipeline-preferred events of a superevent."""
    preferred = client.superevents[superevent_id].pipeline_preferred_events
    preferred.add(event_id)

69 

70 

@task(shared=False)
@catch_retryable_http_errors
def create_event(filecontents, search, pipeline, group, labels=()):
    """Create an event in GraceDB.

    The payload is uploaded under the file name ``'initial.data'``.

    Returns
    -------
    The response of ``client.events.create``.

    """
    return client.events.create(
        group=group,
        pipeline=pipeline,
        filename='initial.data',
        search=search,
        filecontents=filecontents,
        labels=labels,
    )

79 

80 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_label(label, graceid):
    """Create a label in GraceDB.

    Two specific rejection bodies from GraceDB mean that no change was
    needed: an ``ADVREQ`` label whose signoff already exists, or a label
    that is already applied. Those are treated as success so that the task
    is idempotent. Any other HTTP error is re-raised.
    """
    try:
        client.events[graceid].labels.create(label)
    except HTTPError as err:
        already_satisfied = {
            b'"The \'ADVREQ\' label cannot be applied to request a signoff '
            b'because a related signoff already exists."',

            b'"The fields superevent, name must make a unique set."'
        }
        if err.response.content in already_satisfied:
            return
        raise

98 

99 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_label_with_log(log_message, label, tags, superevent_id):
    """Create a label with a log for a superevent in GraceDB.

    A single log entry carrying ``label`` and ``tags`` is created on the
    superevent.

    Parameters
    ----------
    log_message : str
        Comment text of the log entry.
    label : str
        Label to apply along with the log entry.
    tags : sequence of str
        Tags for the log entry.
    superevent_id : str
        The superevent to annotate.

    """
    # HTTP errors propagate to the caller. Retryable ones are converted by
    # catch_retryable_http_errors so that the task retries.
    client.superevents[superevent_id].logs.create(
        comment=log_message,
        label=label,
        tags=tags
    )

112 

113 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_label(label, graceid):
    """Remove a label in GraceDB.

    A 404 reply means the label was not present, which is the desired end
    state, so it is treated as success. Other HTTP errors are re-raised.
    """
    try:
        client.events[graceid].labels.delete(label)
    except HTTPError as err:
        if err.response.status_code == 404:
            return
        raise

126 

127 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_signoff(status, comment, signoff_type, graceid):
    """Create a signoff in GraceDB.

    If the reply body says the signoff already exists (a unique-set
    violation on superevent and instrument), the call counts as success so
    that the task stays idempotent. Other HTTP errors are re-raised.
    """
    try:
        client.superevents[graceid].signoff(signoff_type, status, comment)
    except HTTPError as err:
        duplicate = b'The fields superevent, instrument must make a unique set'
        if duplicate in err.response.content:
            return
        raise

141 

142 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_tag(filename, tag, graceid):
    """Create a tag in GraceDB.

    The tag is applied to the log message that uploaded ``filename``.

    Parameters
    ----------
    filename : str
        File name, optionally with a ``,<version>`` suffix
        (e.g. ``'bayestar.fits.gz,0'``). Without a version, the last
        matching entry (in the order returned by :func:`get_log`) is used.
    tag : str
        Tag to apply to that log message.
    graceid : str
        GraceDB ID of the event.

    Raises
    ------
    ValueError
        If no log message matches ``filename``.

    """
    filename, file_version = _parse_versioned_filename(filename)
    # Named log_entries so that it does not shadow the module-level logger.
    log_entries = get_log(graceid)
    matches = [
        entry for entry in log_entries
        if entry['filename'] == filename
        and (file_version is None or entry['file_version'] == file_version)]
    if not matches:
        raise ValueError(
            'No log message with filename={!r}, file_version={!r} '
            'for {}'.format(filename, file_version, graceid))
    log_number = matches[-1]['N']
    try:
        client.events[graceid].logs[log_number].tags.create(tag)
    except HTTPError as e:
        # If we got a 400 error because no change was made, then ignore
        # the exception and return successfully to preserve idempotency.
        message = b'"Tag is already applied to this log message"'
        if e.response.content != message:
            raise

163 

164 

@task(shared=False)
@catch_retryable_http_errors
def create_voevent(graceid, voevent_type, **kwargs):
    """Create a VOEvent.

    Extra keyword arguments are forwarded to the GraceDB client's
    ``voevents.create``.

    Returns
    -------
    str
        The filename of the new VOEvent.

    """
    voevents = client.events[graceid].voevents
    created = voevents.create(voevent_type=voevent_type, **kwargs)
    return created['filename']

179 

180 

@task(shared=False)
@catch_retryable_http_errors
def download(filename, graceid):
    """Download a file from GraceDB and return its full contents."""
    remote_file = client.events[graceid].files[filename]
    with remote_file.get() as handle:
        contents = handle.read()
    return contents

187 

188 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def expose(graceid):
    """Expose an event to the public.

    Notes
    -----
    If :obj:`~gwcelery.conf.expose_to_public` is False, then this becomes a
    no-op.

    """
    if app.conf['expose_to_public']:
        client.superevents[graceid].expose()

202 

203 

@task(shared=False)
@catch_retryable_http_errors
def get_events(query, **kwargs):
    """Get events from GraceDB.

    Runs ``client.events.search`` and returns the results as a list.
    """
    results = client.events.search(query=query, **kwargs)
    return list(results)

209 

210 

@task(shared=False)
@catch_retryable_http_errors
def get_event(graceid):
    """Retrieve an event record from GraceDB."""
    event = client.events[graceid]
    return event.get()

216 

217 

@task(shared=False)
@catch_retryable_http_errors
def get_group(graceid):
    """Retrieve the group field of an event from GraceDB."""
    return client.events[graceid].get()['group']

223 

224 

@task(shared=False)
@catch_retryable_http_errors
def get_search(graceid):
    """Retrieve the search field of an event from GraceDB."""
    record = client.events[graceid].get()
    return record['search']

230 

231 

@task(shared=False)
@catch_retryable_http_errors
def get_labels(graceid):
    """Get the set of label names for an event in GraceDB."""
    names = set()
    for label in client.events[graceid].labels.get():
        names.add(label['name'])
    return names

237 

238 

@task(shared=False)
@catch_retryable_http_errors
def get_log(graceid):
    """Get all log messages for an event in GraceDB."""
    logs = client.events[graceid].logs
    return logs.get()

244 

245 

@task(shared=False)
@catch_retryable_http_errors
def get_superevent(graceid):
    """Retrieve a superevent record from GraceDB."""
    superevent = client.superevents[graceid]
    return superevent.get()

251 

252 

@task(shared=False)
@catch_retryable_http_errors
def replace_event(graceid, payload):
    """Replace the contents of an existing event in GraceDB.

    Parameters
    ----------
    graceid : str
        GraceDB ID of the event to update.
    payload
        New file contents, passed as ``filecontents`` to
        ``client.events.update``.

    Returns
    -------
    The response of ``client.events.update``.

    """
    return client.events.update(graceid, filecontents=payload)

258 

259 

@task(shared=False)
@catch_retryable_http_errors
def upload(filecontents, filename, graceid, message, tags=()):
    """Upload a file to GraceDB as a new log message.

    Returns
    -------
    str
        The uploaded file's versioned name, ``'<filename>,<file_version>'``.

    """
    entry = client.events[graceid].logs.create(
        comment=message,
        filename=filename,
        filecontents=filecontents,
        tags=tags,
    )
    name, version = entry['filename'], entry['file_version']
    return '{},{}'.format(name, version)

268 

269 

@task(shared=False)
@catch_retryable_http_errors
def get_superevents(query, **kwargs):
    """List matching superevents in gracedb.

    Uses the same retry policy as the other GraceDB tasks, so that the
    :class:`RetryableHTTPError` raised by the decorator is actually retried.

    Parameters
    ----------
    query : str
        Search query passed to ``client.superevents.search``.
    **kwargs
        Additional keyword arguments passed to
        ``client.superevents.search``.

    Returns
    -------
    superevents : list
        The list of the superevents.

    """
    return list(client.superevents.search(query=query, **kwargs))

289 

290 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_pipeline_preferred_event(superevent_id, event_id):
    """Remove ``event_id`` from the pipeline-preferred events of a superevent."""
    preferred = client.superevents[superevent_id].pipeline_preferred_events
    preferred.remove(event_id)

296 

297 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def update_superevent(superevent_id, t_start=None,
                      t_end=None, t_0=None, preferred_event=None,
                      em_type=None, time_coinc_far=None,
                      space_coinc_far=None):
    """
    Update superevent information. Wrapper around
    ``client.superevents.update``.

    A 400 reply saying that the request would not modify the superevent is
    treated as success, so repeating an update is harmless. Other HTTP
    errors are re-raised.

    Parameters
    ----------
    superevent_id : str
        superevent uid
    t_start : float
        start of superevent time window, unchanged if None
    t_end : float
        end of superevent time window, unchanged if None
    t_0 : float
        superevent t_0, unchanged if None
    preferred_event : str
        uid of the preferred event, unchanged if None
    em_type : str
        uid of the event used to set the superevent's EM type; passed
        through to ``client.superevents.update`` like the fields above
        (presumably unchanged if None)
    time_coinc_far : float
        temporal coincidence FAR, passed through
        (presumably unchanged if None)
    space_coinc_far : float
        spatio-temporal coincidence FAR, passed through
        (presumably unchanged if None)

    """
    try:
        client.superevents.update(
            superevent_id, t_start=t_start, t_end=t_end, t_0=t_0,
            preferred_event=preferred_event, em_type=em_type,
            time_coinc_far=time_coinc_far, space_coinc_far=space_coinc_far)
    except HTTPError as e:
        # If we got a 400 error because no change was made, then ignore
        # the exception and return successfully to preserve idempotency.
        error_msg = b'"Request would not modify the superevent"'
        if not (e.response.status_code == 400
                and e.response.content == error_msg):
            raise

334 

335 

@task(shared=False)
@catch_retryable_http_errors
def create_superevent(graceid, t0, t_start, t_end):
    """Create new superevent in GraceDB with `graceid`

    Parameters
    ----------
    graceid : str
        graceid of the event that becomes the superevent's preferred event.
    t0 : float
        ``t_0`` parameter of superevent
    t_start : float
        ``t_start`` parameter of superevent
    t_end : float
        ``t_end`` parameter of superevent

    Returns
    -------
    str or None
        The new superevent's ID. None if GraceDB replies 400 saying that
        `graceid` is already assigned to a superevent; that case is
        tolerated for idempotency.

    """
    try:
        created = client.superevents.create(
            t_start=t_start, t_0=t0, t_end=t_end, preferred_event=graceid)
    except HTTPError as err:
        already_assigned = b'is already assigned to a Superevent'
        if (err.response.status_code != 400
                or already_assigned not in err.response.content):
            raise
        return None
    return created['superevent_id']

362 

363 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_event_to_superevent(superevent_id, graceid):
    """Add an event to a superevent in GraceDB.

    A 400 reply saying the event is already assigned to a superevent is
    tolerated. Any other HTTP error is re-raised.
    """
    try:
        client.superevents[superevent_id].add(graceid)
    except HTTPError as err:
        already_assigned = b'is already assigned to a Superevent'
        if (err.response.status_code != 400
                or already_assigned not in err.response.content):
            raise

375 

376 

@task(shared=False)
@catch_retryable_http_errors
def get_superevent_file_list(superevent_id):
    """Get a superevent's file list from GraceDB.

    Returns whatever ``files.get()`` returns. Callers such as
    :func:`get_latest_file` use it as a mapping keyed by file name.
    """
    return client.superevents[superevent_id].files.get()

383 

384 

@task(shared=False)
@catch_retryable_http_errors
def get_latest_file(superevent_id, filename):
    """Get the latest version of a file attached to a superevent.

    Parameters
    ----------
    superevent_id : str
        superevent uid
    filename : str
        The filebase of a file name,
        e.g. ``'bayestar.multiorder.fits'`` for
        ``'bayestar.multiorder.fits,0'``. Every file-list key that contains
        this string is a candidate.

    Returns
    -------
    str or None
        The candidate with the highest numeric version,
        e.g. ``'bayestar.multiorder.fits,10'``, or None if there is no
        candidate.

    """
    file_list = get_superevent_file_list(superevent_id)
    candidates = []
    for key in file_list:
        # Substring matching is kept for backward compatibility.
        if filename not in key:
            continue
        _, version = _parse_versioned_filename(key)
        # Rank by numeric version: comparing the keys as strings would put
        # 'x,9' after 'x,10'. An unversioned key ranks below every
        # versioned one.
        candidates.append((-1 if version is None else version, key))
    if not candidates:
        return None
    return max(candidates)[1]