Coverage for gwcelery/tasks/gcn.py: 82%
51 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-14 05:52 +0000
1"""Tasks to send and receive Gamma-ray Coordinates Network [GCN]_ notices.
3References
4----------
5.. [GCN] https://gcn.gsfc.nasa.gov
7"""
8import difflib
9import html
10import urllib.parse
12import gcn
13import lxml.etree
14from comet.utility.xml import xml_document
15from gcn import NoticeType, get_notice_type
16from twisted.internet import reactor
18from .. import app
19from ..voevent.signals import voevent_received
20from . import gracedb
21from .core import DispatchHandler
24class _VOEventDispatchHandler(DispatchHandler):
26 def process_args(self, event):
27 notice_type = get_notice_type(event.element)
29 # Just cast to enum for prettier log messages
30 try:
31 notice_type = NoticeType(notice_type)
32 except ValueError:
33 pass
35 return notice_type, (event.raw_bytes,), {}
38handler = _VOEventDispatchHandler()
39r"""Function decorator to register a handler callback for specified GCN notice
40types. The decorated function is turned into a Celery task, which will be
41automatically called whenever a matching GCN notice is received.
43Parameters
44----------
45\*keys
46 List of GCN notice types to accept
47\*\*kwargs
48 Additional keyword arguments for :meth:`celery.Celery.task`.
50Examples
51--------
52Declare a new handler like this::
54 @gcn.handler(gcn.NoticeType.FERMI_GBM_GND_POS,
55 gcn.NoticeType.FERMI_GBM_FIN_POS)
56 def handle_fermi(payload):
57 root = lxml.etree.fromstring(payload)
58 # do work here...
59"""
62@voevent_received.connect
63def _on_voevent_received(xml_document, **kwargs):
64 handler.dispatch(xml_document)
67class SendingError(RuntimeError):
68 """A generic error associated with sending VOEvents."""
71@app.task(autoretry_for=(SendingError,), bind=True, default_retry_delay=20.0,
72 ignore_result=True, queue='voevent', retry_backoff=True,
73 retry_kwargs=dict(max_retries=10), shared=False)
74def send(self, message):
75 """Send a VOEvent to GCN.
77 This task will be retried several times if the VOEvent cannot be sent. See
78 the Raises section below for circumstances that cause a retry.
80 Parameters
81 ----------
82 message : bytes
83 The raw VOEvent file contents.
85 Raises
86 ------
87 SendingError
88 If the VOEvent could not be sent because there were no network peers
89 connected to the VOEvent broadcaster.
91 """
92 broadcasters = self.app.conf['voevent_broadcaster_factory'].broadcasters
93 if broadcasters:
94 event = xml_document(message)
95 for broadcaster in broadcasters:
96 reactor.callFromThread(broadcaster.send_event, event)
97 elif self.app.conf['voevent_broadcaster_whitelist']:
98 raise SendingError('Not sending the event because there are no '
99 'subscribers connected to the GCN broker.')
102@handler(gcn.NoticeType.LVC_EARLY_WARNING,
103 gcn.NoticeType.LVC_PRELIMINARY,
104 gcn.NoticeType.LVC_INITIAL,
105 gcn.NoticeType.LVC_UPDATE,
106 gcn.NoticeType.LVC_RETRACTION,
107 bind=True, shared=False)
108def validate(self, payload):
109 """Validate LIGO/Virgo GCN notices.
111 Check that the contents of a public LIGO/Virgo GCN matches the original
112 VOEvent in GraceDB.
114 Notes
115 -----
116 If the VOEvent broadcaster is disabled by setting
117 :obj:`~gwcelery.conf.voevent_broadcaster_whitelist` to an empty list, then
118 this task becomes a no-op.
120 """
121 if not self.app.conf['voevent_broadcaster_whitelist']:
122 return
124 root = lxml.etree.fromstring(payload)
126 # Which GraceDB ID does this refer to?
127 graceid = root.find("./What/Param[@name='GraceID']").attrib['value']
129 # Which VOEvent does this refer to?
130 u = urllib.parse.urlparse(root.attrib['ivorn'])
131 local_id = u.fragment
132 filename = local_id + '.xml'
134 # Download and parse original VOEvent
135 orig = gracedb.download(filename, graceid)
137 # Create a diff of the two VOEvents.
138 diff = ''.join(
139 difflib.unified_diff(
140 *(
141 [
142 line.decode('ascii', 'surrogateescape')
143 for line in contents.splitlines(keepends=True)
144 ]
145 for contents in (orig, payload)
146 ),
147 fromfile='{} (sent)'.format(filename),
148 tofile='{} (received)'.format(filename)
149 )
150 )
152 if diff:
153 # Write a log message to indicate that the event differed.
154 msg = 'VOEvent received from GCN differs from what we sent.'
155 gracedb.upload.delay(
156 None, None, graceid,
157 '{}<pre>{}</pre>'.format(msg, html.escape(diff)),
158 ['em_follow'])
159 raise ValueError('{}\n\n{}'.format(msg, diff))
160 else:
161 # Tag the VOEvent to indicate that it was received correctly.
162 gracedb.create_tag.delay(filename, 'gcn_received', graceid)