Coverage for gwcelery/tasks/notice_text.py: 15%
107 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Tasks to validate the GCN Notice types of the e-mail formats [GCN e-mail]_.
3References
4----------
5.. [GCN e-mail] https://gcn.gsfc.nasa.gov/lvc.html#tc13
7"""
8import email
9import email.policy
10from math import isclose
12import lxml.etree
13from celery.utils.log import get_task_logger
15from .. import app
16from ..email.signals import email_received
17from . import gracedb
19log = get_task_logger(__name__)
def _trigger_datetime(gcn_notice_mail):
    """Get trigger date and time from a GCN email notice.

    Parameters
    ----------
    gcn_notice_mail : mapping
        Parsed notice supporting ``[]`` lookup of the ``TRIGGER_DATE`` and
        ``TRIGGER_TIME`` fields (for example an
        :class:`email.message.Message` or a plain :class:`dict`).

    Returns
    -------
    str
        The trigger date and time joined as ``<date>T<hh:mm:ss>Z``, with the
        slashes of the notice date replaced by dashes and the fractional
        seconds dropped.

    Raises
    ------
    KeyError
        If one of the two fields is absent from the notice.
    ValueError
        If a field is present but does not contain the expected pattern.

    """
    import re

    # Example of the notice fields and of the VOEvent value they must match:
    # TRIGGER_DATE:  20123 TJD;   179 DOY;   2023/06/28 (yyyy/mm/dd)
    # TRIGGER_TIME:  83520.000000 SOD {23:12:00.000000} UT
    # <ISOTime>2023-06-28T23:12:00Z</ISOTime>
    def _field(name, pattern):
        raw = gcn_notice_mail[name]
        # email.message.Message returns None for an absent header instead
        # of raising; turn that into the KeyError a plain mapping gives.
        if raw is None:
            raise KeyError(name)
        found = re.search(pattern, str(raw))
        if found is None:
            raise ValueError(f'cannot parse {name}: {raw!r}')
        return found.group(1)

    # Search for the patterns instead of relying on token positions, so
    # that irregular spacing between the fields does not matter.
    trigger_date = _field("TRIGGER_DATE", r"(\d+/\d+/\d+)").replace("/", "-")
    # Only hh:mm:ss inside the braces is kept; a Z suffix is appended to
    # match the ISOTime format.
    trigger_time = _field("TRIGGER_TIME", r"\{\s*(\d+:\d+:\d+)")

    return f'{trigger_date}T{trigger_time}Z'
def _vo_match_notice(gcn_notice_mail, params_vo, trigger_time_vo):
    """Compare the notice-email keywords with the VOEvent parameters.

    Parameters
    ----------
    gcn_notice_mail : mapping
        Parsed e-mail notice, indexed by notice keyword.
    params_vo : dict
        VOEvent parameter values, indexed by parameter name.
    trigger_time_vo : str
        Trigger time string taken from the VOEvent.

    Returns
    -------
    dict
        Maps each notice keyword that was checked to ``True`` when it agrees
        with the VOEvent and to ``False`` otherwise.

    """
    def leading_float(keyword):
        # Numeric notice fields: the number is the first token of the value.
        return float(gcn_notice_mail[keyword].split()[0])

    results = {}

    # TRIGGER_DATE + TRIGGER_TIME against the VOEvent trigger time.
    results['TRIGGER_DATETIME'] = (
        _trigger_datetime(gcn_notice_mail) == trigger_time_vo)

    # SEQUENCE_NUM against Pkt_Ser_Num.
    serial_notice = gcn_notice_mail["SEQUENCE_NUM"].split()[0]
    results['SEQUENCE_NUM'] = (serial_notice == params_vo["Pkt_Ser_Num"])

    # For a retraction only the two checks above are performed.
    if params_vo['AlertType'] == 'Retraction':
        return results

    # (notice keyword, VOEvent parameter) pairs, grouped by kind of check.
    type_pairs = (("GROUP_TYPE", "Group"),
                  ("PIPELINE_TYPE", "Pipeline"),
                  ("SEARCH_TYPE", "Search"))
    url_pairs = (("SKYMAP_FITS_URL", "skymap_fits"),
                 ("EVENTPAGE_URL", "EventPage"))
    cbc_pairs = (("PROB_NS", "HasNS"),
                 ("PROB_REMNANT", "HasRemnant"),
                 ("PROB_BNS", "BNS"),
                 ("PROB_NSBH", "NSBH"),
                 ("PROB_BBH", "BBH"),
                 ("PROB_TERRES", "Terrestrial"))
    burst_pairs = (("CENTRAL_FREQ", "CentralFreq"),
                   ("DURATION", "Duration"))

    # FAR, compared with a relative tolerance.
    results['FAR'] = isclose(
        leading_float("FAR"), float(params_vo["FAR"]), rel_tol=0.001)

    # Group, pipeline and search types: the third token of the notice value.
    for keyword, param in type_pairs:
        type_notice = gcn_notice_mail[keyword].split()[2]
        results[keyword] = (type_notice == params_vo[param])

    # Sky map and event page URLs: compared verbatim.
    for keyword, param in url_pairs:
        url_notice = gcn_notice_mail[keyword]
        results[keyword] = (url_notice == params_vo[param])

    group = params_vo['Group']

    # CBC classification and properties, compared with an absolute tolerance.
    if group == 'CBC':
        for keyword, param in cbc_pairs:
            results[keyword] = isclose(
                leading_float(keyword), float(params_vo[param]),
                abs_tol=0.01)

    # Burst properties, compared with a relative tolerance.
    if group == 'Burst':
        for keyword, param in burst_pairs:
            results[keyword] = isclose(
                leading_float(keyword), float(params_vo[param]),
                rel_tol=0.001)

    return results
def _vo_match_comments(gcn_notice_mail, params_vo):
    """Check the notice-email comments for the contributed instruments.

    Parameters
    ----------
    gcn_notice_mail : email.message.Message
        Parsed e-mail notice; its ``COMMENTS`` headers are inspected.
    params_vo : dict
        VOEvent parameter values; ``Instruments`` is read as a
        comma-separated list of instrument names.

    Returns
    -------
    dict
        ``{"INSTRUMENT": bool}``, true when the instruments named in the
        notice comments are exactly those listed in the VOEvent.

    Raises
    ------
    KeyError
        If ``Instruments`` is missing from `params_vo`, or if a comment names
        an observatory that has no entry in the translation table below.

    """
    text = ' contributed to this candidate event.'
    gcn_to_vo_instruments = {'LIGO-Hanford Observatory': 'H1',
                             'LIGO-Livingston Observatory': 'L1',
                             'VIRGO Observatory': 'V1'}

    # get_all() returns None when the notice has no COMMENTS header at all;
    # treat that as "no instrument mentioned" so that it is reported as an
    # INSTRUMENT mismatch rather than failing while iterating over None.
    comments_notice_mail = gcn_notice_mail.get_all("COMMENTS") or []
    instruments_vo = set(params_vo["Instruments"].split(','))

    # Keep only the comment lines of the form "<observatory><text>" and
    # translate the observatory name into the VOEvent instrument name.
    instrument_comments = (line.strip() for line in comments_notice_mail)
    instruments_gcn = {gcn_to_vo_instruments[line[:-len(text)]]
                       for line in instrument_comments
                       if line.endswith(text)}

    return {"INSTRUMENT": instruments_gcn == instruments_vo}
@email_received.connect
def on_email_received(rfc822, **kwargs):
    """Parse a received RFC822 email and queue it for validation."""
    parsed = email.message_from_bytes(rfc822, policy=email.policy.default)
    signature = validate_text_notice.s(parsed)
    signature.delay()
@app.task(shared=False)
def validate_text_notice(message):
    """Validate LIGO/Virgo GCN e-mail notice format.

    Check that the contents of a public LIGO/Virgo GCN e-mail notice format
    matches the original VOEvent in GraceDB.

    Messages whose ``From`` header is not the BACODINE address are logged and
    ignored. Otherwise the notice body is uploaded to GraceDB, the matching
    VOEvent file is downloaded and compared with the notice, and that file is
    tagged ``gcn_email_ok`` or ``gcn_email_notok``. In the latter case a log
    message describing the problem is uploaded as well.

    """
    # Only notices sent by BACODINE are validated.
    if message['From'] != 'Bacodine <vxw@capella2.gsfc.nasa.gov>':
        log.info('Email is not from BACODINE. Subject:%s', message['Subject'])
        log.info('Sender is: %s', message['From'])
        return

    log.info('Validating Notice: Subject:%s', message['Subject'])

    # The body of the e-mail is itself a list of "KEYWORD: value" lines, so
    # it is parsed with the e-mail header parser.
    bodymsg = message.get_payload()
    notice = email.message_from_string(bodymsg)

    # The notice type is the last word of NOTICE_TYPE, or the one before it
    # when the last word is "Skymap".
    type_words = notice['NOTICE_TYPE'].split(" ")
    if type_words[-1] == "Skymap":
        notice_type = type_words[-2]
    else:
        notice_type = type_words[-1]

    # The GCN e-mail notice type for EarlyWarning is Early_Warning, while
    # the VOEvent identifier has no underscore, for example
    # ivo://gwnet/LVC#S231030av-1-EarlyWarning.
    notice_type = notice_type.replace('_', '')

    # GraceDB identifier and sequence number give the VOEvent file name.
    trigger_num = notice['TRIGGER_NUM']
    sequence_num = notice['SEQUENCE_NUM']
    filename = f'{trigger_num}-{sequence_num}-{notice_type}.xml'

    # Download and parse the VOEvent.
    payload = gracedb.download(filename, trigger_num)
    root = lxml.etree.fromstring(payload)
    params_vo = {}
    for elem in root.iterfind('.//Param'):
        params_vo[elem.attrib['name']] = elem.attrib['value']
    trigger_time_vo = root.findtext('.//ISOTime')

    # Attach the text of the notice to the GraceDB entry.
    filename_email = f"email_{filename.replace('.xml', '.txt')}"
    gracedb.upload.delay(bodymsg, filename_email, trigger_num,
                         f'email notice corresponding to {filename}',
                         tags=['em_follow'])

    # Compare the notice with the VOEvent; any failure ends up in `error`.
    error = None
    try:
        if notice_type == 'Retraction':
            match = _vo_match_notice(notice, params_vo, trigger_time_vo)
        elif params_vo['Group'] in ("CBC", "Burst"):
            match = _vo_match_notice(notice, params_vo, trigger_time_vo)
            match.update(_vo_match_comments(notice, params_vo))
        else:
            match = {}
            error = f'Email notice {filename} has unknown notice type'

        failed_keys = [key for key, passed in match.items() if not passed]
        mismatched = ' '.join(failed_keys)
        if mismatched:
            error = \
                f'Email notice {filename} has mismatched keys: {mismatched}'
    except KeyError as err:
        # Since there was an exception, the GCN was not annotated.
        error = f'Email notice {filename} missing key: {err}'
    except Exception as err:
        # Other exceptions are possible as well; catch them so that
        # execution is not stopped, and record the exception.
        error = f'Email notice {filename} generated exception: {err}'

    # Tag the VOEvent file with the outcome of the validation.
    if not error:
        gracedb.create_tag.delay(filename, 'gcn_email_ok', trigger_num)
        return

    gracedb.create_tag.delay(filename, 'gcn_email_notok', trigger_num)
    gracedb.upload.delay(None, None, trigger_num,
                         error, tags=['em_follow'])