Coverage for gwcelery/tasks/detchar.py: 95%
234 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-17 06:48 +0000
1"""Data quality and detector characterization tasks.
3These tasks are mostly focused on checking interferometer state vectors. By
4design, the [LIGO]_ and [Virgo]_ state vectors share the same definitions for
5the first 8 fields.
7LIGO also has a [DMT]_ DQ vector that provides some additional instrumental
8checks.
10References
11----------
12.. [LIGO] https://wiki.ligo.org/Calibration/TDCalibReview
13.. [Virgo] https://dcc.ligo.org/G1801125/
14.. [DMT] https://wiki.ligo.org/DetChar/DmtDqVector
16"""
17import getpass
18import glob
19import io
20import json
21import math
22import socket
23import time
25import matplotlib.pyplot as plt
26import numpy as np
27import sentry_sdk
28from astropy.time import Time
29from celery import group
30from celery.utils.log import get_task_logger
31from glue.lal import Cache
32from gwdatafind import find_urls
33from gwpy.plot import Plot
34from gwpy.timeseries import Bits, StateVector, TimeSeries
36from .. import _version, app
37from ..jinja import env
38from ..util import closing_figures
39from . import gracedb
41__author__ = 'Geoffrey Mo <geoffrey.mo@ligo.org>'
43log = get_task_logger(__name__)
46def create_cache(ifo, start, end):
47 """Find .gwf files and create cache. Will first look in the llhoft, and
48 if the frames have expired from llhoft, will call gwdatafind.
50 Parameters
51 ----------
52 ifo : str
53 Interferometer name (e.g. ``H1``).
54 start, end: int or float
55 GPS start and end times desired.
57 Returns
58 -------
59 :class:`glue.lal.Cache`
61 Example
62 -------
63 >>> create_cache('H1', 1198800018, 1198800618)
64 [<glue.lal.CacheEntry at 0x7fbae6b71278>,
65 <glue.lal.CacheEntry at 0x7fbae6ae5b38>,
66 <glue.lal.CacheEntry at 0x7fbae6ae5c50>,
67 ...
68 <glue.lal.CacheEntry at 0x7fbae6b15080>,
69 <glue.lal.CacheEntry at 0x7fbae6b15828>]
71 """
72 pattern = app.conf['llhoft_glob'].format(detector=ifo)
73 filenames = glob.glob(pattern)
74 cache = Cache.from_urls(filenames)
76 try:
77 cache_segment = list(cache.to_segmentlistdict().values())[0][0]
78 cache_starttime = cache_segment[0]
79 cache_endtime = cache_segment[1]
81 except IndexError:
82 log.exception('Files do not exist in llhoft_glob')
83 return cache # returns empty cache
85 if (cache_starttime <= start) and (end <= cache_endtime):
86 # required data is in llhoft
87 return cache
89 # otherwise, required data is not in the low latency cache
90 high_latency = app.conf['high_latency_frame_types'][ifo]
91 urls = find_urls(ifo[0], high_latency, start, end)
92 if urls:
93 return Cache.from_urls(urls)
95 # required data not in high latency frames
96 low_latency = app.conf['low_latency_frame_types'][ifo]
97 urls = find_urls(ifo[0], low_latency, start, end)
98 if not urls: # required data not in low latency frames
99 log.error('This data cannot be found, or does not exist.')
101 return Cache.from_urls(urls)
104@app.task(shared=False)
105@closing_figures()
106def make_omegascan(ifo, t0, durs):
107 """Helper function to create a single omegascan image, with
108 multiple durations.
110 Parameters
111 ----------
112 ifo : str
113 'H1', 'L1', or 'V1'
114 t0 : int or float
115 Central time of the omegascan.
116 durs : list of tuples
117 List of three tuples, with time before and time after t0 in seconds.
118 Example: [(0.75, 0.25), (1.5, 0.5), (7.5, 2.5)]
120 Returns
121 -------
122 bytes or None
123 bytes of png of the omegascan, or None if no omegascan created.
125 """
126 # Explicitly use a non-interactive Matplotlib backend.
127 plt.switch_backend('agg')
129 # Collect data
130 durs = np.array(durs)
131 longest_before, longest_after = durs[:, 0].max(), durs[:, 1].max()
132 # Add extra data buffer to avoid whitening window issues
133 # Use integer times to avoid passing too much precision to gwpy
134 long_start = math.floor(t0 - longest_before - 2)
135 long_end = math.ceil(t0 + longest_after + 2)
136 cache = create_cache(ifo, long_start, long_end)
137 strain_name = app.conf['strain_channel_names'][ifo]
138 try:
139 ts = TimeSeries.read(cache, strain_name,
140 start=long_start, end=long_end).astype('float64')
141 # Do q_transforms for the different durations
142 qgrams = [ts.q_transform(
143 frange=(10, 4096), gps=t0,
144 outseg=(t0 - before, t0 + after), logf=True)
145 for before, after in durs]
146 except (IndexError, FloatingPointError, ValueError) as err:
147 # data from cache can't be properly read, or data is weird
148 if not isinstance(err, FloatingPointError):
149 # don't log FloatingPointError
150 # it means that the frames are full of exactly zero
151 sentry_sdk.capture_exception(err)
152 fig = plt.figure(figsize=(4, 1))
153 plt.axis("off")
154 plt.text(0.5, 0.5, f"Failed to create {ifo} omegascan",
155 horizontalalignment='center', verticalalignment='center',
156 fontsize=17)
157 else:
158 fig = Plot(*qgrams,
159 figsize=(12 * len(durs), 6),
160 geometry=(1, len(durs)),
161 yscale='log',
162 method='pcolormesh',
163 clim=(0, 30),
164 cmap='viridis')
165 for i, ax in enumerate(fig.axes):
166 ax.set_title(f'Q = {qgrams[i].q:.3g}')
167 if i in [1, 2]:
168 ax.set_ylabel('')
169 if i == 2:
170 fig.colorbar(ax=ax, label='Normalized energy')
171 ax.set_epoch(t0)
172 fig.suptitle(f'Omegascans of {strain_name} at {t0}', fontweight="bold")
173 plt.subplots_adjust(wspace=0.08)
174 outfile = io.BytesIO()
175 fig.savefig(outfile, format='png', dpi=150, bbox_inches='tight')
176 return outfile.getvalue()
179@app.task(shared=False)
180def omegascan(t0, graceid):
181 """Create omegascan for a certain event.
183 Parameters
184 ----------
185 t0 : float
186 Central event time.
187 graceid : str
188 GraceDB ID to which to upload the omegascan.
190 """
191 durs = app.conf['omegascan_durations']
192 durs = np.array(durs)
194 # Delay for early warning events (ie queries for times before now)
195 longest_after = durs[:, 1].max() + 2 # extra data for whitening window
196 if t0 + longest_after > Time.now().gps:
197 log.info("Delaying omegascan because %s is in the future",
198 graceid)
199 waittime = t0 - Time.now().gps + longest_after + 10
200 else:
201 waittime = longest_after + 10
203 group(
204 make_omegascan.s(ifo, t0, durs).set(countdown=waittime)
205 |
206 gracedb.upload.s(f"{ifo}_omegascan.png", graceid,
207 f"{ifo} omegascan", tags=['data_quality'])
208 for ifo in ['H1', 'L1', 'V1']
209 ).delay()
212def generate_table(title, high_bit_list, low_bit_list, unknown_bit_list):
213 """Make a nice table which shows the status of the bits checked.
215 Parameters
216 ----------
217 title : str
218 Title of the table.
219 high_bit_list: list
220 List of bit names which are high.
221 low_bit_list: list
222 List of bit names which are low.
223 unknown_bit_list: list
224 List of bit names which are unknown.
226 Returns
227 -------
228 str
229 HTML string of the table.
231 """
232 template = env.get_template('vector_table.jinja2')
233 return template.render(title=title, high_bit_list=high_bit_list,
234 low_bit_list=low_bit_list,
235 unknown_bit_list=unknown_bit_list)
238def dqr_json(state, summary):
239 """Generate DQR-compatible json-ready dictionary from process results, as
240 described in :class:`data-quality-report.design`.
242 Parameters
243 ----------
244 state : {'pass', 'fail'}
245 State of the detchar checks.
246 summary : str
247 Summary of results from the process.
249 Returns
250 -------
251 dict
252 Ready to be converted into json.
254 """
255 links = [
256 {
257 "href":
258 "https://gwcelery.readthedocs.io/en/latest/gwcelery.tasks.detchar.html#gwcelery.tasks.detchar.check_vectors", # noqa
259 "innerHTML": "a link to the documentation for this process"
260 },
261 {
262 "href":
263 "https://git.ligo.org/emfollow/gwcelery/blob/main/gwcelery/tasks/detchar.py", # noqa
264 "innerHTML": "a link to the source code in the gwcelery repo"
265 }
266 ]
267 return dict(
268 state=state,
269 process_name=__name__,
270 process_version=_version.get_versions()['version'],
271 librarian='Geoffrey Mo (geoffrey.mo@ligo.org)',
272 date=time.strftime("%H:%M:%S UTC %a %d %b %Y", time.gmtime()),
273 hostname=socket.gethostname(),
274 username=getpass.getuser(),
275 summary=summary,
276 figures=[],
277 tables=[],
278 links=links,
279 extra=[],
280 )
283def ifo_from_channel(channel):
284 '''Get detector prefix from a channel.
286 Parameters
287 ----------
288 channel : str
289 Channel, e.g., H1:GDS-CALIB_STRAIN.
291 Returns
292 -------
293 str
294 Detector prefix, e.g., H1.
295 '''
296 ifo = channel.split(':')[0]
297 assert len(ifo) == 2, "Detector name should be two letters"
298 return ifo
301def check_idq(cache, channel, start, end):
302 """Looks for iDQ frame and reads them.
304 Parameters
305 ----------
306 cache : :class:`glue.lal.Cache`
307 Cache from which to check.
308 channel : str
309 which idq channel (FAP)
310 start, end: int or float
311 GPS start and end times desired.
313 Returns
314 -------
315 tuple
316 Tuple mapping iDQ channel to its minimum FAP.
318 Example
319 -------
320 >>> check_idq(cache, 'H1:IDQ-FAP_OVL_100_1000',
321 1216496260, 1216496262)
322 ('H1:IDQ-FAP_OVL_100_1000', 0.003)
324 """
325 if cache:
326 try:
327 idq_fap = TimeSeries.read(
328 cache, channel, start=start, end=end)
329 return (channel, float(idq_fap.min().value))
330 except (IndexError, RuntimeError, ValueError):
331 log.exception('Failed to read from low-latency iDQ frame files')
332 # FIXME: figure out how to get access to low-latency frames outside
333 # of the cluster. Until we figure that out, actual I/O errors have
334 # to be non-fatal.
335 return (channel, None)
338def check_vector(cache, channel, start, end, bits, logic_type='all'):
339 """Check timeseries of decimals against a bitmask.
340 This is inclusive of the start time and exclusive of the end time, i.e.
341 [start, ..., end).
343 Parameters
344 ----------
345 cache : :class:`glue.lal.Cache`
346 Cache from which to check.
347 channel : str
348 Channel to look at, e.g. ``H1:DMT-DQ_VECTOR``.
349 start, end : int or float
350 GPS start and end times desired.
351 bits: :class:`gwpy.TimeSeries.Bits`
352 Definitions of the bits in the channel.
353 logic_type : str, optional
354 Type of logic to apply for vetoing.
355 If ``all``, then all samples in the window must pass the bitmask.
356 If ``any``, then one or more samples in the window must pass.
358 Returns
359 -------
360 dict
361 Maps each bit in channel to its state.
363 Example
364 -------
365 >>> check_vector(cache, 'H1:GDS-CALIB_STATE_VECTOR', 1216496260,
366 1216496262, ligo_state_vector_bits)
367 {'H1:HOFT_OK': True,
368 'H1:OBSERVATION_INTENT': True,
369 'H1:NO_STOCH_HW_INJ': True,
370 'H1:NO_CBC_HW_INJ': True,
371 'H1:NO_BURST_HW_INJ': True,
372 'H1:NO_DETCHAR_HW_INJ': True}
374 """
375 if logic_type not in ('any', 'all'):
376 raise ValueError("logic_type must be either 'all' or 'any'.")
377 else:
378 logic_map = {'any': np.any, 'all': np.all}
379 bitname = '{}:{}'
380 if cache:
381 try:
382 statevector = StateVector.read(cache, channel,
383 start=start, end=end, bits=bits)
384 except (IndexError, TypeError, ValueError):
385 log.exception('Failed to read from low-latency frame files')
386 else:
387 # FIXME: In the playground environment, the Virgo state vector
388 # channel is stored as a float. Is this also the case in the
389 # production environment?
390 statevector = statevector.astype(np.uint32)
391 if len(statevector) > 0: # statevector must not be empty
392 return {bitname.format(ifo_from_channel(channel), key):
393 bool(logic_map[logic_type](
394 value.value if len(value.value) > 0 else None))
395 for key, value in statevector.get_bit_series().items()}
396 # FIXME: figure out how to get access to low-latency frames outside
397 # of the cluster. Until we figure that out, actual I/O errors have
398 # to be non-fatal.
399 return {bitname.format(ifo_from_channel(channel), key):
400 None for key in bits if key is not None}
403@app.task(shared=False, bind=True, default_retry_delay=5)
404def check_vectors(self, event, graceid, start, end):
405 """Perform data quality checks for an event and labels/logs results to
406 GraceDB.
408 Depending on the pipeline, a certain amount of time (specified in
409 :obj:`~gwcelery.conf.check_vector_prepost`) is appended to either side of
410 the superevent start and end time. This is to catch DQ issues slightly
411 before and after the event, such as that appearing in L1 just before
412 GW170817.
414 A cache is then created for H1, L1, and V1, regardless of the detectors
415 involved in the event. Then, the bits and channels specified in the
416 configuration file (:obj:`~gwcelery.conf.llhoft_channels`) are checked.
417 If an injection is found in the active detectors, 'INJ' is labeled to
418 GraceDB. If an injection is found in any detector, a message with the
419 injection found is logged to GraceDB. If no injections are found across
420 all detectors, this is logged to GraceDB.
422 A similar task is performed for the DQ states described in the
423 DMT-DQ_VECTOR, LIGO GDS-CALIB_STATE_VECTOR, and Virgo
424 DQ_ANALYSIS_STATE_VECTOR. If no DQ issues are found in active detectors,
425 'DQOK' is labeled to GraceDB. Otherwise, 'DQV' is labeled. In all cases,
426 the DQ states of all the state vectors checked are logged to GraceDB.
428 This skips MDC events.
430 Parameters
431 ----------
432 event : dict
433 Details of event.
434 graceid : str
435 GraceID of event to which to log.
436 start, end : int or float
437 GPS start and end times desired.
439 Returns
440 -------
441 event : dict
442 Details of the event, reflecting any labels that were added.
444 """
445 # Skip early warning events (ie queries for times before now)
446 if end > Time.now().gps:
447 log.info("Skipping detchar checks because %s is in the future",
448 event['graceid'])
449 return event
451 # Skip MDC events.
452 if event.get('search') == 'MDC':
453 log.info("Skipping detchar checks because %s is an MDC",
454 event['graceid'])
455 return event
457 # Create caches for all detectors
458 instruments = event['instruments'].split(',')
459 pipeline = event['pipeline']
460 pre, post = app.conf['check_vector_prepost'][pipeline]
461 start, end = start - pre, end + post
462 prepost_msg = "Check looked within -{}/+{} seconds of superevent. ".format(
463 pre, post)
465 ifos = {ifo_from_channel(key) for key, val in
466 app.conf['llhoft_channels'].items()}
467 caches = {ifo: create_cache(ifo, start, end) for ifo in ifos}
468 bit_defs = {channel_type: Bits(channel=bitdef['channel'],
469 bits=bitdef['bits'])
470 for channel_type, bitdef
471 in app.conf['detchar_bit_definitions'].items()}
473 # Examine injection and DQ states
474 # Do not analyze DMT-DQ_VECTOR if pipeline uses gated h(t)
475 states = {}
476 analysis_channels = app.conf['llhoft_channels'].items()
477 if app.conf['uses_gatedhoft'][pipeline]:
478 analysis_channels = {k: v for k, v in analysis_channels
479 if k[3:] != 'DMT-DQ_VECTOR'}.items()
480 for channel, bits in analysis_channels:
481 try:
482 states.update(check_vector(
483 caches[ifo_from_channel(channel)], channel,
484 start, end, bit_defs[bits]))
485 except ValueError as exc:
486 # check_vector likely failed to find the requested data
487 # in the cache because it has yet to arrive
488 raise self.retry(exc=exc, max_retries=4)
489 # Pick out DQ and injection states, then filter for active detectors
490 dq_states = {key: value for key, value in states.items()
491 if key.split('_')[-1] != 'INJ'}
492 inj_states = {key: value for key, value in states.items()
493 if key.split('_')[-1] == 'INJ'}
494 active_dq_states = {key: value for key, value in dq_states.items()
495 if ifo_from_channel(key) in instruments}
496 active_inj_states = {key: value for key, value in inj_states.items()
497 if ifo_from_channel(channel) in instruments}
499 # Check iDQ states and filter for active instruments
500 idq_faps = dict(check_idq(caches[ifo_from_channel(channel)],
501 channel, start, end)
502 for channel in app.conf['idq_channels']
503 if ifo_from_channel(channel) in instruments)
504 idq_oks = dict(check_idq(caches[ifo_from_channel(channel)],
505 channel, start, end)
506 for channel in app.conf['idq_ok_channels']
507 if ifo_from_channel(channel) in instruments)
509 # Logging iDQ to GraceDB
510 # Checking the IDQ-OK vector
511 idq_not_ok_ifos = [
512 ifo_from_channel(ok_channel)
513 for ok_channel, min_value in idq_oks.items()
514 if min_value == 0 or min_value is None]
515 idq_not_ok_fap_chans = [
516 chan for chan in idq_faps.keys()
517 if ifo_from_channel(chan) in idq_not_ok_ifos]
518 # Remove iDQ FAP channels if their IDQ_OK values are bad
519 for idq_not_ok_chan in idq_not_ok_fap_chans:
520 del idq_faps[idq_not_ok_chan]
521 if len(idq_not_ok_ifos) > 0:
522 idq_ok_msg = (f"Not checking iDQ for {', '.join(idq_not_ok_ifos)} "
523 "because it has times where IDQ_OK = 0. ")
524 else:
525 idq_ok_msg = ''
527 if None not in idq_faps.values() and len(idq_faps) > 0:
528 idq_faps_readable = {k: round(v, 3) for k, v in idq_faps.items()}
529 if min(idq_faps.values()) <= app.conf['idq_fap_thresh']:
530 idq_msg = ("iDQ false alarm probability is low "
531 "(below {} threshold), "
532 "i.e., there could be a data quality issue: "
533 "minimum FAP is {}. ").format(
534 app.conf['idq_fap_thresh'],
535 json.dumps(idq_faps_readable)[1:-1])
536 # If iDQ FAP is low and pipeline enabled, apply DQV
537 if app.conf['idq_veto'][pipeline]:
538 gracedb.remove_label('DQOK', graceid)
539 gracedb.create_label('DQV', graceid)
540 # Add labels to return value to avoid querying GraceDB again.
541 event = dict(event, labels=event.get('labels', []) + ['DQV'])
542 try:
543 event['labels'].remove('DQOK')
544 except ValueError: # not in list
545 pass
546 else:
547 idq_msg = ("iDQ false alarm probabilities for active detectors "
548 "are good (above {} threshold). "
549 "Minimum FAP is {}. ").format(
550 app.conf['idq_fap_thresh'],
551 json.dumps(idq_faps_readable)[1:-1])
552 elif None in idq_faps.values():
553 idq_msg = "iDQ false alarm probabilities unknown. "
554 else:
555 idq_msg = ''
556 gracedb.upload.delay(
557 None, None, graceid,
558 idq_ok_msg + idq_msg + prepost_msg, ['data_quality'])
560 # Labeling INJ to GraceDB
561 if False in active_inj_states.values():
562 # Label 'INJ' if injection found in active IFOs
563 gracedb.create_label('INJ', graceid)
564 # Add labels to return value to avoid querying GraceDB again.
565 event = dict(event, labels=event.get('labels', []) + ['INJ'])
566 if False in inj_states.values():
567 # Write all found injections into GraceDB log
568 injs = [k for k, v in inj_states.items() if v is False]
569 inj_fmt = "Injection found.\n{}\n"
570 inj_msg = inj_fmt.format(
571 generate_table('Injection bits', [], injs, []))
572 elif all(inj_states.values()) and len(inj_states.values()) > 0:
573 inj_msg = 'No HW injections found. '
574 gracedb.remove_label('INJ', graceid)
575 event = dict(event, labels=list(event.get('labels', [])))
576 try:
577 event['labels'].remove('INJ')
578 except ValueError: # not in list
579 pass
580 else:
581 inj_msg = 'Injection state unknown. '
582 gracedb.upload.delay(
583 None, None, graceid, inj_msg + prepost_msg, ['data_quality'])
585 # Determining overall_dq_active_state
586 if None in active_dq_states.values() or len(
587 active_dq_states.values()) == 0:
588 overall_dq_active_state = None
589 elif False in active_dq_states.values():
590 overall_dq_active_state = False
591 elif all(active_dq_states.values()):
592 overall_dq_active_state = True
593 goods = [k for k, v in dq_states.items() if v is True]
594 bads = [k for k, v in dq_states.items() if v is False]
595 unknowns = [k for k, v in dq_states.items() if v is None]
596 fmt = "Detector state for active instruments is {}.\n{}"
597 msg = fmt.format(
598 {None: 'unknown', False: 'bad', True: 'good'}[overall_dq_active_state],
599 generate_table('Data quality bits', goods, bads, unknowns)
600 )
601 if app.conf['uses_gatedhoft'][pipeline]:
602 gate_msg = ('Pipeline {} uses gated h(t),'
603 ' LIGO DMT-DQ_VECTOR not checked.').format(pipeline)
604 else:
605 gate_msg = ''
607 # Labeling DQOK/DQV to GraceDB
608 gracedb.upload.delay(
609 None, None, graceid, msg + prepost_msg + gate_msg, ['data_quality'])
610 if overall_dq_active_state is True:
611 state = "pass"
612 gracedb.remove_label('DQV', graceid)
613 gracedb.create_label('DQOK', graceid)
614 # Add labels to return value to avoid querying GraceDB again.
615 event = dict(event, labels=event.get('labels', []) + ['DQOK'])
616 try:
617 event['labels'].remove('DQV')
618 except ValueError: # not in list
619 pass
620 elif overall_dq_active_state is False:
621 state = "fail"
622 gracedb.remove_label('DQOK', graceid)
623 gracedb.create_label('DQV', graceid)
624 # Add labels to return value to avoid querying GraceDB again.
625 event = dict(event, labels=event.get('labels', []) + ['DQV'])
626 try:
627 event['labels'].remove('DQOK')
628 except ValueError: # not in list
629 pass
630 else:
631 state = "unknown"
633 # Create and upload DQR-compatible json
634 state_summary = '{} {} {}'.format(
635 inj_msg, idq_ok_msg + idq_msg, msg)
636 if state == "unknown":
637 json_state = "error"
638 else:
639 json_state = state
640 file = dqr_json(json_state, state_summary)
641 filename = 'gwcelerydetcharcheckvectors-{}.json'.format(graceid)
642 message = "DQR-compatible json generated from check_vectors results"
643 gracedb.upload.delay(
644 json.dumps(file), filename, graceid, message)
646 return event