Coverage for gwcelery/tasks/alerts.py: 83%

113 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1import json 

2 

3import numpy as np 

4from astropy import time 

5from celery import group 

6from celery.utils.log import get_logger 

7 

8from .. import app 

9from ..kafka.signals import kafka_record_consumed 

10from . import gracedb 

11from .core import DispatchHandler 

12 

13log = get_logger(__name__) 

14 

15# FIXME Remove this once cwb bbh is uploading to cbc instead of burst 

16CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP = { 

17 'Burst': {'BBH': 'CBC'} 

18} 

19 

20 

21class _KafkaDispatchHandler(DispatchHandler): 

22 

23 def process_args(self, name, record): 

24 return name, (record,), {} 

25 

26 def __call__(self, *keys, **kwargs): 

27 r"""Create a new task and register it as a callback for handling the 

28 given keys. 

29 

30 Parameters 

31 ---------- 

32 \*keys : list 

33 Keys to match 

34 \*\*kwargs 

35 Additional keyword arguments for `celery.Celery.task`. 

36 

37 """ 

38 def wrap(f): 

39 f = gracedb.task(ignore_result=True, **kwargs)(f) 

40 for key in keys: 

41 self.setdefault(key, []).append(f) 

42 return f 

43 

44 return wrap 

45 

46 

47handler = _KafkaDispatchHandler() 

48r"""Function decorator to register a handler callback for specified Kafka URLs. 

49The decorated function is turned into a Celery task, which will be 

50automatically called whenever a message is received from a matching URL. 

51 

52Parameters 

53---------- 

54\*keys 

55 List of keys from :obj:`gwcelery.conf.kafka_consumer_config` 

56 associated with Kafka topics to listen to messages to. 

57\*\*kwargs 

58 Additional keyword arguments for :meth:`celery.Celery.task`. 

59 

60Examples 

61-------- 

62Declare a new handler like this:: 

63 

64 # Assumes kafka_consumer_config dictionary has 'fermi_swift' key 

65 @alerts.handler('fermi_swift') 

66 def handle_swift(record): 

67 # record is a dict that contains the contents of the message 

68 # do work here... 

69""" 

70 

71 

72@kafka_record_consumed.connect 

73def _on_kafka_record_consumed(name, record, **kwargs): 

74 handler.dispatch(name, record) 

75 

76 

77def _create_base_alert_dict(classification, superevent, alert_type): 

78 '''Create the base of the alert dictionary, with all contents except the 

79 skymap and the external coinc information.''' 

80 # NOTE Everything that comes through this code path will be marked as 

81 # public. However, MDC events with this flag are not made public on 

82 # GraceDB-playground and GraceDB-test. 

83 # Re time_created: Dont need better than second precision for alert times 

84 

85 # FIXME Dont change alert types internally 

86 # NOTE less-significant alerts have alert_type as PRELIMINARY 

87 alert_type_kafka = 'preliminary' if alert_type == 'less-significant' \ 

88 else alert_type 

89 # NOTE the alert group is usually the same as the g-event group. Exceptions 

90 # are recorded in the CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP definition 

91 # above 

92 superevent_group = superevent['preferred_event_data']['group'] 

93 superevent_search = superevent['preferred_event_data']['search'] 

94 if superevent_group in CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP and \ 

95 superevent_search in \ 

96 CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP[superevent_group]: 

97 alert_group_kafka = \ 

98 CUSTOM_EVENT_GROUP_TO_NOTICE_GROUP_MAP[ 

99 superevent_group 

100 ][superevent_search] 

101 else: 

102 alert_group_kafka = superevent['preferred_event_data']['group'] 

103 

104 alert_dict = { 

105 'alert_type': alert_type_kafka.upper(), 

106 'time_created': time.Time.now().utc.isot.split('.')[0] + 'Z', 

107 'superevent_id': superevent['superevent_id'], 

108 'urls': {'gracedb': superevent['links']['self'].replace('api/', '') + 

109 'view/'}, 

110 'event': None, 

111 'external_coinc': None 

112 } 

113 

114 if alert_type == 'retraction': 

115 return alert_dict 

116 

117 if classification and classification[0] is not None: 

118 properties = json.loads(classification[0]) 

119 else: 

120 properties = {} 

121 

122 if classification and classification[1] is not None: 

123 classification = json.loads(classification[1]) 

124 else: 

125 classification = {} 

126 

127 duration = None 

128 central_frequency = None 

129 

130 if alert_group_kafka == 'Burst': 

131 if superevent['preferred_event_data']['pipeline'].lower() == 'cwb': 

132 duration = \ 

133 superevent['preferred_event_data']['extra_attributes'].get( 

134 'MultiBurst', {}).get('duration', None) 

135 central_frequency = \ 

136 superevent['preferred_event_data']['extra_attributes'].get( 

137 'MultiBurst', {}).get('central_freq', None) 

138 elif superevent['preferred_event_data']['pipeline'].lower() == 'mly': 

139 duration = \ 

140 superevent['preferred_event_data']['extra_attributes'].get( 

141 'MLyBurst', {}).get('duration', None) 

