Coverage for gwcelery/tasks/gracedb.py: 81%

181 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-14 05:52 +0000

1"""Communication with GraceDB.""" 

2import functools 

3import re 

4 

5import gracedb_sdk 

6from celery.utils.log import get_task_logger 

7from requests.exceptions import ConnectionError, HTTPError 

8 

9from .. import app 

10from ..util import PromiseProxy 

11 

# GraceDB REST client. The factory (``gracedb_sdk.Client``), its positional
# arguments, and its keyword arguments are handed to ``PromiseProxy``.
# NOTE(review): presumably the client is only constructed on first use --
# confirm against ``..util.PromiseProxy``.
client = PromiseProxy(
    gracedb_sdk.Client,
    ('https://' + app.conf.gracedb_host + '/api/',),
    dict(fail_if_noauth=True, cert_reload=True))

# Celery task logger named after this module.
log = get_task_logger(__name__)

17 

18 

class RetryableHTTPError(HTTPError):
    """HTTP error that is worth retrying.

    Raised by :func:`catch_retryable_http_errors` in place of a plain
    :class:`~requests.exceptions.HTTPError` when the response status code
    is one that the decorator treats as retryable.
    """

21 

22 

def catch_retryable_http_errors(f):
    """Decorator to capture HTTP errors that we should retry.

    We retry the server-side HTTP status codes 500 (Internal Server Error),
    502 (Bad Gateway), 503 (Service Unavailable), and 504 (Gateway Timeout).
    We also retry the client-side error codes 408 (Request Timeout),
    409 (Conflict), and 429 (Too Many Requests).

    Parameters
    ----------
    f : callable
        The function to wrap.

    Returns
    -------
    callable
        A wrapper that calls ``f`` and re-raises any
        :class:`~requests.exceptions.HTTPError` with one of the status
        codes above as a :class:`RetryableHTTPError`. Any other
        :class:`~requests.exceptions.HTTPError` propagates unchanged.

    """
    retryable_status_codes = frozenset({408, 409, 429, 500, 502, 503, 504})

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except HTTPError as e:
            # An HTTPError is not guaranteed to carry a response object;
            # without one there is no status code to inspect, so the
            # original exception is re-raised rather than masked by an
            # AttributeError.
            status_code = getattr(e.response, 'status_code', None)
            if status_code in retryable_status_codes:
                raise RetryableHTTPError(
                    *e.args, request=e.request, response=e.response) from e
            raise

    return wrapper

42 

43 

def task(*args, **kwargs):
    """Declare a Celery task with this module's retry settings.

    All positional and keyword arguments are forwarded to ``app.task``,
    together with the retry options defined below: automatic retry on
    :class:`ConnectionError`, :class:`RetryableHTTPError`, and
    :class:`TimeoutError`, a default retry delay of 20 seconds, retry
    backoff enabled, and at most 10 retries.
    """
    retry_options = dict(
        autoretry_for=(ConnectionError, RetryableHTTPError, TimeoutError),
        default_retry_delay=20.0,
        retry_backoff=True,
        retry_kwargs=dict(max_retries=10))
    return app.task(*args, **kwargs, **retry_options)

50 

51 

52versioned_filename_regex = re.compile( 

53 r'^(?P<filename>.*?)(?:,(?P<file_version>\d+))?$') 

54 

55 

56def _parse_versioned_filename(versioned_filename): 

57 match = versioned_filename_regex.fullmatch(versioned_filename) 

58 filename = match['filename'] 

59 file_version = match['file_version'] 

60 if file_version is not None: 

61 file_version = int(file_version) 

62 return filename, file_version 

63 

64 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_pipeline_preferred_event(superevent_id, event_id):
    """Add an event to a superevent's pipeline preferred events.

    Parameters
    ----------
    superevent_id
        Identifier of the superevent to modify.
    event_id
        Identifier of the event to add.

    """
    superevent = client.superevents[superevent_id]
    superevent.pipeline_preferred_events.add(event_id)

69 

70 

@task(shared=False)
@catch_retryable_http_errors
def create_event(filecontents, search, pipeline, group, labels=()):
    """Create an event in GraceDB.

    The event data is submitted under the fixed filename ``initial.data``.

    Returns
    -------
    The response from ``client.events.create``.

    """
    return client.events.create(
        group=group, pipeline=pipeline, filename='initial.data',
        search=search, filecontents=filecontents, labels=labels)

