Coverage for gwcelery/tasks/first2years_external.py: 100%

103 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""Create mock external events to be in coincidence 

2 with MDC superevents.""" 

3 

4import random 

5import re 

6from pathlib import Path 

7 

8import numpy as np 

9from astropy.time import Time 

10from lxml import etree 

11 

12from .. import app 

13from ..tests import data 

14from ..util import read_json 

15from . import external_triggers, igwn_alert, raven 

16 

17 

18def create_upload_external_event(gpstime, pipeline, ext_search): 

19 """Create a random external event VOEvent or Kafka alert packet. 

20 

21 Parameters 

22 ---------- 

23 gpstime : float 

24 Event's GPS time 

25 pipeline : str 

26 External trigger pipeline name 

27 ext_search : str 

28 Search field for external event 

29 

30 Returns 

31 ------- 

32 event : str or dict 

33 Alert packet in format as if it was sent from GCN or Kafka, in a 

34 XML GCN notice alert packet in string format for the GRB or SubGRB 

35 search or a dictionary Kafka packet if for SubGRBTargeted search. 

36 """ 

37 new_date = str(Time(gpstime, format='gps', scale='utc').isot) + 'Z' 

38 new_TrigID = str(int(gpstime)) 

39 ra = random.uniform(0, 360) 

40 thetas = np.arange(-np.pi / 2, np.pi / 2, .01) 

41 dec = random.choices(np.rad2deg(thetas), 

42 weights=np.cos(thetas) / sum(np.cos(thetas)))[0] 

43 error = .05 if pipeline == 'Swift' else random.uniform(1, 30) 

44 

45 if pipeline in {'Fermi', 'Swift', 'INTEGRAL'}: 

46 is_grb = True 

47 else: 

48 is_grb = False 

49 

50 # If SubGRBTargeted modify alert packet from Kafka 

51 if ext_search == 'SubGRBTargeted': 

52 if pipeline == 'Fermi': 

53 # Template based on: 

54 # https://github.com/joshuarwood/gcn-schema/tree/main/gcn/notices/ 

55 # fermi/gbm 

56 alert = read_json(data, 'kafka_alert_fermi.json') 

57 # Remove sky map file to create our own sky map 

58 alert.pop('healpix_file') 

59 alert['dec_uncertainty'] = [error] 

60 elif pipeline == 'Swift': 

61 # Template based on: 

62 # https://github.com/nasa-gcn/gcn-schema/tree/main/gcn/notices/ 

63 # swift/bat/guano 

64 alert = read_json(data, 'kafka_alert_swift.json') 

65 alert['ra_uncertainty'] = [error] 

66 else: 

67 raise ValueError( 

68 'Only use Fermi or Swift for SubGRBTargeted search') 

69 

70 alert['trigger_time'] = new_date 

71 alert['alert_datetime'] = new_date 

72 alert['id'] = [new_TrigID] 

73 alert['ra'] = ra 

74 alert['dec'] = dec 

75 # Generate FAR from max threshold and trials factors 

76 alert['far'] = \ 

77 (app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] * 

78 random.uniform(0, 1)) 

79 

80 external_triggers.handle_targeted_kafka_alert(alert) 

81 

82 # Return VOEvent for testing until GraceDB natively ingests kafka 

83 # alerts 

84 return external_triggers._kafka_to_voevent(alert, ext_search)[0] 

85 

86 # Otherwise modify respective VOEvent template 

87 else: 

88 fname = str(Path(__file__).parent / 

89 '../tests/data/{0}{1}_{2}gcn.xml'.format( 

90 pipeline.lower(), 

91 '_subthresh' if ext_search == 'SubGRB' else '', 

92 'grb_' if is_grb else '')) 

93 

94 root = etree.parse(fname) 

95 # Change ivorn to indicate if this is an MDC event or O3 replay event 

96 root.xpath('.')[0].attrib['ivorn'] = \ 

97 'ivo://lvk.internal/{0}#{1}{2}_event_{3}'.format( 

98 pipeline if pipeline != 'Swift' else 'SWIFT', 

99 '_subthresh' if ext_search == 'SubGRB' else '', 

100 'MDC-test' if ext_search == 'MDC' else 'O3-replay', 

101 new_date).encode() 

102 

103 # Change times to chosen time 

104 root.find("./Who/Date").text = str(new_date).encode() 

105 root.find(("./WhereWhen/ObsDataLocation/" 

106 "ObservationLocation/AstroCoords/Time/TimeInstant/" 

107 "ISOTime")).text = str(new_date).encode() 

108 if ext_search == 'SubGRB': 

109 root.find("./What/Param[@name='Trans_Num']").attrib['value'] = \ 

110 str(new_TrigID).encode() 

111 else: 

112 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

113 str(new_TrigID).encode() 

114 

115 if is_grb: 

116 # Give random sky position 

117 root.find(("./WhereWhen/ObsDataLocation/" 

118 "ObservationLocation/AstroCoords/Position2D/Value2/" 

119 "C1")).text = str(ra).encode() 

120 root.find(("./WhereWhen/ObsDataLocation/" 

121 "ObservationLocation/AstroCoords/Position2D/Value2/" 

122 "C2")).text = str(dec).encode() 

123 if pipeline != 'Swift': 

124 root.find( 

125 ("./WhereWhen/ObsDataLocation/" 

126 "ObservationLocation/AstroCoords/Position2D/" 

127 "Error2Radius")).text = str(error).encode() 

128 

129 event = etree.tostring(root, xml_declaration=True, encoding="UTF-8", 

130 pretty_print=True) 

131 

132 # Upload as from GCN 

133 if is_grb: 

134 external_triggers.handle_grb_gcn(event) 

135 else: 

136 external_triggers.handle_snews_gcn(event) 

137 

138 return event 

139 

140 

141def _offset_time(gpstime, group, pipeline, ext_search, se_search): 

142 """Offsets the given GPS time by applying a random number within a time 

