Coverage for gwcelery/tasks/igwn_alert.py: 97%

36 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-14 05:52 +0000

1"""IGWN alert client.""" 

2import json 

3 

4from celery.utils.log import get_task_logger 

5 

6from .. import app 

7from ..igwn_alert.signals import igwn_alert_received 

8from . import gracedb 

9from .core import DispatchHandler 

10 

11log = get_task_logger(__name__) 

12 

13 

class _IGWNAlertDispatchHandler(DispatchHandler):
    """Dispatch handler for IGWN alert messages.

    Topics passed at registration time are recorded in
    ``app.conf['igwn_alert_topics']``. Incoming messages are decoded from
    JSON and discarded unless their API URL matches ``gracedb.client.url``.
    """

    def __call__(self, *keys, **kwargs):
        """Record ``keys`` as IGWN alert topics, then defer to the parent.

        Parameters
        ----------
        \\*keys
            Topic names; added to the set stored under
            ``app.conf['igwn_alert_topics']``.
        \\*\\*kwargs
            Forwarded unchanged to the parent class's ``__call__``.

        Returns
        -------
        Whatever the parent class's ``__call__`` returns.
        """
        try:
            topics = app.conf['igwn_alert_topics']
        except KeyError:
            # Nothing registered yet: create the set and publish it in the
            # application configuration.
            topics = set()
            app.conf['igwn_alert_topics'] = topics
        topics.update(keys)
        return super().__call__(*keys, **kwargs)

    def process_args(self, topic, alert):
        """Decode an alert and check that it targets our GraceDB server.

        Parameters
        ----------
        topic
            Topic of the message; passed through to the parent class.
        alert
            JSON-encoded alert message (anything ``json.loads`` accepts).

        Returns
        -------
        The result of the parent class's ``process_args`` called with the
        topic and the decoded alert, or ``(None, None, None)`` when the
        message has no API URL or is addressed to a different server.
        """
        content = json.loads(alert)

        # Determine the GraceDB service URL: prefer the nested ``links``
        # entry and fall back to a ``self`` key directly on the object.
        try:
            try:
                url = content['object']['links']['self']
            except KeyError:
                url = content['object']['self']
        except KeyError:
            log.exception(
                'IGWN alert message does not contain an API URL: %r',
                content)
            return None, None, None

        # Keep everything up to and including the '/api/' component.
        prefix, separator, _ = url.partition('/api/')
        service = prefix + separator

        if service == gracedb.client.url:
            return super().process_args(topic, content)

        # FIXME: this is probably redundant since IGWN alert client
        # is initialized using group gracedb-playground, gracedb-test etc.
        log.warning(
            'ignoring IGWN alert message because it is intended for '
            'GraceDB server %s, but we are set up for server %s',
            service, gracedb.client.url)
        return None, None, None

49 

50 

# Module-level dispatch handler instance. Calling it with topic names records
# them in ``app.conf['igwn_alert_topics']`` before delegating to
# ``DispatchHandler.__call__``; the string below documents the attribute.
handler = _IGWNAlertDispatchHandler()
r"""Function decorator to register a handler callback for specified IGWN alert
message types. The decorated function is turned into a Celery task, which will
be automatically called whenever a matching IGWN alert message is received.

Parameters
----------
\*keys
    List of IGWN alert message types to accept
\*\*kwargs
    Additional keyword arguments for :meth:`celery.Celery.task`.

Examples
--------
Declare a new handler like this::

    @igwn_alert.handler('cbc_gstlal',
                        'cbc_spiir',
                        'cbc_pycbc',
                        'cbc_mbta')
    def handle_cbc(alert_content):
        # do work here...
"""

74 

75 

@igwn_alert_received.connect
def _on_igwn_received(topic, payload, **kwargs):
    """Forward a received IGWN alert to the module-level dispatch handler.

    Connected to the ``igwn_alert_received`` signal. Only ``topic`` and
    ``payload`` are passed on to ``handler.dispatch``; any additional keyword
    arguments supplied by the signal are accepted and ignored.
    """
    handler.dispatch(topic, payload)