79 

80 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_label(label, graceid):
    """Create a label in GraceDB.

    An HTTP error whose response body is exactly one of the known
    "nothing to change" messages below is swallowed, so that applying a
    label repeatedly is idempotent. Any other HTTP error is re-raised.
    """
    no_change_responses = {
        b'"The \'ADVREQ\' label cannot be applied to request a signoff '
        b'because a related signoff already exists."',

        b'"The fields superevent, name must make a unique set."'
    }
    try:
        client.events[graceid].labels.create(label)
    except HTTPError as e:
        if e.response.content in no_change_responses:
            # The server reported that no change was made; treat this as
            # success.
            return
        raise

98 

99 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_label(label, graceid):
    """Remove a label in GraceDB.

    A 404 response is not treated as a failure: per the original author's
    note, GraceDB returns 404 when the label did not exist, which is the
    outcome that was wanted anyway. Any other HTTP error is re-raised.
    """
    try:
        client.events[graceid].labels.delete(label)
    except HTTPError as e:
        if e.response.status_code == 404:
            return
        raise

112 

113 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_signoff(status, comment, signoff_type, graceid):
    """Create a signoff in GraceDB.

    An HTTP error whose response body contains the uniqueness-violation
    message below is swallowed, since it indicates that the signoff was
    already applied; this keeps the task idempotent. Any other HTTP error
    is re-raised.
    """
    already_applied = (
        b'The fields superevent, instrument must make a unique set')
    try:
        client.superevents[graceid].signoff(signoff_type, status, comment)
    except HTTPError as e:
        if already_applied in e.response.content:
            return
        raise

127 

128 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def create_tag(filename, tag, graceid):
    """Create a tag in GraceDB.

    The tag is applied to the last log entry of ``graceid`` whose filename
    matches ``filename``. If ``filename`` carries a ``,N`` version suffix,
    the entry's file version must match as well.

    Parameters
    ----------
    filename : str
        Filename of the log entry to tag, optionally versioned
        (``name,N``).
    tag
        The tag to apply.
    graceid
        Identifier of the event whose log is searched.

    Raises
    ------
    ValueError
        If no log entry matches ``filename``.

    """
    filename, file_version = _parse_versioned_filename(filename)
    # Deliberately not called ``log``, so that the module-level task logger
    # is not shadowed inside this function.
    entries = get_log(graceid)
    matches = [
        entry for entry in entries
        if entry['filename'] == filename
        and (file_version is None or entry['file_version'] == file_version)]
    if not matches:
        raise ValueError(
            'No log entry for {} matches filename {!r} (version {})'.format(
                graceid, filename, file_version))
    # The last matching entry wins.
    log_number = matches[-1]['N']
    try:
        client.events[graceid].logs[log_number].tags.create(tag)
    except HTTPError as e:
        # If the server reports that the tag is already applied, then
        # ignore the exception and return successfully to preserve
        # idempotency.
        message = b'"Tag is already applied to this log message"'
        if e.response.content != message:
            raise

149 

150 

@task(shared=False)
@catch_retryable_http_errors
def create_voevent(graceid, voevent_type, **kwargs):
    """Create a VOEvent.

    Parameters
    ----------
    graceid
        Identifier of the event for which the VOEvent is created.
    voevent_type
        Passed as ``voevent_type`` to ``voevents.create``.
    **kwargs
        Additional keyword arguments forwarded to ``voevents.create``.

    Returns
    -------
    str
        The filename of the new VOEvent.

    """
    event = client.events[graceid]
    voevent = event.voevents.create(voevent_type=voevent_type, **kwargs)
    return voevent['filename']

165 

166 

@task(shared=False)
@catch_retryable_http_errors
def download(filename, graceid):
    """Download a file from GraceDB.

    Returns
    -------
    The result of reading the file object obtained for ``filename`` on
    event ``graceid``.

    """
    remote_file = client.events[graceid].files[filename]
    with remote_file.get() as stream:
        contents = stream.read()
    return contents

173 

174 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def expose(graceid):
    """Expose an event to the public.

    Notes
    -----
    If :obj:`~gwcelery.conf.expose_to_public` is False, then this becomes
    a no-op.

    """
    if not app.conf['expose_to_public']:
        return
    client.superevents[graceid].expose()

