Source code for satnogsclient.waterfall

#!/usr/bin/env python3
#
# Notes:
# Plots generated with this script are occasionally used with software that generates
# overlays with additional information [1]. To allow this software to create accurately
# aligned overlays, textual metadata is provided inside the PNG image.
#
# PNG metadata keywords:
#
# | Keyword           | Description                                                            |
# |-------------------|------------------------------------------------------------------------|
# | "satnogs:wf-plot" | JSON-serialized, parameters used for plotting (figsize, gridspec, etc) |
# | "satnogs:wf-dat"  | JSON-serialized, content of the waterfall.dat header                   |
#
# References:
# [1]: https://gitlab.com/adamkalis/ikhnos/

import json
import logging
from argparse import ArgumentParser
from datetime import datetime, timedelta

import matplotlib
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.dates import date2num
from matplotlib.gridspec import GridSpec

# Select the non-interactive Agg backend; figures are only written to files.
matplotlib.use('Agg')

# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)

# Compression window used by _compress_waterfall, expressed in per-channel
# standard deviations: the lower edge of the window relative to the channel
# mean, and the total width of the window that is mapped onto 0..255.
OFFSET_IN_STDS = -2.0
SCALE_IN_STDS = 8.0

# Size handed to plt.figure(figsize=...); also embedded in the PNG metadata.
FIGSIZE = (8.32, 16.03)
# Keyword arguments for matplotlib's GridSpec: a single row with two cells,
# used for the waterfall image axis and the colorbar axis respectively.
# Also embedded in the PNG metadata so overlays can be aligned.
GRIDSPEC = {
    'nrows': 1,
    'ncols': 2,
    'width_ratios': [0.93, 0.03],
    'left': 0.09,
    'right': 0.91,
    'top': 0.995,
    'bottom': 0.03,
}


class EmptyArrayError(Exception):
    """Signal that a waterfall data file yielded an empty data array"""
def _read_waterfall(datafile_path):
    """Load the header and the spectra rows of a waterfall data file

    The file begins with a fixed header that is consumed field by field
    (a 32-byte timestamp string, three big-endian int32 values, one
    big-endian float32 and a little-endian int32 endianness flag). The
    remainder of the file is read as rows consisting of one int64 value
    ('tabs') followed by ``nchan`` float32 values ('spec'); the byte order
    of the rows is little-endian when the endianness flag is non-zero and
    big-endian otherwise.

    :param datafile_path: Path to data file
    :type datafile_path: str
    :raises EmptyArrayError: Empty waterfall data
    :raises IndexError: Invalid waterfall data
    :raises FileNotFoundError: No waterfall data file found
    :raises OSError: Could not open waterfall data file
    :return: Waterfall data and metadata
    :rtype: tuple (dict, dict)
    """
    LOGGER.debug('Reading waterfall file')

    # Numeric header fields in on-disk order (they follow the timestamp).
    numeric_fields = (
        ('nchan', '>i4'),
        ('samp_rate', '>i4'),
        ('nfft_per_row', '>i4'),
        ('center_freq', '>f4'),
        ('endianness', '<i4'),
    )

    with open(datafile_path, mode='rb') as datafile:

        def next_field(field_dtype):
            # Indexing with [0] raises IndexError when the file is too short.
            return np.fromfile(datafile, dtype=field_dtype, count=1)[0]

        metadata = {'timestamp': next_field('|S32').decode('utf-8')}
        for field_name, field_dtype in numeric_fields:
            metadata[field_name] = next_field(field_dtype)

        if metadata['endianness']:
            byte_order = '<'
        else:
            byte_order = '>'
        row_dtype = np.dtype([
            ('tabs', byte_order + 'i8'),
            ('spec', byte_order + 'f4', (metadata['nchan'], )),
        ])
        rows = np.fromfile(datafile, dtype=row_dtype)

    if rows.size == 0:
        raise EmptyArrayError

    return {'data': rows}, metadata