143 window, determine by the search being done. 

144 

145 Parameters 

146 ---------- 

147 gpstime : float 

148 Event's GPS time 

149 group : str 

150 Burst or CBC 

151 pipeline : str 

152 Pipeline field for external event 

153 ext_search : str 

154 Search field for external event 

155 se_search : list 

156 Search field for superevent 

157 

158 Returns 

159 ------- 

160 gsptime_adjusted : float 

161 Event's original gps time plus a random number within the determined 

162 search window 

163 

164 """ 

165 tl, th = raven._time_window('S1', group, [pipeline], 

166 [ext_search], [se_search]) 

167 return gpstime + random.uniform(tl, th) 

168 

169 

170def _is_joint_mdc(graceid, se_search): 

171 """Determine whether to upload an external events using user-defined 

172 frequency of MDC or AllSky superevents. 

173 

174 Looks at the ending letters of a superevent (e.g. 'ac' from 'MS190124ac'), 

175 converts to a number, and checks if divisible by a number given in the 

176 configuration file. 

177 

178 For example, if the configuration number 

179 :obj:`~gwcelery.conf.joint_mdc_freq` is 10, 

180 this means joint events with superevents ending with 'j', 't', 'ad', etc. 

181 

182 Parameters 

183 ---------- 

184 graceid : str 

185 GraceDB ID of superevent 

186 se_search : str 

187 Search field for preferred event in superevent 

188 

189 Returns 

190 ------- 

191 is_joint_mdc : bool 

192 Returns True if the GraceDB ID matches pre-determined frequency rates 

193 

194 """ 

195 end_string = re.split(r'\d+', graceid)[-1].lower() 

196 val = 0 

197 for i in range(len(end_string)): 

198 val += (ord(end_string[i]) - 96) * 26 ** (len(end_string) - i - 1) 

199 return val % int(app.conf['joint_mdc_freq']) == 0 if se_search == 'MDC' \ 

200 else val % int(app.conf['joint_O3_replay_freq']) == 0 

201 

202 

203@igwn_alert.handler('mdc_superevent', 

204 'superevent', 

205 shared=False) 

206def upload_external_event(alert, ext_search=None): 

207 """Upload a random GRB event(s) for a certain fraction of MDC 

