Управляем Gqrx при помощи Python

29 июня 2020

Gqrx имеет интересную фичу — управлять программой можно по TCP при помощи незамысловатого протокола. Помимо прочего, это позволяет интегрировать Gqrx с Gpredict для компенсации эффекта Допплера. Я давно хотел поиграться с этой возможностью, только не мог придумать правдоподобную задачу. Идею подкинул Kevin Loughin, KB9RLW в своем видео Network sockets and remote control of GQRX SDR with telnet and python.

Первым делом был сделан простенький клиент к Gqrx:

import telnetlib

class GqrxClient:
    def __init__(self, host, port):
        """Creates a new GqrxClient"""
        self.tn = telnetlib.Telnet(host, port)

    def _write_line(self, line):
        self.tn.write((line+'\r\n').encode('ascii'))

    def _read_line(self):
        return self.tn.read_some().decode('ascii')

    def _get_rprt(self):
        resp = self._read_line()
        (rprt, code) = resp.split()
        if rprt != "RPRT":
            raise Exception("Unexpected response: "+resp)
        return int(code)

    def get_frequency(self):
        """Returns current frequency in Hz"""
        self._write_line("f")
        freq = self._read_line()
        return int(freq)

    def set_frequency(self, freq):
        """Sets current frequency to `freq` Hz"""
        self._write_line("F "+str(freq))
        return self._get_rprt()

    def get_mode(self):
        """Returns current (mode, width) pair"""
        self._write_line("m")
        resp = self._read_line()
        (mode, width) = resp.split()
        return (mode, int(width))

    def set_mode(self, mode, width):
        """Sets mode (AM, FM, ...) and width"""
        self._write_line("M "+mode+" "+str(width))
        return self._get_rprt()

    def get_signal_strength(self):
        """Returns signal strength in dBFS"""
        self._write_line("l")
        strength = self._read_line()
        return float(strength)

Более подробное описание протокола можно найти здесь.

Далее была написана мониторилка уровня сигнала от разных КВ-радиостанций:

#!/usr/bin/python3 -u

from GqrxClient import GqrxClient
from datetime import datetime
import argparse
import time
import sys

scan = [
    ('wwv_2.5Mhz',    2500000,  'AM', 4000),
    ('wwv_5Mhz',      5000000,  'AM', 4000),
    ('wwv_10Mhz',    10000000,  'AM', 4000),
    ('wwv_15Mhz',    15000000,  'AM', 4000),
    ('wwv_20Mhz',    20000000,  'AM', 4000),
    ('rwm_4996',      4996000,  'AM', 4000),
    ('rwm_9996',      9996000,  'AM', 4000),
    ('rwm_14996',    14996000,  'AM', 4000),
    ('chu_3330',      3330000, 'USB', 2250),
    ('chu_7850',      7850000, 'USB', 2250),
    ('chu_14670',    14670000, 'USB', 2250),
    ('buzzer',        4625000, 'USB', 2500),
    ('sqwheel_night', 3828000, 'USB', 2500),
    ('sqwheel_day',   5473000, 'USB', 2500),
    ('pip_night',     3756000, 'USB', 2500),
    ('pip_day',       5448000, 'USB', 2500),
]

parser = argparse.ArgumentParser(
        description='Monitor HF stations and record signal level'
    )
parser.add_argument(
    '-p', '--port', metavar='N', type=int, default=7356,
    help='gqrx port')
parser.add_argument(
    '--host', metavar='H', type=str, default='127.0.0.1',
    help='gqrx host')
parser.add_argument(
    '-s', '--sleep', metavar='S', type=int, default=300,
    help='sleep time before scans in seconds')
args = parser.parse_args()

client = GqrxClient(args.host, args.port)
print("Connected.", file=sys.stderr)

while True:
    print("Scanning frequencies...", file=sys.stderr)
    tstamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ")
    for (name, freq, mode, width) in scan:
        client.set_mode(mode, width)
        client.set_frequency(freq)
        levels = []
        for i in range(0, 50):
            levels += [ str(client.get_signal_strength()) ]
            time.sleep(.1)
        print("{};{};{}".format(tstamp, name, ";".join(levels)))
    print("Sleeping {} seconds".format(args.sleep), file=sys.stderr)
    time.sleep(args.sleep)