188 

189 

@task(shared=False)
@catch_retryable_http_errors
def get_events(query, **kwargs):
    """Get events from GraceDB.

    Parameters
    ----------
    query
        Passed as ``query`` to ``client.events.search``.
    **kwargs
        Additional keyword arguments forwarded to
        ``client.events.search``.

    Returns
    -------
    list
        The search results, collected into a list.

    """
    results = client.events.search(query=query, **kwargs)
    return list(results)

195 

196 

@task(shared=False)
@catch_retryable_http_errors
def get_event(graceid):
    """Retrieve an event from GraceDB.

    Returns
    -------
    The result of ``client.events[graceid].get()``.

    """
    event = client.events[graceid]
    return event.get()

202 

203 

@task(shared=False)
@catch_retryable_http_errors
def get_group(graceid):
    """Retrieve the group field of an event from GraceDB."""
    event = client.events[graceid].get()
    return event['group']

209 

210 

@task(shared=False)
@catch_retryable_http_errors
def get_search(graceid):
    """Retrieve the search field of an event from GraceDB."""
    event = client.events[graceid].get()
    return event['search']

216 

217 

@task(shared=False)
@catch_retryable_http_errors
def get_labels(graceid):
    """Get all labels for an event in GraceDB.

    Returns
    -------
    set
        The ``name`` field of every label row returned for the event.

    """
    rows = client.events[graceid].labels.get()
    return set(row['name'] for row in rows)

223 

224 

@task(shared=False)
@catch_retryable_http_errors
def get_log(graceid):
    """Get all log messages for an event in GraceDB.

    Returns
    -------
    The result of ``client.events[graceid].logs.get()``.

    """
    event_logs = client.events[graceid].logs
    return event_logs.get()

230 

231 

@task(shared=False)
@catch_retryable_http_errors
def get_superevent(graceid):
    """Retrieve a superevent from GraceDB.

    Returns
    -------
    The result of ``client.superevents[graceid].get()``.

    """
    superevent = client.superevents[graceid]
    return superevent.get()

237 

238 

@task(shared=False)
@catch_retryable_http_errors
def replace_event(graceid, payload):
    """Update an existing event in GraceDB with new file contents.

    Parameters
    ----------
    graceid
        Identifier of the event to update.
    payload
        Passed as ``filecontents`` to ``client.events.update``.

    Returns
    -------
    The result of ``client.events.update``.

    """
    updated = client.events.update(graceid, filecontents=payload)
    return updated

244 

245 

@task(shared=False)
@catch_retryable_http_errors
def upload(filecontents, filename, graceid, message, tags=()):
    """Upload a file to GraceDB.

    The file is attached to a new log entry on event ``graceid``, with
    ``message`` as the comment.

    Returns
    -------
    str
        The ``filename`` and ``file_version`` fields of the created log
        entry, joined by a comma.

    """
    entry = client.events[graceid].logs.create(
        comment=message, filename=filename,
        filecontents=filecontents, tags=tags)
    uploaded_name = entry['filename']
    uploaded_version = entry['file_version']
    return '{},{}'.format(uploaded_name, uploaded_version)

254 

255 

# Declared with this module's ``task`` wrapper (not a bare ``app.task``) so
# that the RetryableHTTPError raised by ``catch_retryable_http_errors`` is
# actually retried, as it is for every other GraceDB task in this module.
@task(shared=False)
@catch_retryable_http_errors
def get_superevents(query, **kwargs):
    """List matching superevents in GraceDB.

    Parameters
    ----------
    query
        Passed as ``query`` to ``client.superevents.search``.
    **kwargs
        Additional keyword arguments forwarded to
        ``client.superevents.search``.

    Returns
    -------
    superevents : list
        The list of the superevents.

    """
    return list(client.superevents.search(query=query, **kwargs))

275 

276 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def remove_pipeline_preferred_event(superevent_id, event_id):
    """Remove an event from a superevent's pipeline preferred events.

    Parameters
    ----------
    superevent_id
        Identifier of the superevent to modify.
    event_id
        Identifier of the event to remove.

    """
    superevent = client.superevents[superevent_id]
    superevent.pipeline_preferred_events.remove(event_id)

282 

