Coverage for gwcelery/tools/nagios.py: 95%

147 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-17 06:48 +0000

1"""A Nagios plugin for monitoring GWCelery. 

2 

3See https://nagios-plugins.org/doc/guidelines.html. 

4""" 

5import glob 

6from enum import IntEnum 

7from pathlib import Path 

8from sys import exit 

9from traceback import format_exc, format_exception 

10 

11import click 

12import kombu.exceptions 

13import lal 

14import numpy as np 

15from gwpy.time import tconvert 

16 

17# Make sure that all tasks are registered 

18from .. import tasks # noqa: F401 

19 

20 

class NagiosPluginStatus(IntEnum):
    """Exit statuses that a Nagios plugin can report.

    Being an :class:`~enum.IntEnum`, a member can be used directly as a
    process exit code, and its ``name`` as the status word in the output.
    """

    OK = 0
    WARNING = 1
    CRITICAL = 2
    UNKNOWN = 3

28 

29 

class NagiosCriticalError(Exception):
    """Raised when a check fails; reported to Nagios as `CRITICAL`."""

32 

33 

def get_active_queues(inspector):
    """Return the set of queue names reported by ``inspector``.

    A falsy reply from ``inspector.active_queues()`` (for example ``None``)
    is treated as "no queues" and yields an empty set.
    """
    queues_by_worker = inspector.active_queues() or {}
    names = set()
    for worker_queues in queues_by_worker.values():
        names.update(entry['name'] for entry in worker_queues)
    return names

38 

39 

def get_active_igwn_alert_topics(inspector):
    """Return the union of ``'igwn-alert-topics'`` over all worker stats.

    Workers whose stats lack that key contribute nothing.
    """
    topics = set()
    for worker_stats in inspector.stats().values():
        topics.update(worker_stats.get('igwn-alert-topics', ()))
    return topics

43 

44 

def get_expected_queues(app):
    """Return the set of queue names used by the tasks registered on ``app``.

    Tasks with no ``queue`` attribute, or with ``queue`` set to ``None``,
    are counted under the ``'celery'`` queue, which is always included.
    """
    # 'celery' is used for all tasks that do not explicitly specify a queue.
    queues = {'celery'}
    for task in app.tasks.values():
        queue = getattr(task, 'queue', None)
        if queue is not None:
            queues.add(queue)
    return queues

53 

54 

def get_expected_igwn_alert_topics(app):
    """Return the ``igwn_alert_topics`` entry of the app configuration."""
    configured_topics = app.conf['igwn_alert_topics']
    return configured_topics

57 

58 

def get_active_voevent_peers(inspector):
    """Return ``(broker_peers, receiver_peers)`` gathered from worker stats.

    Each element is the set of peers listed under ``'voevent-broker-peers'``
    or ``'voevent-receiver-peers'`` respectively, over all workers. The
    stats are queried once and shared by both sets.
    """
    worker_stats = list(inspector.stats().values())

    def peers_under(key):
        return {peer for stat in worker_stats for peer in stat.get(key, ())}

    broker_peers = peers_under('voevent-broker-peers')
    receiver_peers = peers_under('voevent-receiver-peers')
    return broker_peers, receiver_peers

65 

66 

def get_expected_kafka_bootstep_urls(inspector):
    """Return every key found under ``'kafka_topic_up'`` in the worker stats.

    The flag stored against each key is ignored here: a URL is "expected"
    as soon as any worker lists it.
    """
    urls = set()
    for worker_stats in inspector.stats().values():
        # Updating a set from a mapping adds the mapping's keys.
        urls.update(worker_stats.get('kafka_topic_up', {}))
    return urls

73 

74 

def get_active_kafka_bootstep_urls(inspector):
    """Return the ``'kafka_topic_up'`` keys whose flag is truthy.

    The result is the union over the stats of all workers.
    """
    urls = set()
    for worker_stats in inspector.stats().values():
        topic_flags = worker_stats.get('kafka_topic_up', {})
        urls.update(url for url, is_up in topic_flags.items() if is_up)
    return urls

81 

82 

def get_undelivered_message_urls(inspector):
    """Return the ``'kafka_delivery_failures'`` keys whose flag is truthy.

    The result is the union over the stats of all workers.
    """
    urls = set()
    for worker_stats in inspector.stats().values():
        failure_flags = worker_stats.get('kafka_delivery_failures', {})
        urls.update(url for url, failed in failure_flags.items() if failed)
    return urls

89 

90 

