Coverage for gwcelery/igwn_alert/bootsteps.py: 58%
31 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1from threading import Thread
3from celery import bootsteps
4from celery.utils.log import get_logger
5from igwn_alert import client
7from .signals import igwn_alert_received
9__all__ = ('Receiver',)
11log = get_logger(__name__)
class IGWNAlertBootStep(bootsteps.ConsumerStep):
    """Base consumer boot step that is active only on IGWN alert workers.

    The step is meant for workers started with the ``--igwn-alerts``
    command line option; whether it is enabled is decided by the
    ``igwn_alert`` argument given to the constructor.
    """

    def __init__(self, consumer, igwn_alert=False, **kwargs):
        # Any truthy value switches the step on; store a genuine bool.
        self.enabled = True if igwn_alert else False

    def start(self, consumer):
        """Log that this step (identified by ``self.name``) is starting."""
        log.info('Starting %s', self.name)

    def stop(self, consumer):
        """Log that this step (identified by ``self.name``) is stopping."""
        log.info('Stopping %s', self.name)
def _send_igwn_alert(topic, payload):
    """Relay one alert to the ``igwn_alert_received`` signal.

    The signal is sent with ``None`` as the sender and carries the alert's
    ``topic`` and ``payload`` as keyword arguments.
    """
    alert = {'topic': topic, 'payload': payload}
    igwn_alert_received.send(None, **alert)
class Receiver(IGWNAlertBootStep):
    """Boot step that runs the IGWN alert client on a background thread."""

    name = 'IGWN Alert client'

    def start(self, consumer):
        """Create the alert client and start its listener thread.

        Connection settings (``igwn_alert_server``, ``igwn_alert_noauth``,
        ``igwn_alert_group``) and the topics to listen on
        (``igwn_alert_topics``) are read from the Celery app configuration.
        """
        super().start(consumer)

        conf = consumer.app.conf
        self._client = client(
            server=conf['igwn_alert_server'],
            noauth=conf['igwn_alert_noauth'],
            group=conf['igwn_alert_group'])

        # The listener receives the signal shim as its callback, followed
        # by the configured topics.
        listen = self._client.listen
        listener_args = (_send_igwn_alert, conf['igwn_alert_topics'])
        self.thread = Thread(
            target=listen,
            args=listener_args,
            name='IGWNReceiverThread')
        self.thread.start()

    def stop(self, consumer):
        """Shut the client down and wait for the listener thread to end."""
        super().stop(consumer)

        alert_client = self._client
        # Clear the client's run flags before stopping its stream consumer.
        # NOTE(review): presumably these keep the listen loop from
        # restarting; confirm against the igwn_alert client implementation.
        alert_client.fatal_restart_running = False
        alert_client.listening = False
        alert_client.listen_stream._consumer.stop()

        self.thread.join()

    def info(self, consumer):
        """Report the configured topics that the client also lists.

        Returns a dict with the single key ``'igwn-alert-topics'`` whose
        value is the intersection of the configured ``igwn_alert_topics``
        with the result of the client's ``get_topics()``.
        """
        intersect = consumer.app.conf['igwn_alert_topics'].intersection
        return {'igwn-alert-topics': intersect(self._client.get_topics())}