208 or O3-replay superevents. 

209 

210 Notes 

211 ----- 

212 Every n superevents, upload a GRB candidate within the 

213 standard CBC-GRB or Burst-GRB search window, where the frequency n is 

214 determined by the configuration variable 

215 :obj:`~gwcelery.conf.joint_mdc_freq` or 

216 :obj:`~gwcelery.conf.joint_O3_replay_freq`. 

217 

218 For O3 replay testing with RAVEN pipeline, only runs on gracedb-playground. 

219 

220 Parameters 

221 ---------- 

222 alert : dict 

223 IGWN alert packet 

224 ext_search : str 

225 Search field for external event 

226 

227 Returns 

228 ------- 

229 events, pipelines : tuple 

230 Returns tuple of the list of external events created and the list of 

231 pipelines chosen for each event 

232 

233 """ 

234 if alert['alert_type'] != 'new': 

235 return 

236 se_search = alert['object']['preferred_event_data']['search'] 

237 group = alert['object']['preferred_event_data']['group'] 

238 is_gracedb_playground = app.conf['gracedb_host'] \ 

239 == 'gracedb-playground.ligo.org' 

240 joint_mdc_alert = se_search == 'MDC' and _is_joint_mdc(alert['uid'], 'MDC') 

241 joint_o3replay_alert = se_search in {'AllSky', 'BBH'} and \ 

242 _is_joint_mdc(alert['uid'], 'AllSky') and is_gracedb_playground and \ 

243 group in {'CBC', 'Burst'} 

244 if not (joint_mdc_alert or joint_o3replay_alert): 

245 return 

246 

247 # Potentially upload 1, 2, or 3 GRB events 

248 num = 1 + np.random.choice(np.arange(3), p=[.6, .3, .1]) 

249 

250 if joint_mdc_alert: 

251 # If joint MDC alert, make external event MDC 

252 if ext_search is None: 

253 ext_search = 'MDC' 

254 elif ext_search == 'MDC': 

255 pass 

256 else: 

257 raise ValueError('External search must be "MDC" if MDC superevent') 

258 # If O3 replay, choose search from acceptable list 

259 elif ext_search is None: 

260 # Determine search for external event and then pipelines 

261 ext_search = \ 

262 (np.random.choice(['GRB', 'SubGRB', 'SubGRBTargeted'], 

263 p=[.6, .1, .3]) 

264 if joint_o3replay_alert else 'GRB') 

265 # Choose pipeline(s) based on search 

266 if ext_search in {'GRB', 'MDC'}: 

267 pipelines = np.random.choice(['Fermi', 'Swift', 'INTEGRAL'], 

268 p=[.6, .3, .1,], 

269 size=num, replace=False) 

270 elif ext_search == 'SubGRB': 

271 pipelines = np.full(num, 'Fermi') 

272 elif ext_search == 'SubGRBTargeted': 

273 # Only two current pipelines in targeted search so map 3 => 2 

274 num = min(num, 2) 

275 pipelines = np.random.choice(['Fermi', 'Swift'], 

276 p=[.5, .5], size=num, replace=False) 

277 events = [] 

278 for pipeline in pipelines: 

279 gpstime = float(alert['object']['t_0']) 

280 new_time = _offset_time(gpstime, group, pipeline, 

281 ext_search, se_search) 

282 

283 # Choose external grb pipeline to simulate 

284 ext_event = create_upload_external_event( 

285 new_time, pipeline, ext_search) 

286 

287 events.append(ext_event) 

288 

289 return events, pipelines 

290 

291 

292@app.task(ignore_result=True, 

293 shared=False) 

294def upload_snews_event(): 

295 """Create and upload a SNEWS-like MDC external event. 

296 

297 Returns 

298 ------- 

299 ext_event : str 

300 XML GCN notice alert packet in string format 

301 """ 

302 current_time = Time(Time.now(), format='gps').value 

303 ext_event = create_upload_external_event(current_time, 'SNEWS', 'MDC') 

304 return ext_event