def get_active_kafka_consumer_bootstep_names(inspector):
    """Return the union of ``'active_kafka_consumers'`` over all worker stats.

    Workers whose stats lack that key contribute nothing.
    """
    consumers = set()
    for worker_stats in inspector.stats().values():
        consumers.update(worker_stats.get('active_kafka_consumers', ()))
    return consumers

98 

99 

def get_expected_kafka_consumer_bootstep_names(app):
    """Return the keys of ``app.conf['kafka_consumer_config']`` as a set."""
    consumer_config = app.conf['kafka_consumer_config']
    return set(consumer_config.keys())

102 

103 

def get_celery_queue_length(app):
    """Return ``llen("celery")`` as reported by the result backend client."""
    backend_client = app.backend.client
    return backend_client.llen("celery")

106 

107 

def get_recent_mdc_superevents():
    """Look up the MDC superevents from the last six hours.

    Returns
    -------
    tuple
        ``(superevents, t_lower, t_upper)``: the result of
        ``tasks.gracedb.get_superevents`` for the query, followed by the
        lower and upper GPS bounds of the time window that was searched.
    """
    window_seconds = 6 * 3600
    t_upper = lal.GPSTimeNow()
    t_lower = t_upper - window_seconds
    superevents = tasks.gracedb.get_superevents(
        "{} .. {} {}".format(t_lower, t_upper, 'MDC'))
    return superevents, t_lower, t_upper

115 

116 

def get_distr_delay_latest_llhoft(app):
    """Get the age in seconds of the newest llhoft file for each detector.

    For ``H1`` and ``L1``, the files matching ``app.conf['llhoft_glob']``
    are sorted by name and the second-to-last ``-``-separated field of the
    last one is read as an integer GPS time.

    Returns
    -------
    dict
        Maps detector name to ``now - gps``. A detector with no matching
        file (or whose newest file name has too few ``-`` fields) maps to
        the placeholder ``999999999``.
    """
    gps_now = int(lal.GPSTimeNow())
    delays = {}
    for detector in ('H1', 'L1'):
        pattern = app.conf['llhoft_glob'].format(detector=detector)
        newest_first = sorted(glob.glob(pattern), reverse=True)
        try:
            gps_field = newest_first[0].split('-')[-2]
        except IndexError:
            # Nothing matched, or the name could not be split as expected.
            delays[detector] = 999999999
        else:
            delays[detector] = gps_now - int(gps_field)
    return delays

133 

134 