def _compress_waterfall(waterfall):
    """Compress spectra of waterfall

    Every frequency channel (column of the spectra) is mapped linearly onto
    the range 0..255 and stored as ``uint8``. The lower edge of the mapped
    window lies ``OFFSET_IN_STDS`` standard deviations from the channel
    mean and the window is ``SCALE_IN_STDS`` standard deviations wide;
    values outside the window are clipped.

    A channel whose standard deviation is exactly zero has a scale of zero.
    Dividing by it would produce NaN (0/0), and casting NaN to ``uint8`` is
    undefined, so such channels are divided by one instead and compress to
    zero. The returned ``scale`` is left untouched (zero for those
    channels).

    :param waterfall: Waterfall data
    :type waterfall: dict
    :return: Compressed spectra ('offset', 'scale' and 'values')
    :rtype: dict
    """
    spec = waterfall['data']['spec']
    std = np.std(spec, axis=0)
    offset = np.mean(spec, axis=0) + OFFSET_IN_STDS * std
    scale = SCALE_IN_STDS * std / 255.0

    # Guard against division by zero for constant channels; ones_like keeps
    # the dtype of `scale` so results for all other channels are unchanged.
    divisor = np.where(scale == 0, np.ones_like(scale), scale)
    values = np.clip((spec - offset) / divisor, 0.0, 255.0).astype('uint8')

    return {'offset': offset, 'scale': scale, 'values': values}
def _get_waterfall(datafile_path):
    """Read a waterfall file and derive its axes and compressed spectra

    On top of the raw rows returned by ``_read_waterfall`` the waterfall
    dict receives three more entries:

    * ``'trel'``: one value per row, computed as
      ``row index * nfft_per_row * nchan / samp_rate``
      (presumably the relative time of each row in seconds)
    * ``'freq'``: ``nchan`` evenly spaced values starting at
      ``-samp_rate / 2`` and stopping short of ``+samp_rate / 2``
    * ``'compressed'``: the result of ``_compress_waterfall``

    :param datafile_path: Path to data file
    :type datafile_path: str
    :return: Waterfall data including compressed data, and metadata
    :rtype: tuple (dict, dict)
    """
    waterfall, metadata = _read_waterfall(datafile_path)

    samp_rate = metadata['samp_rate']
    nchan = metadata['nchan']
    row_indices = np.arange(waterfall['data']['spec'].shape[0])
    half_band = 0.5 * samp_rate

    waterfall['trel'] = row_indices * metadata['nfft_per_row'] * nchan / float(samp_rate)
    waterfall['freq'] = np.linspace(-half_band, half_band, nchan, endpoint=False)
    waterfall['compressed'] = _compress_waterfall(waterfall)

    return waterfall, metadata
