#!/usr/bin/env python3
# qo-100-gqrx-afc.py ~ (c) Alex, R2AUK 2026
# https://eax.me/2026/2026-06-08-qo-100-gqrx-afc.html
"""Frequency correction by QO-100 beacon for Gqrx.

The script repeatedly asks Gqrx (over its remote-control TCP protocol) to
record a short I/Q file, locates the beacon in the spectrum of that file and
adjusts the LNB LO setting in Gqrx so that the beacon is displayed at
``BEACON_TRUE_FREQ``.
"""

import argparse
import glob
import os
import socket
import time

import numpy as np
from scipy.signal import find_peaks


class GqrxClient:
    """Minimal line-based client for the Gqrx remote-control TCP protocol."""

    def __init__(self, host, port):
        """Connect to Gqrx listening on ``host``:``port``."""
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._sock.connect((host, port))
        except OSError:
            # Do not leak the descriptor when the connection attempt fails.
            self._sock.close()
            raise
        self._file = self._sock.makefile("r")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Closes the connection to Gqrx."""
        self._file.close()
        self._sock.close()

    def _send(self, command):
        self._sock.sendall((command + "\r\n").encode("ascii"))

    def _recv(self):
        line = self._file.readline()
        if not line:
            # readline() returns "" only at EOF; without this check callers
            # would fail later with an unrelated int("") ValueError.
            raise ConnectionError("Connection closed by Gqrx")
        return line.strip()

    def _get_rprt(self):
        """Reads an ``RPRT <code>`` reply; raises unless the code is 0."""
        resp = self._recv()
        parts = resp.split()
        if len(parts) != 2 or parts[0] != "RPRT":
            raise Exception(f"Unexpected response: {resp}")
        code = int(parts[1])
        if code != 0:
            raise Exception(f"Command failed with RPRT {code}")
        return code

    def get_hw_freq(self):
        """Returns current HW center (VFO) frequency in Hz."""
        self._send("gqrx_get_hw_freq")
        return int(self._recv())

    def get_rx_freq(self):
        """Returns current RX (filter) offset relative to HW center in Hz."""
        self._send("gqrx_get_rx_freq")
        return int(self._recv())

    def set_rx_freq(self, freq):
        """Sets RX (filter) offset relative to HW center in Hz."""
        self._send(f"gqrx_set_rx_freq {int(freq)}")
        return self._get_rprt()

    def get_lnb_lo(self):
        """Returns LNB LO frequency in Hz."""
        self._send("LNB_LO")
        return int(self._recv())

    def set_lnb_lo(self, freq):
        """Sets LNB LO frequency in Hz."""
        self._send(f"LNB_LO {int(freq)}")
        return self._get_rprt()

    def start_iq_record(self):
        """Starts IQ recording."""
        self._send("U IQRECORD 1")
        return self._get_rprt()

    def stop_iq_record(self):
        """Stops IQ recording."""
        self._send("U IQRECORD 0")
        return self._get_rprt()


parser = argparse.ArgumentParser(
    description="Frequency correction by QO-100 beacon for Gqrx"
)
parser.add_argument(
    "--host", default="127.0.0.1",
    help="Gqrx host (default: 127.0.0.1)")
parser.add_argument(
    "--port", type=int, default=7356,
    help="Gqrx TCP port (default: 7356)")
parser.add_argument(
    "--interval", type=float, default=10,
    help="Interval between recordings in seconds (default: 10)")
parser.add_argument(
    "--error-margin", type=float, default=25,
    help="Skip correction if beacon frequency error is within this margin in Hz (default: 25)")
parser.add_argument(
    "--fft-size", type=int, default=131072,
    help="Number of FFT points (default: 131072)")
parser.add_argument(
    "--peak-threshold", type=float, default=0,
    help="Magnitude threshold for peak search; 0 = auto-detect on first recording (default: 0)")
parser.add_argument(
    "--rec-iq-dir", default="/tmp",
    help="Directory where Gqrx saves IQ recordings (default: /tmp)")
parser.add_argument(
    "--max-drift", type=float, default=1000,
    help="Skip correction if it exceeds this value in Hz; first correction always applied (default: 1000)")
parser.add_argument(
    "--record-time", type=float, default=15,
    help="Duration of each IQ recording in seconds (default: 15)")

# Parsed command-line arguments and the Gqrx connection. Both are populated
# by main() so that importing this module has no side effects.
args = None
client = None

# Frequency in Hz at which the beacon must end up being displayed.
BEACON_TRUE_FREQ = 10_489_500_500

# True if extended remote protocol commands are available.
# See pull request https://github.com/gqrx-sdr/gqrx/pull/1456.
# Set to False at startup if these commands are not implemented.
extended_commands_available = True


def compute_magnitude(samples, fft_size, window=None):
    """Returns the fftshift-ed magnitude spectrum of ``samples``.

    The mean is subtracted first (DC removal) and a Hann window is applied.
    ``samples`` must contain exactly ``fft_size`` values. ``window`` may be a
    precomputed ``np.hanning(fft_size)`` to avoid rebuilding it on every call.
    """
    if window is None:
        window = np.hanning(fft_size)
    samples = samples - np.mean(samples)  # remove DC offset
    spectrum = np.fft.fft(samples * window, n=fft_size)
    return np.fft.fftshift(np.abs(spectrum))


def raw_to_complex(raw):
    """Converts interleaved float32 I/Q bytes into a complex64 array."""
    # A complex64 value is laid out as (float32 real, float32 imag), so the
    # interleaved buffer can be viewed directly instead of copying twice.
    return np.frombuffer(raw, dtype=np.complex64)


def build_max_spectrum(filename, fft_size):
    """Returns the element-wise maximum of the spectra over a recording.

    The file is processed in windows of ``fft_size`` samples that overlap by
    half. Returns None if the file holds fewer than ``fft_size`` samples.
    """
    half = fft_size // 2
    bytes_per_sample = 8  # I + Q, float32
    bytes_full = bytes_per_sample * fft_size
    bytes_half = bytes_per_sample * half
    window = np.hanning(fft_size)  # loop-invariant, build it only once

    with open(filename, "rb") as f:
        raw = f.read(bytes_full)
        if len(raw) < bytes_full:
            return None
        chunk = raw_to_complex(raw)
        max_spectrum = compute_magnitude(chunk, fft_size, window)
        while True:
            raw = f.read(bytes_half)
            if len(raw) < bytes_half:
                break  # trailing partial half-window is ignored
            chunk = np.concatenate([chunk[half:], raw_to_complex(raw)])
            max_spectrum = np.maximum(
                max_spectrum, compute_magnitude(chunk, fft_size, window))
    return max_spectrum


def auto_peak_threshold(max_spectrum):
    """Returns the rounded 99th percentile of ``max_spectrum``."""
    return round(np.percentile(max_spectrum, 99))


def find_beacon_frequency(max_spectrum, sampling_rate, fft_size, threshold):
    """Returns the beacon offset from the spectrum center in Hz, or None.

    Peaks of at least ``threshold`` are searched for; among the first few
    neighbouring peak pairs, the first pair spaced 400 +/- 50 Hz apart is
    taken as the beacon and the frequency of its lower peak is returned.
    """
    bin_width = sampling_rate / fft_size
    freqs = np.fft.fftshift(np.fft.fftfreq(fft_size, d=1.0 / sampling_rate))
    # find_peaks() rejects distance < 1, which round() yields as soon as a
    # bin is wider than 400 Hz (small FFT size or high sampling rate).
    min_distance = max(1, round(200 / bin_width))
    peaks, _ = find_peaks(max_spectrum, height=threshold, distance=min_distance)

    # Only the first three neighbouring pairs are examined.
    for lower, upper in zip(peaks[:3], peaks[1:]):
        freq_diff = abs(freqs[upper] - freqs[lower])
        if abs(freq_diff - 400) <= 50:
            return freqs[lower]
    return None


def quantize_frequency(freq, step):
    """Rounds ``freq`` to the nearest multiple of ``step``.

    A zero ``step`` (e.g. ``--error-margin 0``) means no grid: the value is
    simply rounded to an integer instead of dividing by zero.
    """
    if step != 0:
        return round(freq / step) * step
    return round(freq)


def find_raw_files():
    """Returns the set of ``gqrx_*.raw`` paths in the recordings directory."""
    return set(glob.glob(os.path.join(args.rec_iq_dir, "gqrx_*.raw")))


def main_loop():
    """Records, measures the beacon and corrects the LNB LO, forever."""
    first_correction_done = False
    while True:
        print("--------")
        if extended_commands_available:
            hw_freq_before = client.get_hw_freq()

        # The recording is identified as the file that appears in the
        # directory between start and stop.
        old_files = find_raw_files()
        print("Recording I/Q data...")
        client.start_iq_record()
        time.sleep(args.record_time)
        client.stop_iq_record()
        print("Recording stopped.")

        new_files = find_raw_files() - old_files
        if not new_files:
            print("No recording files found, retrying...")
            continue
        if len(new_files) != 1:
            print("Too many new files. Keeping them and retrying...")
            continue

        filename = new_files.pop()
        basename = os.path.basename(filename)
        if extended_commands_available and client.get_hw_freq() != hw_freq_before:
            os.remove(filename)
            print("HW frequency changed during recording, retrying...")
            continue

        # Fields 3 and 4 of the underscore-separated file name are read as
        # the hardware center frequency and the sampling rate.
        parts = basename.split("_")
        hardware_center = int(parts[3])
        sampling_rate = int(parts[4])
        print(f"Processing file {basename} with sampling rate {sampling_rate}...")
        max_spectrum = build_max_spectrum(filename, args.fft_size)
        os.remove(filename)
        if max_spectrum is None:
            # Fewer than fft_size samples were recorded; nothing to analyse.
            print("Recording is too short for the FFT size, retrying...")
            continue

        if args.peak_threshold == 0:
            args.peak_threshold = auto_peak_threshold(max_spectrum)
            print(f"Auto peak threshold: {args.peak_threshold}")

        beacon_freq = find_beacon_frequency(
            max_spectrum, sampling_rate, args.fft_size, args.peak_threshold)
        if beacon_freq is None:
            print("Beacon not found, retrying...")
            continue

        lnb_lo = client.get_lnb_lo()
        center_displayed = lnb_lo + hardware_center
        beacon_displayed = center_displayed + beacon_freq
        correction = beacon_displayed - BEACON_TRUE_FREQ
        new_lnb_lo = quantize_frequency(lnb_lo - correction, args.error_margin)
        print(f"Beacon offset: {beacon_freq} Hz, correction: {round(correction)} Hz")

        if abs(correction) <= args.error_margin:
            print("Correction is within error margin, skipping.")
            first_correction_done = True
        elif not first_correction_done or abs(correction) <= args.max_drift:
            print(f"Updating LNB LO: {int(new_lnb_lo)} Hz")
            lnb_lo_delta = new_lnb_lo - lnb_lo
            client.set_lnb_lo(new_lnb_lo)
            first_correction_done = True
            if extended_commands_available:
                # Shift the RX offset by the opposite amount of the LO change.
                rx_freq = client.get_rx_freq()
                client.set_rx_freq(rx_freq - lnb_lo_delta)
        else:
            print(f"Correction exceeds max drift ({args.max_drift} Hz), skipping.")

        time.sleep(args.interval)


def main():
    """Parses arguments, connects to Gqrx and runs the correction loop."""
    global args, client, extended_commands_available

    args = parser.parse_args()
    client = GqrxClient(args.host, args.port)
    try:
        client.stop_iq_record()

        # Probe for the extended commands.
        try:
            client.get_hw_freq()
        except OSError:
            # A broken connection is not a missing feature; report it as is.
            raise
        except Exception:
            print("WARNING: extended remote protocol commands are not available")
            extended_commands_available = False

        try:
            main_loop()
        finally:
            print("Stopping IQ recording...")
            client.stop_iq_record()
    finally:
        client.close()


if __name__ == "__main__":
    main()