def check_status(app):
    """Run the GWCelery health checks, raising on the first one that fails.

    The checks run in this order: low-latency h(t) freshness (only when the
    data directory exists), broker connectivity, active queues, IGWN alert
    topics, VOEvent peers, Kafka producer URLs, undelivered Kafka messages,
    Celery queue length, recent MDC superevents, and Kafka consumers.

    Parameters
    ----------
    app
        The application object; its ``conf``, ``connection()``,
        ``control``, ``tasks`` and ``backend`` are used.

    Raises
    ------
    NagiosCriticalError
        On the first failing check. Its single argument is a short summary
        and its ``__cause__`` carries the detail.
    """
    # Only check llhoft when the directory two levels above the glob
    # pattern (e.g. '/dev/shm/kafka/') exists, otherwise skip.
    if Path(app.conf['llhoft_glob']).parents[1].exists():
        max_llhoft_delays = get_distr_delay_latest_llhoft(app)
        max_delay = 10 * 60  # 10 minutes of no llhoft is worrying
        if any(delay > max_delay for delay in max_llhoft_delays.values()):
            raise NagiosCriticalError(
                'Low-latency hoft is not being streamed') \
                from AssertionError(
                    f"Newest llhoft is this many seconds old: "
                    f"{str(max_llhoft_delays)}")

    connection = app.connection()
    try:
        connection.ensure_connection(max_retries=1)
    except kombu.exceptions.OperationalError as e:
        raise NagiosCriticalError('No connection to broker') from e

    inspector = app.control.inspect()

    active = get_active_queues(inspector)
    expected = get_expected_queues(app)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all expected queues are active') from \
            AssertionError('Missing queues: ' + ', '.join(missing))

    active = get_active_igwn_alert_topics(inspector)
    expected = get_expected_igwn_alert_topics(app)
    missing = expected - active
    extra = active - expected
    if missing:
        raise NagiosCriticalError('Not all IGWN alert topics are subscribed') \
            from AssertionError('Missing topics: ' + ', '.join(missing))
    if extra:
        raise NagiosCriticalError(
            'Too many IGWN alert topics are subscribed') from AssertionError(
                'Extra topics: ' + ', '.join(extra))

    broker_peers, receiver_peers = get_active_voevent_peers(inspector)
    if app.conf['voevent_broadcaster_whitelist'] and not broker_peers:
        raise NagiosCriticalError(
            'The VOEvent broker has no active connections') \
            from AssertionError('voevent_broadcaster_whitelist: {}'.format(
                app.conf['voevent_broadcaster_whitelist']))
    if app.conf['voevent_receiver_address'] and not receiver_peers:
        raise NagiosCriticalError(
            'The VOEvent receiver has no active connections') \
            from AssertionError('voevent_receiver_address: {}'.format(
                app.conf['voevent_receiver_address']))

    active = get_active_kafka_bootstep_urls(inspector)
    expected = get_expected_kafka_bootstep_urls(inspector)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all Kafka bootstep URLs are active') \
            from AssertionError('Missing urls: ' + ', '.join(missing))

    undelivered_messages = get_undelivered_message_urls(inspector)
    if undelivered_messages:
        # Report the URLs that actually have undelivered messages. Joining
        # `missing` here would always give an empty list, because this
        # point is only reached when `missing` is empty.
        raise NagiosCriticalError(
            'Not all Kafka messages have been succesfully delivered'
        ) from AssertionError(
            'URLs with undelivered messages: '
            + ', '.join(undelivered_messages)
        )

    celery_queue_length = get_celery_queue_length(app)
    if celery_queue_length > 50:
        raise NagiosCriticalError(
            'Tasks are piled up in Celery queue') from AssertionError(
                'Length of celery queue is {}'.format(celery_queue_length))

    recent_mdc_superevents, t_lower, t_now = get_recent_mdc_superevents()
    if len(recent_mdc_superevents) == 0:
        raise NagiosCriticalError(
            'No MDC superevents found in past six hours') \
            from AssertionError(
                f'Last entry earlier than GPSTime {t_lower} = '
                f'{tconvert(t_lower).isoformat()} UTC')
    # NOTE(review): index 0 is taken to be the most recent superevent;
    # confirm the ordering returned by the query.
    last_superevent = recent_mdc_superevents[0]
    # check presence in last hour with a tolerance
    none_in_last_hour = (
        t_now - tconvert(last_superevent['created'])
    ) > (3600 + 600)
    if none_in_last_hour:
        # The fragments are separated by explicit spaces so the ID, the
        # GPS time and the timestamp do not run together in the output.
        raise NagiosCriticalError(
            'No MDC superevents found in last one hour') \
            from AssertionError(
                f"Last entry is for {last_superevent['superevent_id']} "
                f"GPSTime {tconvert(last_superevent['created'])} = "
                f"{last_superevent['created']}")

    active = get_active_kafka_consumer_bootstep_names(inspector)
    expected = get_expected_kafka_consumer_bootstep_names(app)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all Kafka consumer bootstep topics are '
                                  'active') \
            from AssertionError('Missing urls: ' + ', '.join(missing))

236 

237 

@click.command(help=__doc__)
@click.pass_context
def nagios(ctx):
    """Run the status checks and exit with the matching Nagios status.

    Prints ``"<STATUS>: <summary>"``, then any detail, and exits with the
    status value: ``OK`` when ``check_status`` returns, ``CRITICAL`` when
    it raises `NagiosCriticalError`, and ``UNKNOWN`` for anything else.
    """
    try:
        check_status(ctx.obj.app)
    except NagiosCriticalError as e:
        status = NagiosPluginStatus.CRITICAL
        # str(e) equals the single summary argument when there is exactly
        # one, and cannot fail for any other number of arguments.
        output = str(e)
        # The detail normally lives on the chained cause. Fall back to the
        # error itself when there is none, so that this handler cannot
        # raise and leave the process with the wrong exit status.
        cause = e.__cause__ if e.__cause__ is not None else e
        detail = ''.join(
            format_exception(type(cause), cause, cause.__traceback__))
    except:  # noqa: E722
        # Deliberately bare: any other failure is reported as UNKNOWN
        # rather than escaping with an arbitrary exit status.
        status = NagiosPluginStatus.UNKNOWN
        output = 'Unexpected error'
        detail = format_exc()
    else:
        status = NagiosPluginStatus.OK
        output = 'Running normally'
        detail = None
    print('{}: {}'.format(status.name, output))
    if detail:
        print(detail)
    exit(status)