283 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def update_superevent(superevent_id, t_start=None,
                      t_end=None, t_0=None, preferred_event=None,
                      em_type=None, time_coinc_far=None,
                      space_coinc_far=None):
    """
    Update superevent information. Wrapper around
    ``client.superevents.update``.

    Parameters
    ----------
    superevent_id : str
        superevent uid
    t_start : float
        start of superevent time window, unchanged if None
    t_end : float
        end of superevent time window, unchanged if None
    t_0 : float
        superevent t_0, unchanged if None
    preferred_event : str
        uid of the preferred event, unchanged if None
    em_type
        forwarded as ``em_type`` to ``client.superevents.update``
    time_coinc_far
        forwarded as ``time_coinc_far`` to ``client.superevents.update``
    space_coinc_far
        forwarded as ``space_coinc_far`` to ``client.superevents.update``

    Notes
    -----
    A 400 response whose body says that the request would not modify the
    superevent is ignored, so that the task is idempotent. Any other HTTP
    error is re-raised.

    """
    fields = dict(
        t_start=t_start, t_end=t_end, t_0=t_0,
        preferred_event=preferred_event, em_type=em_type,
        time_coinc_far=time_coinc_far, space_coinc_far=space_coinc_far)
    no_change_message = b'"Request would not modify the superevent"'
    try:
        client.superevents.update(superevent_id, **fields)
    except HTTPError as e:
        if (e.response.status_code == 400
                and e.response.content == no_change_message):
            return
        raise

320 

321 

@task(shared=False)
@catch_retryable_http_errors
def create_superevent(graceid, t0, t_start, t_end):
    """Create new superevent in GraceDB with `graceid`

    Parameters
    ----------
    graceid : str
        graceid with which superevent is created.
    t0 : float
        ``t_0`` parameter of superevent
    t_start : float
        ``t_start`` parameter of superevent
    t_end : float
        ``t_end`` parameter of superevent

    Returns
    -------
    The ``superevent_id`` field of the creation response, or None if the
    server answered with a 400 error saying that the event is already
    assigned to a superevent.

    """
    try:
        return client.superevents.create(
            t_start=t_start, t_0=t0, t_end=t_end,
            preferred_event=graceid)['superevent_id']
    except HTTPError as e:
        already_assigned = (
            e.response.status_code == 400
            and b'is already assigned to a Superevent' in e.response.content)
        if not already_assigned:
            raise

348 

349 

@task(ignore_result=True, shared=False)
@catch_retryable_http_errors
def add_event_to_superevent(superevent_id, graceid):
    """Add an event to a superevent in GraceDB.

    A 400 response whose body says that the event is already assigned to
    a superevent is ignored. Any other HTTP error is re-raised.
    """
    try:
        client.superevents[superevent_id].add(graceid)
    except HTTPError as e:
        already_assigned = (
            e.response.status_code == 400
            and b'is already assigned to a Superevent' in e.response.content)
        if already_assigned:
            return
        raise

361 

362 

@task(shared=False)
@catch_retryable_http_errors
def get_superevent_file_list(superevent_id):
    """Get superevent file list from GraceDB.

    Returns
    -------
    The result of ``client.superevents[superevent_id].files.get()``.

    """
    superevent_files = client.superevents[superevent_id].files
    return superevent_files.get()

369 

370 

@task(shared=False)
@catch_retryable_http_errors
def get_latest_file(superevent_id, filename):
    """Get the latest file provided a file name

    Parameters
    ----------
    superevent_id : str
        superevent uid

    filename : str
        The filebase of a file name
        e.g. 'bayestar.multiorder.fits' for 'bayestar.multiorder.fits,0'

    Returns
    -------
    str or None
        The versioned filename for the inquired file, or None if no entry
        of the superevent's file list contains ``filename``.

    """
    # Get file list for superevent
    file_list = get_superevent_file_list(superevent_id)
    # Keep every key that includes the requested filename
    candidates = [key for key in file_list if filename in key]
    if not candidates:
        return None

    def version_order(key):
        # Compare the version suffix numerically. Comparing the raw strings
        # would rank 'name,9' above 'name,10'. A key without a version
        # suffix sorts below every versioned key of the same base name.
        base, version = _parse_versioned_filename(key)
        return base, -1 if version is None else version

    return max(candidates, key=version_order)