Coverage for gwcelery/tasks/gcn.py: 82%

51 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""Tasks to send and receive Gamma-ray Coordinates Network [GCN]_ notices. 

2 

3References 

4---------- 

5.. [GCN] https://gcn.gsfc.nasa.gov 

6 

7""" 

8import difflib 

9import html 

10import urllib.parse 

11 

12import gcn 

13import lxml.etree 

14from comet.utility.xml import xml_document 

15from gcn import NoticeType, get_notice_type 

16from twisted.internet import reactor 

17 

18from .. import app 

19from ..voevent.signals import voevent_received 

20from . import gracedb 

21from .core import DispatchHandler 

22 

23 

class _VOEventDispatchHandler(DispatchHandler):
    """Dispatch handler keyed on the GCN notice type of a VOEvent."""

    def process_args(self, event):
        """Compute the dispatch key and task arguments for a VOEvent.

        Parameters
        ----------
        event
            Object exposing ``element`` (passed to
            :func:`gcn.get_notice_type`) and ``raw_bytes``.

        Returns
        -------
        tuple
            ``(key, args, kwargs)`` where ``key`` is the notice type, ``args``
            is a 1-tuple holding ``event.raw_bytes``, and ``kwargs`` is an
            empty dict.

        """
        raw_type = get_notice_type(event.element)

        # The enum member is only used to get prettier log messages; notice
        # types unknown to the enum are passed through as returned.
        try:
            key = NoticeType(raw_type)
        except ValueError:
            key = raw_type

        return key, (event.raw_bytes,), {}

36 

37 

# Module-level dispatcher instance. It is used below as the ``@handler(...)``
# decorator and is fed incoming VOEvents by ``_on_voevent_received``. The bare
# string that follows is its attribute docstring.
handler = _VOEventDispatchHandler()
r"""Function decorator to register a handler callback for specified GCN notice
types. The decorated function is turned into a Celery task, which will be
automatically called whenever a matching GCN notice is received.

Parameters
----------
\*keys
    List of GCN notice types to accept
\*\*kwargs
    Additional keyword arguments for :meth:`celery.Celery.task`.

Examples
--------
Declare a new handler like this::

    @gcn.handler(gcn.NoticeType.FERMI_GBM_GND_POS,
                 gcn.NoticeType.FERMI_GBM_FIN_POS)
    def handle_fermi(payload):
        root = lxml.etree.fromstring(payload)
        # do work here...
"""

60 

61 

@voevent_received.connect
def _on_voevent_received(xml_document, **kwargs):
    """Forward a VOEvent from the ``voevent_received`` signal to ``handler``.

    Parameters
    ----------
    xml_document
        The received VOEvent; handed unchanged to ``handler.dispatch``.
    **kwargs
        Extra keyword arguments supplied with the signal; ignored.

    """
    # NOTE: the parameter name shadows the ``xml_document`` import inside this
    # function only; it is kept because the signal may pass it by keyword.
    handler.dispatch(xml_document)

65 

66 

class SendingError(RuntimeError):
    """Raised when a VOEvent could not be sent.

    The ``send`` task lists this exception in ``autoretry_for``, so raising it
    there triggers a retry.
    """

69 

70 

@app.task(autoretry_for=(SendingError,), bind=True, default_retry_delay=20.0,
          ignore_result=True, queue='voevent', retry_backoff=True,
          retry_kwargs=dict(max_retries=10), shared=False)
def send(self, message):
    """Send a VOEvent to GCN.

    The task is retried automatically when :class:`SendingError` is raised;
    see the Raises section for when that happens.

    Parameters
    ----------
    message : bytes
        The raw VOEvent file contents.

    Raises
    ------
    SendingError
        If no broadcasters are connected while
        ``voevent_broadcaster_whitelist`` is non-empty.

    """
    peers = self.app.conf['voevent_broadcaster_factory'].broadcasters

    if not peers:
        # Nobody to send to. That is only an error when the broadcaster is
        # enabled (non-empty whitelist); otherwise the task does nothing.
        if self.app.conf['voevent_broadcaster_whitelist']:
            raise SendingError('Not sending the event because there are no '
                               'subscribers connected to the GCN broker.')
        return

    # Parse once and hand the same document to every connected peer. The
    # actual send is scheduled onto the Twisted reactor from this thread.
    event = xml_document(message)
    for peer in peers:
        reactor.callFromThread(peer.send_event, event)

100 

101 

@handler(gcn.NoticeType.LVC_EARLY_WARNING,
         gcn.NoticeType.LVC_PRELIMINARY,
         gcn.NoticeType.LVC_INITIAL,
         gcn.NoticeType.LVC_UPDATE,
         gcn.NoticeType.LVC_RETRACTION,
         bind=True, shared=False)
def validate(self, payload):
    """Validate LIGO/Virgo GCN notices.

    Compare a public LIGO/Virgo GCN notice against the original VOEvent
    stored in GraceDB. On a match, the VOEvent file is tagged
    ``gcn_received``; on a mismatch, a unified diff is uploaded to GraceDB as
    a log message and the task fails.

    Parameters
    ----------
    payload : bytes
        The raw VOEvent received from GCN.

    Raises
    ------
    ValueError
        If the received VOEvent differs from the one downloaded from GraceDB.

    Notes
    -----
    If the VOEvent broadcaster is disabled by setting
    :obj:`~gwcelery.conf.voevent_broadcaster_whitelist` to an empty list, then
    this task becomes a no-op.

    """
    if not self.app.conf['voevent_broadcaster_whitelist']:
        return

    def as_text_lines(contents):
        # Decode line by line, keeping line endings; ``surrogateescape``
        # lets non-ASCII bytes pass through instead of raising.
        return [
            raw_line.decode('ascii', 'surrogateescape')
            for raw_line in contents.splitlines(keepends=True)
        ]

    root = lxml.etree.fromstring(payload)

    # GraceDB ID that the notice refers to.
    graceid = root.find("./What/Param[@name='GraceID']").attrib['value']

    # The fragment of the IVORN names the VOEvent file in GraceDB.
    ivorn = urllib.parse.urlparse(root.attrib['ivorn'])
    filename = ivorn.fragment + '.xml'

    # Fetch what we originally sent.
    orig = gracedb.download(filename, graceid)

    sent_lines = as_text_lines(orig)
    received_lines = as_text_lines(payload)
    diff = ''.join(
        difflib.unified_diff(
            sent_lines,
            received_lines,
            fromfile='{} (sent)'.format(filename),
            tofile='{} (received)'.format(filename)
        )
    )

    if not diff:
        # Identical contents: tag the VOEvent as received correctly.
        gracedb.create_tag.delay(filename, 'gcn_received', graceid)
        return

    # Contents differ: record the diff in GraceDB, then fail the task.
    msg = 'VOEvent received from GCN differs from what we sent.'
    gracedb.upload.delay(
        None, None, graceid,
        '{}<pre>{}</pre>'.format(msg, html.escape(diff)),
        ['em_follow'])
    raise ValueError('{}\n\n{}'.format(msg, diff))