Coverage for gwcelery/igwn_alert/bootsteps.py: 58%

31 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-08-16 20:59 +0000

1from threading import Thread 

2 

3from celery import bootsteps 

4from celery.utils.log import get_logger 

5from igwn_alert import client 

6 

7from .signals import igwn_alert_received 

8 

# Public API of this module: only ``Receiver`` is exported.
9__all__ = ('Receiver',) 

10 

# Module-level logger, obtained from Celery's ``get_logger`` helper.
11log = get_logger(__name__) 

12 

13 

class IGWNAlertBootStep(bootsteps.ConsumerStep):
    """Base consumer boot step that can be switched on per worker.

    The step is meant to be included only in workers that are started with
    the ``--igwn-alerts`` command line option.  On its own it merely logs
    when it is started and stopped.
    """

    def __init__(self, consumer, igwn_alert=False, **kwargs):
        """Record whether this step is enabled.

        Parameters
        ----------
        consumer
            The consumer the step is attached to (not used here).
        igwn_alert
            Flag selecting this step; it is coerced to ``bool`` and stored
            as ``self.enabled``.
        **kwargs
            Any further options; accepted and ignored.
        """
        self.enabled = bool(igwn_alert)

    def start(self, consumer):
        """Log that the step identified by ``self.name`` is starting."""
        log.info('Starting %s', self.name)

    def stop(self, consumer):
        """Log that the step identified by ``self.name`` is stopping."""
        log.info('Stopping %s', self.name)

29 

30 

def _send_igwn_alert(topic, payload):
    """Relay one IGWN alert to the ``igwn_alert_received`` signal.

    ``topic`` and ``payload`` are passed through unchanged as keyword
    arguments, and the signal is sent with ``None`` as its sender.
    """
    signal_kwargs = {'topic': topic, 'payload': payload}
    igwn_alert_received.send(None, **signal_kwargs)

34 

35 

class Receiver(IGWNAlertBootStep):
    """Boot step that runs the IGWN alert client in a background thread."""

    name = 'IGWN Alert client'

    def start(self, consumer):
        """Create the alert client and start listening on a separate thread.

        The client is built from the ``igwn_alert_server``,
        ``igwn_alert_noauth`` and ``igwn_alert_group`` settings of the
        consumer's app.  Its ``listen`` method then runs in a thread named
        ``IGWNReceiverThread``, receiving ``_send_igwn_alert`` and the
        ``igwn_alert_topics`` setting as arguments.
        """
        super().start(consumer)

        conf = consumer.app.conf
        client_options = {
            'server': conf['igwn_alert_server'],
            'noauth': conf['igwn_alert_noauth'],
            'group': conf['igwn_alert_group'],
        }
        self._client = client(**client_options)

        listener = Thread(
            target=self._client.listen,
            args=(_send_igwn_alert, conf['igwn_alert_topics']),
            name='IGWNReceiverThread')
        self.thread = listener
        listener.start()

    def stop(self, consumer):
        """Stop the client and wait for the listener thread to finish.

        When the client does not report itself as running, nothing beyond
        the logging done by the parent class happens.
        """
        super().stop(consumer)

        alert_client = self._client
        if not alert_client.running:
            return

        alert_client.running = False
        # Reaches into the client's private stream consumer to stop it.
        alert_client.stream_obj._consumer.stop()
        # NOTE(review): the join is assumed to belong to the "running"
        # branch together with the two statements above -- confirm.
        self.thread.join()

    def info(self, consumer):
        """Report which configured topics the client also lists.

        Returns a dict with the single key ``'igwn-alert-topics'``, holding
        the intersection of the ``igwn_alert_topics`` setting with the
        result of ``self._client.get_topics()``.
        """
        configured_topics = consumer.app.conf['igwn_alert_topics']
        return {'igwn-alert-topics': configured_topics.intersection(
            self._client.get_topics())}

63 'igwn_alert_topics'].intersection(self._client.get_topics())}