Source code for satnogsclient.waterfall

import logging

import matplotlib
import numpy as np


import matplotlib.pyplot as plt  # isort:skip # noqa: E402 # pylint: disable=C0411,C0412,C0413

LOGGER = logging.getLogger(__name__)


class EmptyArrayError(Exception):
    """Empty data array exception"""
def _read_waterfall(datafile_path):
    """Read waterfall data file

    The header fields are read sequentially, in the order they appear in the
    dictionary below: a 32-byte string timestamp followed by big-endian
    32-bit values (``nchan``, ``samp_rate``, ``nfft_per_row`` as integers,
    ``center_freq`` as float, ``endianess`` as integer).  The rest of the
    file is read as records made of one ``int64`` ``tabs`` value and
    ``nchan`` ``float32`` ``spec`` values.

    :param datafile_path: Path to data file
    :type datafile_path: str
    :raises EmptyArrayError: Empty waterfall data
    :return: Waterfall data
    :rtype: dict
    """
    LOGGER.info('Reading waterfall file')

    # The context manager guarantees the file is closed even when a header
    # read fails or the data section turns out to be empty.
    with open(datafile_path, mode='rb') as datafile:
        # Dict entries are evaluated top to bottom, so each np.fromfile call
        # consumes the next header field from the file.
        waterfall = {
            'timestamp': np.fromfile(datafile, dtype='|S32', count=1)[0],
            'nchan': np.fromfile(datafile, dtype='>i4', count=1)[0],
            'samp_rate': np.fromfile(datafile, dtype='>i4', count=1)[0],
            'nfft_per_row': np.fromfile(datafile, dtype='>i4', count=1)[0],
            'center_freq': np.fromfile(datafile, dtype='>f4', count=1)[0],
            'endianess': np.fromfile(datafile, dtype='>i4', count=1)[0]
        }

        # The record layout depends on the channel count read from the header.
        data_dtypes = np.dtype([('tabs', 'int64'),
                                ('spec', 'float32', (waterfall['nchan'], ))])
        waterfall['data'] = np.fromfile(datafile, dtype=data_dtypes)

    if not waterfall['data'].size:
        raise EmptyArrayError

    return waterfall
def _compress_waterfall(waterfall):
    """Compress spectra of waterfall

    Statistics are taken along axis 0 of ``waterfall['data']['spec']``.  The
    spectra are then shifted by ``mean + OFFSET_IN_STDS * std``, divided by
    ``SCALE_IN_STDS * std / 255`` and clipped to the 0..255 range before
    being stored as ``uint8``.

    :param waterfall: Waterfall data
    :type waterfall: dict
    :return: Compressed spectra with keys ``offset``, ``scale`` and ``values``
    :rtype: dict
    """
    spec = waterfall['data']['spec']
    std = np.std(spec, axis=0)
    offset = np.mean(spec, axis=0) + OFFSET_IN_STDS * std
    scale = SCALE_IN_STDS * std / 255.0

    # Where the standard deviation is zero the scale is zero as well, and
    # dividing by it would produce NaN whose cast to uint8 is undefined.
    # In that case ``spec - offset`` is zero too, so dividing by one yields
    # a well-defined value of 0.  The returned scale itself is left untouched.
    divisor = np.where(scale == 0, np.ones_like(scale), scale)
    values = np.clip((spec - offset) / divisor, 0.0, 255.0).astype('uint8')

    return {'offset': offset, 'scale': scale, 'values': values}
def _get_waterfall(datafile_path):
    """Get waterfall data

    Reads the data file and adds three derived entries to the result:

    * ``trel``: row index multiplied by ``nfft_per_row * nchan / samp_rate``
    * ``freq``: ``nchan`` evenly spaced values covering
      ``[-samp_rate / 2, samp_rate / 2)``
    * ``compressed``: output of :func:`_compress_waterfall`

    :param datafile_path: Path to data file
    :type datafile_path: str
    :return: Waterfall data including compressed data
    :rtype: dict
    """
    waterfall = _read_waterfall(datafile_path)

    nint = waterfall['data']['spec'].shape[0]
    samp_rate = waterfall['samp_rate']
    nchan = waterfall['nchan']

    # Use an explicit 64-bit row index: the header values are 32-bit
    # integers, and on platforms where the default NumPy integer is 32-bit
    # the product below could otherwise overflow.
    rows = np.arange(nint, dtype='int64')
    waterfall['trel'] = rows * waterfall['nfft_per_row'] * nchan / float(samp_rate)
    waterfall['freq'] = np.linspace(-0.5 * samp_rate,
                                    0.5 * samp_rate,
                                    nchan,
                                    endpoint=False)
    waterfall['compressed'] = _compress_waterfall(waterfall)

    return waterfall
class Waterfall():  # pylint: disable=R0903
    """Parse waterfall data file

    :param datafile_path: Path to data file
    :type datafile_path: str
    """
    def __init__(self, datafile_path):
        """Class constructor"""
        # Dictionary returned by _get_waterfall; plot() reads its 'data'
        # and 'freq' entries.
        self.data = _get_waterfall(datafile_path)

    def plot(self, figure_path, vmin=None, vmax=None):
        """Plot waterfall into a figure

        When either ``vmin`` or ``vmax`` is ``None`` both are replaced by
        automatically chosen values: -100 and -50 by default, or
        ``mean - 2 * std`` and ``mean + 6 * std`` of the samples above -200
        when there are more than 100 such samples.

        :param figure_path: Path of figure file to save
        :type figure_path: str
        :param vmin: Minimum value range
        :type vmin: int
        :param vmax: Maximum value range
        :type vmax: int
        """
        spec = self.data['data']['spec']

        # Axis extents; the divisors match the axis labels below
        # (presumably 'tabs' is in microseconds and 'freq' in Hz -- confirm).
        tmin = np.min(self.data['data']['tabs'] / 1000000.0)
        tmax = np.max(self.data['data']['tabs'] / 1000000.0)
        fmin = np.min(self.data['freq'] / 1000.0)
        fmax = np.max(self.data['freq'] / 1000.0)

        if vmin is None or vmax is None:
            vmin = -100
            vmax = -50
            # Only samples above -200 contribute to the statistics.
            c_idx = spec > -200.0
            if np.sum(c_idx) > 100:
                data_mean = np.mean(spec[c_idx])
                data_std = np.std(spec[c_idx])
                vmin = data_mean - 2.0 * data_std
                vmax = data_mean + 6.0 * data_std

        figure = plt.figure(figsize=(10, 20))
        try:
            plt.imshow(spec,
                       origin='lower',
                       aspect='auto',
                       interpolation='None',
                       extent=[fmin, fmax, tmin, tmax],
                       vmin=vmin,
                       vmax=vmax,
                       cmap='viridis')
            plt.xlabel('Frequency (kHz)')
            plt.ylabel('Time (seconds)')
            colorbar = plt.colorbar(aspect=50)
            colorbar.set_label('Power (dB)')
            plt.savefig(figure_path, bbox_inches='tight')
        finally:
            # Always release the figure, even when drawing or saving fails,
            # so it does not stay registered in pyplot.
            plt.close(figure)