Coverage for gwcelery/tasks/igwn_alert.py: 97%
36 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""IGWN alert client."""
2import json
4from celery.utils.log import get_task_logger
6from .. import app
7from ..igwn_alert.signals import igwn_alert_received
8from . import gracedb
9from .core import DispatchHandler
11log = get_task_logger(__name__)
14class _IGWNAlertDispatchHandler(DispatchHandler):
15 def __call__(self, *keys, **kwargs):
16 try:
17 igwn_alert_topics = app.conf['igwn_alert_topics']
18 except KeyError:
19 igwn_alert_topics = app.conf['igwn_alert_topics'] = set()
20 igwn_alert_topics.update(keys)
21 return super().__call__(*keys, **kwargs)
23 def process_args(self, topic, alert):
24 alert = json.loads(alert)
25 # Determine GraceDB service URL
26 try:
27 try:
28 self_link = alert['object']['links']['self']
29 except KeyError:
30 self_link = alert['object']['self']
31 except KeyError:
32 log.exception(
33 'IGWN alert message does not contain an API URL: %r',
34 alert)
35 return None, None, None
36 base, api, _ = self_link.partition('/api/')
37 service = base + api
39 if service != gracedb.client.url:
40 # FIXME: this is probably redundant since IGWN alert client
41 # is initialized using group gracedb-playground, gracedb-test etc.
42 log.warning(
43 'ignoring IGWN alert message because it is intended for '
44 'GraceDB server %s, but we are set up for server %s',
45 service, gracedb.client.url)
46 return None, None, None
48 return super().process_args(topic, alert)
51handler = _IGWNAlertDispatchHandler()
52r"""Function decorator to register a handler callback for specified IGWN alert
53message types. The decorated function is turned into a Celery task, which will
54be automatically called whenever a matching IGWN alert message is received.
56Parameters
57----------
58\*keys
59 List of IGWN alert message types to accept
60\*\*kwargs
61 Additional keyword arguments for :meth:`celery.Celery.task`.
63Examples
64--------
65Declare a new handler like this::
67 @igwn_alert.handler('cbc_gstlal',
68 'cbc_spiir',
69 'cbc_pycbc',
70 'cbc_mbta')
71 def handle_cbc(alert_content):
72 # do work here...
73"""
76@igwn_alert_received.connect
77def _on_igwn_received(topic, payload, **kwargs):
78 handler.dispatch(topic, payload)