Номерные радиостанции нам уже знакомы. WWV и CHU — это радиостанции точного времени, расположенные недалеко от Форт-Коллинс, США и Оттавы, Канада. WWV и CHU широчайше известны среди радиолюбителей Северной Америки. В России сигналы от этих радиостанция бывают слышны не всегда. Зато у нас превосходно слышна RWM. Это радиостанция эталонного времени, расположенная в Москве.

Идея была в том, чтобы понаблюдать за уровнем сигнала от этих радиостанций в течение суток, а затем построить графики в Matplotlib. В итоге получилось довольно красиво.

Так, например, выглядят графики «скрипучего колеса»:

Уровень сигнала Скрипучего Колеса

На графике видно, что переключение радиостанции на дневную частоту было произведено около 05:00 UTC, а обратно на ночную — около 18:00 UTC. Время переключения соответствует приведенному на сайте priyom.org. По графику также можно сделать выводы о том, как менялось прохождение в течение дня.

График WWV вышел таким:

Уровень сигнала WWV

Видим, что максимальный уровен сигнала наблюдался около 10:00 UTC на частоте 15 МГц. Меня такой поворот ничуть не удивляет. Действительно, работая на 20 метрах в телеграфе на общий вызов где-то с 10:00 до 12:00 UTC, мне нередко удавалось провести QSO с Северной Америкой.

Полный код скрипта, строящего графики:

#!/usr/bin/env python3

import matplotlib as mpl
import matplotlib.pyplot as plt
import argparse
import csv
import re

parser = argparse.ArgumentParser(description='Plot dBFS vs time')
parser.add_argument(
    '-i', '--input', metavar='IF', type=str, required=True,
    help='input file')
parser.add_argument(
    '-o', '--output', metavar='OF', type=str, required=True,
    help='output file')
parser.add_argument(
    '-s', '--stations', metavar='S', type=str, required=True,
    help='comma separated list of station names')
args = parser.parse_args()

stations = args.stations.split(",")
hours = []
values = {}

with open(args.input, newline = '') as f:
    for row in csv.reader(f, delimiter = ';', quotechar = '"'):
        m = re.search("[\d-]{10} (\d{2}):", row[0])
        h = m.group(1)
        name = row[1]
        vals = [float(x) for x in row[2:]]
        new_val = round(max(vals))
        if name not in stations:
            continue
        if name not in values:
            values[name] = []
        if hours == [] or hours[-1] != h:
            hours += [h]
        if len(values[name]) < len(hours):
            # make a new list of values for a given hour
            values[name] += [[ new_val ]]
        else:
            # append to the list of values for a given hour
            values[name][-1] += [ new_val ]

dpi = 80
fig = plt.figure(dpi = dpi, figsize = (512 / dpi, 384 / dpi) )
mpl.rcParams.update({'font.size': 10})

plt.xlabel('UTC Time')
plt.ylabel('dBFS')

ax = plt.axes()
ax.yaxis.grid(True)

for name in values.keys():
    vals = [ min(x) for x in values[name] ]
    plt.plot(hours, vals, linestyle = 'solid', label = name)

plt.legend(loc='upper left', frameon = False)
fig.savefig(args.output)

Зачем все это может быть нужно? Допустим, меня интересует день, время и частота, оптимальные для приема какой-то конкретной радиостанции, например, Kyodo News. Я могу понаблюдать за ее частотами при помощи приведенных скриптов. Теперь мне известно, как проходила радиостанция в заданный день. С хорошей степенью уверенности можно утверждать, что условия прохождения будут похожими через 27 дней. В двух словах, эффект связан с периодом обращения Солнца вокруг своей оси при наблюдении с Земли. Этот момент более подробно описан в соответствующей литературе. Список книг, посвященных прохождению, ранее приводился в заметке Прогнозируем прохождение на КВ с помощью VOACAP.

Вот такая полезная наработка получилась. Как обычно, буду рад вашим вопросам и дополнениям.

Метки: , , .