Coverage for gwcelery/tasks/first2years_external.py: 100%
103 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""Create mock external events to be in coincidence
2 with MDC superevents."""
4import random
5import re
6from pathlib import Path
8import numpy as np
9from astropy.time import Time
10from lxml import etree
12from .. import app
13from ..tests import data
14from ..util import read_json
15from . import external_triggers, igwn_alert, raven
18def create_upload_external_event(gpstime, pipeline, ext_search):
19 """Create a random external event VOEvent or Kafka alert packet.
21 Parameters
22 ----------
23 gpstime : float
24 Event's GPS time
25 pipeline : str
26 External trigger pipeline name
27 ext_search : str
28 Search field for external event
30 Returns
31 -------
32 event : str or dict
33 Alert packet in format as if it was sent from GCN or Kafka, in a
34 XML GCN notice alert packet in string format for the GRB or SubGRB
35 search or a dictionary Kafka packet if for SubGRBTargeted search.
36 """
37 new_date = str(Time(gpstime, format='gps', scale='utc').isot) + 'Z'
38 new_TrigID = str(int(gpstime))
39 ra = random.uniform(0, 360)
40 thetas = np.arange(-np.pi / 2, np.pi / 2, .01)
41 dec = random.choices(np.rad2deg(thetas),
42 weights=np.cos(thetas) / sum(np.cos(thetas)))[0]
43 error = .05 if pipeline == 'Swift' else random.uniform(1, 30)
45 if pipeline in {'Fermi', 'Swift', 'INTEGRAL'}:
46 is_grb = True
47 else:
48 is_grb = False
50 # If SubGRBTargeted modify alert packet from Kafka
51 if ext_search == 'SubGRBTargeted':
52 if pipeline == 'Fermi':
53 # Template based on:
54 # https://github.com/joshuarwood/gcn-schema/tree/main/gcn/notices/
55 # fermi/gbm
56 alert = read_json(data, 'kafka_alert_fermi.json')
57 # Remove sky map file to create our own sky map
58 alert.pop('healpix_file')
59 alert['dec_uncertainty'] = [error]
60 elif pipeline == 'Swift':
61 # Template based on:
62 # https://github.com/nasa-gcn/gcn-schema/tree/main/gcn/notices/
63 # swift/bat/guano
64 alert = read_json(data, 'kafka_alert_swift.json')
65 alert['ra_uncertainty'] = [error]
66 else:
67 raise ValueError(
68 'Only use Fermi or Swift for SubGRBTargeted search')
70 alert['trigger_time'] = new_date
71 alert['alert_datetime'] = new_date
72 alert['id'] = [new_TrigID]
73 alert['ra'] = ra
74 alert['dec'] = dec
75 # Generate FAR from max threshold and trials factors
76 alert['far'] = \
77 (app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] *
78 random.uniform(0, 1))
80 external_triggers.handle_targeted_kafka_alert(alert)
82 # Return VOEvent for testing until GraceDB natively ingests kafka
83 # alerts
84 return external_triggers._kafka_to_voevent(alert, ext_search)[0]
86 # Otherwise modify respective VOEvent template
87 else:
88 fname = str(Path(__file__).parent /
89 '../tests/data/{0}{1}_{2}gcn.xml'.format(
90 pipeline.lower(),
91 '_subthresh' if ext_search == 'SubGRB' else '',
92 'grb_' if is_grb else ''))
94 root = etree.parse(fname)
95 # Change ivorn to indicate if this is an MDC event or O3 replay event
96 root.xpath('.')[0].attrib['ivorn'] = \
97 'ivo://lvk.internal/{0}#{1}{2}_event_{3}'.format(
98 pipeline if pipeline != 'Swift' else 'SWIFT',
99 '_subthresh' if ext_search == 'SubGRB' else '',
100 'MDC-test' if ext_search == 'MDC' else 'O3-replay',
101 new_date).encode()
103 # Change times to chosen time
104 root.find("./Who/Date").text = str(new_date).encode()
105 root.find(("./WhereWhen/ObsDataLocation/"
106 "ObservationLocation/AstroCoords/Time/TimeInstant/"
107 "ISOTime")).text = str(new_date).encode()
108 if ext_search == 'SubGRB':
109 root.find("./What/Param[@name='Trans_Num']").attrib['value'] = \
110 str(new_TrigID).encode()
111 else:
112 root.find("./What/Param[@name='TrigID']").attrib['value'] = \
113 str(new_TrigID).encode()
115 if is_grb:
116 # Give random sky position
117 root.find(("./WhereWhen/ObsDataLocation/"
118 "ObservationLocation/AstroCoords/Position2D/Value2/"
119 "C1")).text = str(ra).encode()
120 root.find(("./WhereWhen/ObsDataLocation/"
121 "ObservationLocation/AstroCoords/Position2D/Value2/"
122 "C2")).text = str(dec).encode()
123 if pipeline != 'Swift':
124 root.find(
125 ("./WhereWhen/ObsDataLocation/"
126 "ObservationLocation/AstroCoords/Position2D/"
127 "Error2Radius")).text = str(error).encode()
129 event = etree.tostring(root, xml_declaration=True, encoding="UTF-8",
130 pretty_print=True)
132 # Upload as from GCN
133 if is_grb:
134 external_triggers.handle_grb_gcn(event)
135 else:
136 external_triggers.handle_snews_gcn(event)
138 return event
141def _offset_time(gpstime, group, pipeline, ext_search, se_search):
142 """Offsets the given GPS time by applying a random number within a time
143 window, determine by the search being done.
145 Parameters
146 ----------
147 gpstime : float
148 Event's GPS time
149 group : str
150 Burst or CBC
151 pipeline : str
152 Pipeline field for external event
153 ext_search : str
154 Search field for external event
155 se_search : list
156 Search field for superevent
158 Returns
159 -------
160 gsptime_adjusted : float
161 Event's original gps time plus a random number within the determined
162 search window
164 """
165 tl, th = raven._time_window('S1', group, [pipeline],
166 [ext_search], [se_search])
167 return gpstime + random.uniform(tl, th)
170def _is_joint_mdc(graceid, se_search):
171 """Determine whether to upload an external events using user-defined
172 frequency of MDC or AllSky superevents.
174 Looks at the ending letters of a superevent (e.g. 'ac' from 'MS190124ac'),
175 converts to a number, and checks if divisible by a number given in the
176 configuration file.
178 For example, if the configuration number
179 :obj:`~gwcelery.conf.joint_mdc_freq` is 10,
180 this means joint events with superevents ending with 'j', 't', 'ad', etc.
182 Parameters
183 ----------
184 graceid : str
185 GraceDB ID of superevent
186 se_search : str
187 Search field for preferred event in superevent
189 Returns
190 -------
191 is_joint_mdc : bool
192 Returns True if the GraceDB ID matches pre-determined frequency rates
194 """
195 end_string = re.split(r'\d+', graceid)[-1].lower()
196 val = 0
197 for i in range(len(end_string)):
198 val += (ord(end_string[i]) - 96) * 26 ** (len(end_string) - i - 1)
199 return val % int(app.conf['joint_mdc_freq']) == 0 if se_search == 'MDC' \
200 else val % int(app.conf['joint_O3_replay_freq']) == 0
203@igwn_alert.handler('mdc_superevent',
204 'superevent',
205 shared=False)
206def upload_external_event(alert, ext_search=None):
207 """Upload a random GRB event(s) for a certain fraction of MDC
208 or O3-replay superevents.
210 Notes
211 -----
212 Every n superevents, upload a GRB candidate within the
213 standard CBC-GRB or Burst-GRB search window, where the frequency n is
214 determined by the configuration variable
215 :obj:`~gwcelery.conf.joint_mdc_freq` or
216 :obj:`~gwcelery.conf.joint_O3_replay_freq`.
218 For O3 replay testing with RAVEN pipeline, only runs on gracedb-playground.
220 Parameters
221 ----------
222 alert : dict
223 IGWN alert packet
224 ext_search : str
225 Search field for external event
227 Returns
228 -------
229 events, pipelines : tuple
230 Returns tuple of the list of external events created and the list of
231 pipelines chosen for each event
233 """
234 if alert['alert_type'] != 'new':
235 return
236 se_search = alert['object']['preferred_event_data']['search']
237 group = alert['object']['preferred_event_data']['group']
238 is_gracedb_playground = app.conf['gracedb_host'] \
239 == 'gracedb-playground.ligo.org'
240 joint_mdc_alert = se_search == 'MDC' and _is_joint_mdc(alert['uid'], 'MDC')
241 joint_o3replay_alert = se_search in {'AllSky', 'BBH'} and \
242 _is_joint_mdc(alert['uid'], 'AllSky') and is_gracedb_playground and \
243 group in {'CBC', 'Burst'}
244 if not (joint_mdc_alert or joint_o3replay_alert):
245 return
247 # Potentially upload 1, 2, or 3 GRB events
248 num = 1 + np.random.choice(np.arange(3), p=[.6, .3, .1])
250 if joint_mdc_alert:
251 # If joint MDC alert, make external event MDC
252 if ext_search is None:
253 ext_search = 'MDC'
254 elif ext_search == 'MDC':
255 pass
256 else:
257 raise ValueError('External search must be "MDC" if MDC superevent')
258 # If O3 replay, choose search from acceptable list
259 elif ext_search is None:
260 # Determine search for external event and then pipelines
261 ext_search = \
262 (np.random.choice(['GRB', 'SubGRB', 'SubGRBTargeted'],
263 p=[.6, .1, .3])
264 if joint_o3replay_alert else 'GRB')
265 # Choose pipeline(s) based on search
266 if ext_search in {'GRB', 'MDC'}:
267 pipelines = np.random.choice(['Fermi', 'Swift', 'INTEGRAL'],
268 p=[.6, .3, .1,],
269 size=num, replace=False)
270 elif ext_search == 'SubGRB':
271 pipelines = np.full(num, 'Fermi')
272 elif ext_search == 'SubGRBTargeted':
273 # Only two current pipelines in targeted search so map 3 => 2
274 num = min(num, 2)
275 pipelines = np.random.choice(['Fermi', 'Swift'],
276 p=[.5, .5], size=num, replace=False)
277 events = []
278 for pipeline in pipelines:
279 gpstime = float(alert['object']['t_0'])
280 new_time = _offset_time(gpstime, group, pipeline,
281 ext_search, se_search)
283 # Choose external grb pipeline to simulate
284 ext_event = create_upload_external_event(
285 new_time, pipeline, ext_search)
287 events.append(ext_event)
289 return events, pipelines
292@app.task(ignore_result=True,
293 shared=False)
294def upload_snews_event():
295 """Create and upload a SNEWS-like MDC external event.
297 Returns
298 -------
299 ext_event : str
300 XML GCN notice alert packet in string format
301 """
302 current_time = Time(Time.now(), format='gps').value
303 ext_event = create_upload_external_event(current_time, 'SNEWS', 'MDC')
304 return ext_event