Source code for satnogsclient.observer.observer

from __future__ import absolute_import, division, print_function

import json
import logging
import shlex
import subprocess
import uuid
from datetime import datetime
from pathlib import Path
from time import sleep

import pytz
import requests

import satnogsclient.config
import satnogsclient.radio.flowgraphs as flowgraphs
from satnogsclient import settings
from satnogsclient.artifacts import Artifacts
from satnogsclient.observer.worker import WorkerFreq, WorkerTrack
from satnogsclient.scheduler import SCHEDULER
from satnogsclient.waterfall import EmptyArrayError, Waterfall

try:
    from urllib.parse import urljoin
except ImportError:
    from urlparse import urljoin

LOGGER = logging.getLogger(__name__)


[docs]def post_artifacts(artifacts_file, observation_id): url = urljoin(settings.ARTIFACTS_API_URL, 'artifacts/') headers = {'Authorization': 'Token {0}'.format(settings.ARTIFACTS_API_TOKEN)} if not url.endswith('/'): url += '/' try: response = requests.post(url, headers=headers, files={ 'artifact_file': (str(uuid.uuid4()) + '.h5', artifacts_file, 'application/x-hdf5') }, verify=settings.SATNOGS_VERIFY_SSL, stream=True, timeout=settings.ARTIFACTS_API_TIMEOUT) response.raise_for_status() LOGGER.info('Artifacts upload successful.') artifacts_file.close() except requests.exceptions.Timeout: LOGGER.error('Upload of artifacts for observation %s failed ' 'due to timeout.', observation_id) except requests.exceptions.HTTPError: if response.status_code == 404: LOGGER.error( "Upload of artifacts for observation %s failed, %s doesn't exist (404)." 'Probably the observation was deleted.', observation_id, url) if response.status_code == 403 and 'has already been uploaded' in response.text: LOGGER.error('Upload of artifacts for observation %s is forbidden, %s\n URL: %s', observation_id, response.text, url) else: LOGGER.error( 'Upload of artifacts for observation %s failed, ' 'response status code: %s', observation_id, response.status_code)
[docs]class Observer(object): # Variables from settings # Mainly present so we can support multiple ground stations from the client def __init__(self): self.location = None self.rot_port = settings.SATNOGS_ROT_PORT self.rig_ip = settings.SATNOGS_RIG_IP self.rig_port = settings.SATNOGS_RIG_PORT self.observation_id = None self.tle = None self.timestamp = None self.observation_end = None self.frequency = None self.mode = None self.baud = None self.observation_raw_file = None self.observation_ogg_file = None self.observation_waterfall_file = None self.observation_waterfall_png = None self.observation_receiving_decoded_data = None # pylint: disable=C0103 self.observation_decoded_data = None self.observation_done_decoded_data = None self.tracker_freq = None self.tracker_rot = None
[docs] def setup(self, observation_id, tle, observation_end, frequency, mode, baud): """ Sets up required internal variables. * returns True if setup is ok * returns False if issue is encountered """ # Set attributes self.observation_id = observation_id self.tle = tle self.observation_end = observation_end self.frequency = frequency self.baud = baud self.mode = mode not_completed_prefix = 'receiving_satnogs' completed_prefix = 'satnogs' receiving_waterfall_prefix = 'receiving_waterfall' waterfall_prefix = 'waterfall' receiving_decoded_data_prefix = 'receiving_data' decoded_data_prefix = 'data' self.timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H-%M-%S%z') raw_file_extension = 'out' encoded_file_extension = 'ogg' waterfall_file_extension = 'dat' self.observation_raw_file = '{0}/{1}_{2}_{3}.{4}'.format(settings.SATNOGS_OUTPUT_PATH, not_completed_prefix, self.observation_id, self.timestamp, raw_file_extension) self.observation_ogg_file = '{0}/{1}_{2}_{3}.{4}'.format(settings.SATNOGS_OUTPUT_PATH, completed_prefix, self.observation_id, self.timestamp, encoded_file_extension) self.observation_waterfall_file = '{0}/{1}_{2}_{3}.{4}'.format( settings.SATNOGS_OUTPUT_PATH, receiving_waterfall_prefix, self.observation_id, self.timestamp, waterfall_file_extension) self.observation_waterfall_png = '{0}/{1}_{2}_{3}.{4}'.format( settings.SATNOGS_OUTPUT_PATH, waterfall_prefix, self.observation_id, self.timestamp, 'png') self.observation_receiving_decoded_data = '{0}/{1}_{2}_{3}.{4}'.format( settings.SATNOGS_OUTPUT_PATH, receiving_decoded_data_prefix, self.observation_id, self.timestamp, 'png') self.observation_done_decoded_data = '{0}/{1}_{2}_{3}.{4}'.format( settings.SATNOGS_OUTPUT_PATH, decoded_data_prefix, self.observation_id, self.timestamp, 'png') self.observation_decoded_data = '{0}/{1}_{2}'.format(settings.SATNOGS_OUTPUT_PATH, decoded_data_prefix, self.observation_id) return all([ self.observation_id, self.tle, self.timestamp, self.observation_end, self.frequency, self.observation_raw_file, self.observation_ogg_file, self.observation_waterfall_file, self.observation_waterfall_png, self.observation_decoded_data ])
[docs] def observe(self): # pylint: disable=R0915 """Starts threads for rotcrl and rigctl.""" if settings.SATNOGS_PRE_OBSERVATION_SCRIPT is not None: LOGGER.info('Executing pre-observation script.') script_name = flowgraphs.SATNOGS_FLOWGRAPH_MODES[ flowgraphs.SATNOGS_FLOWGRAPH_MODE_DEFAULT]['script_name'] if self.mode in flowgraphs.SATNOGS_FLOWGRAPH_MODES: script_name = flowgraphs.SATNOGS_FLOWGRAPH_MODES[self.mode]['script_name'] replacements = [ ('{{FREQ}}', str(self.frequency)), ('{{TLE}}', json.dumps(self.tle)), ('{{TIMESTAMP}}', self.timestamp), ('{{ID}}', str(self.observation_id)), ('{{BAUD}}', str(self.baud)), ('{{SCRIPT_NAME}}', script_name), ] pre_script = [] for arg in shlex.split(settings.SATNOGS_PRE_OBSERVATION_SCRIPT): for key, val in replacements: arg = arg.replace(key, val) pre_script.append(arg) subprocess.call(pre_script) # if it is APT we want to save with a prefix until the observation # is complete, then rename. if self.mode == 'APT': self.observation_decoded_data =\ self.observation_receiving_decoded_data # start thread for rotctl LOGGER.info('Start rotctrl thread.') self.run_rot() # start thread for rigctl LOGGER.info('Start rigctrl thread.') self.run_rig() sleep(1) LOGGER.info('Start gnuradio thread.') flowgraph = satnogsclient.radio.flowgraphs.Flowgraph( settings.SATNOGS_SOAPY_RX_DEVICE, settings.SATNOGS_RX_SAMP_RATE, self.frequency, self.mode, self.baud, { 'audio': self.observation_raw_file, 'waterfall': self.observation_waterfall_file, 'decoded': self.observation_decoded_data }) flowgraph.enabled = True # Polling gnuradio process status self.poll_gnu_proc_status(flowgraph) # Rename files and create waterfall self.rename_ogg_file() self.rename_data_file() LOGGER.info('Creating waterfall plot.') waterfall = None try: waterfall = Waterfall(self.observation_waterfall_file) self.plot_waterfall(waterfall) if settings.SATNOGS_REMOVE_RAW_FILES: self.remove_waterfall_file() except FileNotFoundError: LOGGER.error('No waterfall data file found') except EmptyArrayError: LOGGER.error('Waterfall data array is empty') # PUT client version and metadata base_url = urljoin(settings.SATNOGS_NETWORK_API_URL, 'observations/') headers = {'Authorization': 'Token {0}'.format(settings.SATNOGS_API_TOKEN)} url = urljoin(base_url, str(self.observation_id)) if not url.endswith('/'): url += '/' client_metadata = flowgraph.info client_metadata['latitude'] = settings.SATNOGS_STATION_LAT client_metadata['longitude'] = settings.SATNOGS_STATION_LON client_metadata['elevation'] = settings.SATNOGS_STATION_ELEV client_metadata['frequency'] = self.frequency try: resp = requests.put(url, headers=headers, data={ 'client_version': satnogsclient.config.VERSION, 'client_metadata': json.dumps(client_metadata) }, verify=settings.SATNOGS_VERIFY_SSL, stream=True, timeout=45) resp.raise_for_status() except requests.exceptions.ConnectionError: LOGGER.error('%s: Connection Refused', url) except requests.exceptions.Timeout: LOGGER.error('%s: Connection Timeout - no metadata uploaded', url) except requests.exceptions.HTTPError as http_error: LOGGER.error(http_error) except requests.exceptions.RequestException as err: LOGGER.error('%s: Unexpected error: %s', url, err) if settings.ARTIFACTS_ENABLED: artifact = Artifacts(waterfall, self.observation_id) artifact.create() SCHEDULER.add_job(post_artifacts, args=(artifact.artifacts_file, str(self.observation_id)))
[docs] def run_rot(self): self.tracker_rot = WorkerTrack(ip=None, port=self.rot_port, frequency=self.frequency, time_to_stop=self.observation_end, sleep_time=3) LOGGER.debug('TLE: %s', self.tle) LOGGER.debug('Observation end: %s', self.observation_end) self.tracker_rot.trackobject(self.location, self.tle) self.tracker_rot.trackstart()
[docs] def run_rig(self): self.tracker_freq = WorkerFreq(ip=self.rig_ip, port=self.rig_port, frequency=self.frequency, time_to_stop=self.observation_end) LOGGER.debug('Rig Frequency %s', self.frequency) LOGGER.debug('Observation end: %s', self.observation_end) self.tracker_freq.trackobject(self.location, self.tle) self.tracker_freq.trackstart()
[docs] def poll_gnu_proc_status(self, flowgraph): while flowgraph.enabled and datetime.now(pytz.utc) <= self.observation_end: sleep(1) flowgraph.enabled = False LOGGER.info('Tracking stopped.') self.tracker_freq.trackstop() self.tracker_rot.trackstop() LOGGER.info('Observation Finished') LOGGER.info('Executing post-observation script.') if settings.SATNOGS_POST_OBSERVATION_SCRIPT is not None: script_name = flowgraphs.SATNOGS_FLOWGRAPH_MODES[ flowgraphs.SATNOGS_FLOWGRAPH_MODE_DEFAULT]['script_name'] if self.mode in flowgraphs.SATNOGS_FLOWGRAPH_MODES: script_name = flowgraphs.SATNOGS_FLOWGRAPH_MODES[self.mode]['script_name'] replacements = [ ('{{FREQ}}', str(self.frequency)), ('{{TLE}}', json.dumps(self.tle)), ('{{TIMESTAMP}}', self.timestamp), ('{{ID}}', str(self.observation_id)), ('{{BAUD}}', str(self.baud)), ('{{SCRIPT_NAME}}', script_name), ] post_script = [] for arg in shlex.split(settings.SATNOGS_POST_OBSERVATION_SCRIPT): for key, val in replacements: arg = arg.replace(key, val) post_script.append(arg) subprocess.call(post_script)
[docs] def rename_ogg_file(self): observation_raw_file_path = Path(self.observation_raw_file) if observation_raw_file_path.exists(): observation_raw_file_path.rename(Path(self.observation_ogg_file)) LOGGER.info('Rename encoded file for uploading finished')
[docs] def rename_data_file(self): observation_receiving_decoded_data_path = Path(self.observation_receiving_decoded_data) if observation_receiving_decoded_data_path.exists(): observation_receiving_decoded_data_path.rename(Path( self.observation_done_decoded_data)) LOGGER.info('Rename data file for uploading finished')
[docs] def plot_waterfall(self, waterfall): vmin = None vmax = None if not settings.SATNOGS_WATERFALL_AUTORANGE: vmin = settings.SATNOGS_WATERFALL_MIN_VALUE vmax = settings.SATNOGS_WATERFALL_MAX_VALUE waterfall.plot(self.observation_waterfall_png, vmin, vmax)
[docs] def remove_waterfall_file(self): try: Path(self.observation_waterfall_file).unlink() except FileNotFoundError: LOGGER.error('Failed to remove waterfall file')