Coverage for gwcelery/tools/nagios.py: 95%
147 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""A Nagios plugin for monitoring GWCelery.
3See https://nagios-plugins.org/doc/guidelines.html.
4"""
5import glob
6from enum import IntEnum
7from pathlib import Path
8from sys import exit
9from traceback import format_exc, format_exception
11import click
12import kombu.exceptions
13import lal
14import numpy as np
15from gwpy.time import tconvert
17# Make sure that all tasks are registered
18from .. import tasks # noqa: F401
class NagiosPluginStatus(IntEnum):
    """Integer status codes that this plugin reports to Nagios.

    The numeric value of the selected member is used as the process exit
    status, and its name is printed as the first word of the plugin output.
    """

    # Every check passed.
    OK = 0
    # Defined for completeness; see the Nagios plugin guidelines.
    WARNING = 1
    # A check failed with a `NagiosCriticalError`.
    CRITICAL = 2
    # The plugin itself hit an unexpected error.
    UNKNOWN = 3
class NagiosCriticalError(Exception):
    """Signal a failed health check that Nagios should report as `CRITICAL`.

    The single positional argument is the short, human-readable summary;
    supporting detail is attached as the exception's ``__cause__``.
    """
def get_active_queues(inspector):
    """Return the set of queue names reported by ``inspector``.

    ``inspector.active_queues()`` is expected to map each worker to a list of
    queue descriptions, each having a ``'name'`` key. A falsy reply (such as
    `None`) is treated as "no workers" and yields an empty set.
    """
    replies = inspector.active_queues() or {}
    names = set()
    for worker_queues in replies.values():
        for entry in worker_queues:
            names.add(entry['name'])
    return names
def get_active_igwn_alert_topics(inspector):
    """Return the union of IGWN alert topics listed in the worker stats.

    Each worker's stats entry may carry an ``'igwn-alert-topics'`` iterable;
    workers without that key contribute nothing.
    """
    topics = set()
    for worker_stats in inspector.stats().values():
        for topic in worker_stats.get('igwn-alert-topics', ()):
            topics.add(topic)
    return topics
def get_expected_queues(app):
    """Return the set of queue names that the registered tasks require.

    Every task in ``app.tasks`` contributes its ``queue`` attribute when it
    has one. The ``'celery'`` queue is always included, because it is used
    for all tasks that do not explicitly specify a queue.
    """
    expected = {'celery'}
    for task in app.tasks.values():
        queue_name = getattr(task, 'queue', None)
        if queue_name is not None:
            expected.add(queue_name)
    return expected
def get_expected_igwn_alert_topics(app):
    """Return the ``igwn_alert_topics`` entry of the application config."""
    configured_topics = app.conf['igwn_alert_topics']
    return configured_topics
def get_active_voevent_peers(inspector):
    """Return the VOEvent broker peers and receiver peers seen by workers.

    Returns
    -------
    tuple
        ``(broker_peers, receiver_peers)``, each a set gathered across all
        workers from the ``'voevent-broker-peers'`` and
        ``'voevent-receiver-peers'`` stats entries respectively.
    """
    # Query the workers once and reuse the reply for both kinds of peer.
    all_stats = inspector.stats()

    def collect(key):
        peers = set()
        for worker_stats in all_stats.values():
            peers.update(worker_stats.get(key, ()))
        return peers

    broker_peers = collect('voevent-broker-peers')
    receiver_peers = collect('voevent-receiver-peers')
    return broker_peers, receiver_peers
def get_expected_kafka_bootstep_urls(inspector):
    """Return every Kafka URL that any worker reports a status for.

    The URLs are the keys of each worker's ``'kafka_topic_up'`` stats entry,
    regardless of whether the associated flag is truthy.
    """
    urls = set()
    for worker_stats in inspector.stats().values():
        for url in worker_stats.get('kafka_topic_up', {}):
            urls.add(url)
    return urls
def get_active_kafka_bootstep_urls(inspector):
    """Return the Kafka URLs that at least one worker flags as up.

    A URL is included when its value in a worker's ``'kafka_topic_up'`` stats
    entry is truthy.
    """
    urls = set()
    for worker_stats in inspector.stats().values():
        topic_status = worker_stats.get('kafka_topic_up', {})
        for url, is_up in topic_status.items():
            if is_up:
                urls.add(url)
    return urls
def get_undelivered_message_urls(inspector):
    """Return the Kafka URLs that any worker flags with a delivery failure.

    A URL is included when its value in a worker's
    ``'kafka_delivery_failures'`` stats entry is truthy.
    """
    failed_urls = set()
    for worker_stats in inspector.stats().values():
        failures = worker_stats.get('kafka_delivery_failures', {})
        for url, has_failed in failures.items():
            if has_failed:
                failed_urls.add(url)
    return failed_urls
def get_active_kafka_consumer_bootstep_names(inspector):
    """Return the names of Kafka consumers that workers report as active.

    The names are gathered across all workers from the
    ``'active_kafka_consumers'`` stats entry.
    """
    consumer_names = set()
    for worker_stats in inspector.stats().values():
        for consumer in worker_stats.get('active_kafka_consumers', ()):
            consumer_names.add(consumer)
    return consumer_names
def get_expected_kafka_consumer_bootstep_names(app):
    """Return the set of keys of the ``kafka_consumer_config`` setting."""
    consumer_config = app.conf['kafka_consumer_config']
    return set(consumer_config.keys())
def get_celery_queue_length(app):
    """Return the length of the ``"celery"`` list held by the result backend.

    The value is whatever ``app.backend.client.llen("celery")`` reports.
    """
    # NOTE(review): `llen` suggests a Redis-style client; confirm with the
    # deployed backend.
    backend_client = app.backend.client
    return backend_client.llen("celery")
def get_recent_mdc_superevents():
    """Query GraceDB for MDC superevents from the last six hours.

    Returns
    -------
    tuple
        ``(superevents, t_lower, t_upper)``: the query result, followed by
        the lower and upper GPS bounds of the search window. The upper bound
        is the current GPS time.
    """
    six_hours = 6 * 3600
    t_upper = lal.GPSTimeNow()
    t_lower = t_upper - six_hours
    # The query string has the form "<start> .. <end> MDC".
    query = f"{t_lower} .. {t_upper} MDC"
    return tasks.gracedb.get_superevents(query), t_lower, t_upper
def get_distr_delay_latest_llhoft(app):
    """Measure how old the newest llhoft file is for each detector.

    For each of H1 and L1, the files matching ``app.conf['llhoft_glob']``
    are sorted by name and the GPS time is read from the second-to-last
    ``-``-separated field of the last one.

    Returns
    -------
    dict
        Maps detector name to the delay in seconds between the current GPS
        time and the newest file. A detector with no usable file gets the
        sentinel value 999999999.
    """
    gps_now = int(lal.GPSTimeNow())
    delays = {}
    for detector in ('H1', 'L1'):
        pattern = app.conf['llhoft_glob'].format(detector=detector)
        matches = sorted(glob.glob(pattern))
        try:
            newest_gps = int(matches[-1].split('-')[-2])
        except IndexError:
            # No file matched, or the name has too few '-' separated fields.
            delays[detector] = 999999999
        else:
            delays[detector] = gps_now - newest_gps
    return delays
def check_status(app):
    """Run the GWCelery health checks, raising on the first one that fails.

    The checks run in this order:

    1. Low-latency h(t) data is recent (skipped if its directory is absent).
    2. The message broker is reachable.
    3. All expected Celery queues are active.
    4. The subscribed IGWN alert topics match the configuration exactly.
    5. The VOEvent broker and receiver have peers, where configured.
    6. All Kafka URLs known to the workers are up.
    7. No Kafka URL has undelivered messages.
    8. No more than 50 tasks are waiting in the Celery queue.
    9. An MDC superevent was created within the last six hours, and the most
       recent one within the last hour (plus ten minutes of tolerance).
    10. All configured Kafka consumers are active.

    Parameters
    ----------
    app
        The Celery application to inspect.

    Raises
    ------
    NagiosCriticalError
        If any check fails. The exception's single argument is a short
        summary and its ``__cause__`` is an `AssertionError` carrying the
        detail.
    """
    # Skip the llhoft check when the grandparent directory of the glob
    # pattern (e.g. '/dev/shm/kafka/') does not exist on this node.
    if Path(app.conf['llhoft_glob']).parents[1].exists():
        max_llhoft_delays = get_distr_delay_latest_llhoft(app)
        max_delay = 10 * 60  # 10 minutes of no llhoft is worrying
        if any(delay > max_delay for delay in max_llhoft_delays.values()):
            raise NagiosCriticalError(
                'Low-latency hoft is not being streamed') \
                from AssertionError(
                    f"Newest llhoft is this many seconds old: "
                    f"{str(max_llhoft_delays)}")

    connection = app.connection()
    try:
        connection.ensure_connection(max_retries=1)
    except kombu.exceptions.OperationalError as e:
        raise NagiosCriticalError('No connection to broker') from e

    inspector = app.control.inspect()

    active = get_active_queues(inspector)
    expected = get_expected_queues(app)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all expected queues are active') from \
            AssertionError('Missing queues: ' + ', '.join(missing))

    active = get_active_igwn_alert_topics(inspector)
    expected = get_expected_igwn_alert_topics(app)
    missing = expected - active
    extra = active - expected
    if missing:
        raise NagiosCriticalError('Not all IGWN alert topics are subscribed') \
            from AssertionError('Missing topics: ' + ', '.join(missing))
    if extra:
        raise NagiosCriticalError(
            'Too many IGWN alert topics are subscribed') from AssertionError(
                'Extra topics: ' + ', '.join(extra))

    # The VOEvent checks only apply when the corresponding setting is truthy.
    broker_peers, receiver_peers = get_active_voevent_peers(inspector)
    if app.conf['voevent_broadcaster_whitelist'] and not broker_peers:
        raise NagiosCriticalError(
            'The VOEvent broker has no active connections') \
            from AssertionError('voevent_broadcaster_whitelist: {}'.format(
                app.conf['voevent_broadcaster_whitelist']))
    if app.conf['voevent_receiver_address'] and not receiver_peers:
        raise NagiosCriticalError(
            'The VOEvent receiver has no active connections') \
            from AssertionError('voevent_receiver_address: {}'.format(
                app.conf['voevent_receiver_address']))

    active = get_active_kafka_bootstep_urls(inspector)
    expected = get_expected_kafka_bootstep_urls(inspector)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all Kafka bootstep URLs are active') \
            from AssertionError('Missing urls: ' + ', '.join(missing))

    undelivered_messages = get_undelivered_message_urls(inspector)
    if undelivered_messages:
        # List the URLs that actually have failures. `missing` is always
        # empty at this point, so it must not be used here.
        raise NagiosCriticalError(
            'Not all Kafka messages have been succesfully delivered'
        ) from AssertionError(
            'URLs with undelivered messages: '
            + ', '.join(undelivered_messages)
        )

    celery_queue_length = get_celery_queue_length(app)
    if celery_queue_length > 50:
        raise NagiosCriticalError(
            'Tasks are piled up in Celery queue') from AssertionError(
                'Length of celery queue is {}'.format(celery_queue_length))

    recent_mdc_superevents, t_lower, t_now = get_recent_mdc_superevents()
    if len(recent_mdc_superevents) == 0:
        raise NagiosCriticalError(
            'No MDC superevents found in past six hours') \
            from AssertionError(
                f'Last entry earlier than GPSTime {t_lower} = '
                f'{tconvert(t_lower).isoformat()} UTC')
    # NOTE(review): assumes the query returns the newest superevent first;
    # confirm against tasks.gracedb.get_superevents.
    last_superevent = recent_mdc_superevents[0]
    last_created_gps = tconvert(last_superevent['created'])
    # check presence in last hour with a tolerance
    none_in_last_hour = (t_now - last_created_gps) > (3600 + 600)
    if none_in_last_hour:
        raise NagiosCriticalError(
            'No MDC superevents found in last one hour') \
            from AssertionError(
                f"Last entry is for {last_superevent['superevent_id']} "
                f"GPSTime {last_created_gps} = "
                f"{last_superevent['created']}")

    active = get_active_kafka_consumer_bootstep_names(inspector)
    expected = get_expected_kafka_consumer_bootstep_names(app)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all Kafka consumer bootstep topics are '
                                  'active') \
            from AssertionError('Missing urls: ' + ', '.join(missing))
@click.command(help=__doc__)
@click.pass_context
def nagios(ctx):
    """Run the health checks and report the outcome in Nagios format.

    Prints ``<STATUS>: <summary>`` followed by optional detail, then exits
    with the numeric status code.
    """
    detail = None
    try:
        check_status(ctx.obj.app)
    except NagiosCriticalError as err:
        status = NagiosPluginStatus.CRITICAL
        (output,) = err.args
        # The detail shown is the traceback of the underlying cause, not of
        # the NagiosCriticalError wrapper itself.
        cause = err.__cause__
        detail = ''.join(
            format_exception(type(cause), cause, cause.__traceback__))
    except:  # noqa: E722
        # Deliberately catch everything so that any other failure is
        # reported as UNKNOWN.
        status = NagiosPluginStatus.UNKNOWN
        output = 'Unexpected error'
        detail = format_exc()
    else:
        status = NagiosPluginStatus.OK
        output = 'Running normally'

    print('{}: {}'.format(status.name, output))
    if detail:
        print(detail)
    exit(status)