142 central_frequency = \ 

143 superevent['preferred_event_data']['extra_attributes'].get( 

144 'MLyBurst', {}).get('central_freq', None) 

145 elif superevent['preferred_event_data']['pipeline'].lower() == 'olib': 

146 quality_mean = \ 

147 superevent['preferred_event_data']['extra_attributes'].get( 

148 'LalInferenceBurst', {}).get('quality_mean', None) 

149 frequency_mean = \ 

150 superevent['preferred_event_data']['extra_attributes'].get( 

151 'LalInferenceBurst', {}).get('frequency_mean', None) 

152 central_frequency = \ 

153 superevent['preferred_event_data']['extra_attributes'].get( 

154 'LalInferenceBurst', {}).get('frequency_mean', None) 

155 duration = quality_mean / (2 * np.pi * frequency_mean) 

156 else: 

157 raise NotImplementedError( 

158 'Duration and central_frequency not implemented for Burst ' 

159 'pipeline {}'.format( 

160 superevent['preferred_event_data']['pipeline'].lower() 

161 ) 

162 ) 

163 

164 alert_dict['event'] = { 

165 # set 'significant' field based on 

166 # https://dcc.ligo.org/LIGO-G2300151/public 

167 'significant': False if alert_type == 'less-significant' else True, 

168 'time': time.Time(superevent['t_0'], format='gps').utc.isot + 'Z', 

169 'far': superevent['far'], 

170 'instruments': sorted( 

171 superevent['preferred_event_data']['instruments'].split(',') 

172 ), 

173 'group': alert_group_kafka, 

174 'pipeline': superevent['preferred_event_data']['pipeline'], 

175 'search': superevent_search, 

176 'properties': properties, 

177 'classification': classification, 

178 'duration': duration, 

179 'central_frequency': central_frequency 

180 } 

181 

182 return alert_dict 

183 

184 

185@gracedb.task(shared=False) 

186def _add_external_coinc_to_alert(alert_dict, superevent, 

187 combined_skymap_filename): 

188 external_event = gracedb.get_event(superevent['em_type']) 

189 if combined_skymap_filename: 

190 combined_skymap = gracedb.download(combined_skymap_filename, 

191 superevent['superevent_id']) 

192 else: 

193 combined_skymap = None 

194 alert_dict['external_coinc'] = { 

195 'gcn_notice_id': 

196 int(external_event['extra_attributes']['GRB']['trigger_id']), 

197 'ivorn': external_event['extra_attributes']['GRB']['ivorn'], 

198 'observatory': external_event['pipeline'], 

199 'search': external_event['search'], 

200 'time_difference': round(external_event['gpstime'] - 

201 superevent['t_0'], 2), 

202 'time_coincidence_far': superevent['time_coinc_far'], 

203 'time_sky_position_coincidence_far': superevent['space_coinc_far'] 

204 } 

205 

206 return alert_dict, combined_skymap 

207 

208 

209@app.task(bind=True, shared=False, queue='kafka', ignore_result=True) 

210def _upload_notice(self, payload, brokerhost, superevent_id): 

211 ''' 

212 Upload serialized alert notice to GraceDB 

213 ''' 

214 config = self.app.conf['kafka_alert_config'][brokerhost] 

215 kafka_writer = self.app.conf['kafka_streams'][brokerhost] 

216 

217 # FIXME Drop get_payload_content method once 

218 # https://github.com/scimma/hop-client/pull/190 is merged 

219 alert_dict = kafka_writer.get_payload_content(payload) 

220 message = 'Kafka alert notice sent to {}'.format(config['url']) 

221 

222 filename = '{}-{}.{}'.format( 

223 alert_dict['superevent_id'], 

224 alert_dict['alert_type'].lower(), 

225 config['suffix'] 

226 ) 

227 

228 gracedb.upload.delay(payload.serialize()['content'], filename, 

229 superevent_id, message, tags=['public', 'em_follow']) 

230 

231 

232@app.task(bind=True, queue='kafka', shared=False) 

233def _send(self, alert_dict, skymap, brokerhost, combined_skymap=None): 

234 """Write the alert to the Kafka topic""" 

235 # Copy the alert dictionary so we dont modify the original 

236 payload_dict = alert_dict.copy() 

237 # Add skymap to alert_dict 

238 config = self.app.conf['kafka_alert_config'][brokerhost] 

239 if alert_dict['event'] is not None: 

240 # dict.copy is a shallow copy, so need to copy event dict as well since 

241 # we plan to modify it 

242 payload_dict['event'] = alert_dict['event'].copy() 

243 

244 # Encode the skymap 

245 encoder = config['skymap_encoder'] 

246 payload_dict['event']['skymap'] = encoder(skymap) 

247 

248 if combined_skymap: 

249 payload_dict['external_coinc']['combined_skymap'] = \ 

250 encoder(combined_skymap) 

251 

252 # Write to kafka topic 

253 serialization_model = \ 

254 self.app.conf['kafka_streams'][brokerhost].serialization_model 

255 payload = serialization_model(payload_dict) 

256 self.app.conf['kafka_streams'][brokerhost].write(payload) 