class Waterfall():  # pylint: disable=R0903
    """Parse waterfall data file

    After construction ``data`` holds the waterfall dict and ``metadata``
    the header dict, both as returned by ``_get_waterfall``.

    :param datafile_path: Path to data file
    :type datafile_path: str
    """
    def __init__(self, datafile_path):
        """Class constructor"""
        self.data, self.metadata = _get_waterfall(datafile_path)

    def plot(self, figure_path, vmin=None, vmax=None):
        """Plot waterfall into a figure

        The colour range is only taken from the arguments when *both*
        ``vmin`` and ``vmax`` are given. If either one is ``None`` both are
        replaced: the range defaults to -100..-50 and, when more than 100
        samples exceed -200.0, it becomes mean - 2 std .. mean + 6 std of
        those samples.

        The plot parameters and the data file header are JSON-serialized and
        stored in the image under the ``satnogs:wf-plot`` and
        ``satnogs:wf-dat`` metadata keywords.

        The figure is always closed again, even when plotting or saving
        fails.

        :param figure_path: Path of figure file to save
        :type figure_path: str
        :param vmin: Minimum value range
        :type vmin: int
        :param vmax: Maximum value range
        :type vmax: int
        """
        spec = self.data['data']['spec']

        # 'tabs' is scaled by 1e-6 and then used as seconds relative to the
        # header timestamp (so it presumably holds microseconds).
        tabs_seconds = self.data['data']['tabs'] / 1000000.0
        tmin = np.min(tabs_seconds)
        tmax = np.max(tabs_seconds)

        freq_khz = self.data['freq'] / 1000.0
        fmin = np.min(freq_khz)
        fmax = np.max(freq_khz)

        timefmt = '%Y-%m-%dT%H:%M:%S.%fZ'
        t_ref = datetime.strptime(self.metadata['timestamp'], timefmt)
        dt_min = t_ref + timedelta(seconds=tmin)
        dt_max = t_ref + timedelta(seconds=tmax)

        if vmin is None or vmax is None:
            vmin = -100
            vmax = -50
            # Only samples above -200.0 contribute to the statistics.
            c_idx = spec > -200.0
            if np.sum(c_idx) > 100:
                data_mean = np.mean(spec[c_idx])
                data_std = np.std(spec[c_idx])
                vmin = data_mean - 2.0 * data_std
                vmax = data_mean + 6.0 * data_std

        fig = plt.figure(figsize=FIGSIZE)
        try:
            gs = GridSpec(**GRIDSPEC)
            axis = fig.add_subplot(gs[0])
            axis_cbar = fig.add_subplot(gs[1])

            im = axis.imshow(spec,
                             origin='lower',
                             aspect='auto',
                             interpolation='None',
                             extent=[fmin, fmax, date2num(dt_min), date2num(dt_max)],
                             vmin=vmin,
                             vmax=vmax,
                             cmap='viridis')

            # Left y axis: wall-clock time, one tick per minute.
            axis.yaxis_date()
            axis.yaxis.set_major_locator(mdates.MinuteLocator(interval=1))
            axis.yaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))

            # Right y axis: seconds relative to the header timestamp.
            axis2 = axis.twinx()
            axis2.set_ylim(tmin, tmax)

            axis.set_xlabel('Frequency (kHz)')
            axis.set_ylabel('Time (UTC)')
            axis2.set_ylabel('Time (seconds)')

            # Draw the colorbar on this figure explicitly instead of relying
            # on pyplot's notion of the "current" figure.
            cbar = fig.colorbar(im, aspect=50, cax=axis_cbar)
            cbar.set_label('Power (dB)')

            # Prepare metadata
            dat_metadata = {key: str(value) for key, value in self.metadata.items()}
            plot_metadata = {
                'figsize': FIGSIZE,
                'gridspec': GRIDSPEC,
                'xlim_kHz': [round(val, 6) for val in axis.get_xlim()],
                'ylim_s': [round(val, 6) for val in axis2.get_ylim()],
                'ylim_num': [round(val, 6) for val in axis.get_ylim()],
            }

            # Serialize with JSON and combine
            metadata = {
                'satnogs:wf-dat': json.dumps(dat_metadata),
                'satnogs:wf-plot': json.dumps(plot_metadata),
            }

            fig.savefig(figure_path, metadata=metadata)
        finally:
            # Close exactly this figure, also on failure, so it is not leaked.
            plt.close(fig)
def main():
    """Command-line entry point: plot a waterfall data file into a PNG file

    Expects two positional arguments, the path of the waterfall data file
    and the path of the image to write.
    """
    positional_arguments = (
        ('data_path', 'Data path (dat file)'),
        ('png_path', 'Output path (png file)'),
    )

    cli = ArgumentParser(description='Make a waterfall plot')
    for argument_name, help_text in positional_arguments:
        cli.add_argument(argument_name, help=help_text)
    options = cli.parse_args()

    Waterfall(options.data_path).plot(options.png_path)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__': main()