257 

258 return payload 

259 

260 

261@app.task(bind=True, queue='kafka', shared=False) 

262def _send_with_combined(self, alert_dict_combined_skymap, skymap, brokerhost): 

263 alert_dict, combined_skymap = alert_dict_combined_skymap 

264 return _send(alert_dict, skymap, brokerhost, 

265 combined_skymap=combined_skymap) 

266 

267 

268@app.task(bind=True, ignore_result=True, queue='kafka', shared=False) 

269def send(self, skymap_and_classification, superevent, alert_type, 

270 raven_coinc=False, combined_skymap_filename=None): 

271 """Send an public alert to all currently connected kafka brokers. 

272 

273 Parameters 

274 ---------- 

275 skymap_and_classification : tuple, None 

276 The filecontents of the skymap followed by a collection of JSON 

277 strings. The former generated by 

278 :meth:`gwcelery.tasks.gracedb.download`, the latter generated by 

279 :meth:`gwcelery.tasks.em_bright.classifier` and 

280 :meth:`gwcelery.tasks.p_astro.compute_p_astro` or content of 

281 ``{gstlal,mbta}.p_astro.json`` uploaded by {gstlal,mbta} respectively. 

282 Can also be None. 

283 superevent : dict 

284 The superevent dictionary, typically obtained from an IGWN Alert or 

285 from querying GraceDB. 

286 alert_type : str 

287 The alert type. Either of {`less-significant`, `earlywarning`, 

288 `preliminary`, `initial`, `update`}. 

289 raven_coinc: bool 

290 Is there a coincident external event processed by RAVEN? 

291 combined_skymap_filename : str 

292 Combined skymap filename. Default None. 

293 

294 Notes 

295 ----- 

296 The `alert_type` value is used to set the `significant` field in the 

297 alert dictionary. 

298 """ 

299 

300 if skymap_and_classification is not None: 

301 skymap, *classification = skymap_and_classification 

302 else: 

303 skymap = None 

304 classification = None 

305 

306 alert_dict = _create_base_alert_dict( 

307 classification, 

308 superevent, 

309 alert_type 

310 ) 

311 

312 if raven_coinc and alert_type != 'retraction': 

313 canvas = ( 

314 _add_external_coinc_to_alert.si( 

315 alert_dict, 

316 superevent, 

317 combined_skymap_filename 

318 ) 

319 | 

320 group( 

321 ( 

322 _send_with_combined.s(skymap, brokerhost) 

323 | 

324 _upload_notice.s(brokerhost, superevent['superevent_id']) 

325 ) for brokerhost in self.app.conf['kafka_streams'].keys() 

326 ) 

327 ) 

328 else: 

329 canvas = ( 

330 group( 

331 ( 

332 _send.s(alert_dict, skymap, brokerhost) 

333 | 

334 _upload_notice.s(brokerhost, superevent['superevent_id']) 

335 ) for brokerhost in self.app.conf['kafka_streams'].keys() 

336 ) 

337 ) 

338 

339 canvas.apply_async() 

340 

341 

342@app.task(shared=False) 

343def _create_skymap_classification_tuple(skymap, classification): 

344 return (skymap, *classification) 

345 

346 

347@app.task(shared=False, ignore_result=True) 

348def download_skymap_and_send_alert(classification, superevent, alert_type, 

349 skymap_filename=None, raven_coinc=False, 

350 combined_skymap_filename=None): 

351 """Wrapper for send function when caller has not already downloaded the 

352 skymap. 

353 

354 Parameters 

355 ---------- 

356 classification : tuple, None 

357 A collection of JSON strings, generated by 

358 :meth:`gwcelery.tasks.em_bright.classifier` and 

359 :meth:`gwcelery.tasks.p_astro.compute_p_astro` or 

360 content of ``{gstlal,mbta}.p_astro.json`` uploaded by {gstlal,mbta} 

361 respectively; or None 

362 superevent : dict 

363 The superevent dictionary, typically obtained from an IGWN Alert or 

364 from querying GraceDB. 

365 alert_type : {'earlywarning', 'preliminary', 'initial', 'update'} 

366 The alert type. 

367 skymap_filename : string 

368 The skymap filename. 

369 raven_coinc: bool 

370 Is there a coincident external event processed by RAVEN? 

371 combined_skymap_filename : str 

372 The combined skymap filename. Default None 

373 """ 

374 

375 if skymap_filename is not None and alert_type != 'retraction': 

376 canvas = ( 

377 gracedb.download.si( 

378 skymap_filename, 

379 superevent['superevent_id'] 

380 ) 

381 | 

382 _create_skymap_classification_tuple.s(classification) 

383 | 

384 send.s(superevent, alert_type, raven_coinc=raven_coinc, 

385 combined_skymap_filename=combined_skymap_filename) 

386 ) 

387 else: 

388 canvas = send.s( 

389 (None, classification), 

390 superevent, 

391 alert_type, 

392 raven_coinc=raven_coinc, 

393 combined_skymap_filename=combined_skymap_filename 

394 ) 

395 

396 canvas.apply_async()