Merge pull request #1302 from AlexandreRouma/master

keep new_rds branch updated
This commit is contained in:
AlexandreRouma
2024-01-29 01:45:32 +01:00
committed by GitHub
48 changed files with 1338 additions and 600 deletions

View File

@@ -141,10 +141,10 @@ public:
return;
}
}
catch (std::exception e) {
catch (const std::exception& e) {
char buf[1024];
sprintf(buf, "%016" PRIX64, serial);
flog::error("Could not open Airspy {0}", buf);
flog::error("Could not open Airspy {}", buf);
}
selectedSerial = serial;

View File

@@ -144,10 +144,10 @@ public:
return;
}
}
catch (std::exception e) {
catch (const std::exception& e) {
char buf[1024];
sprintf(buf, "%016" PRIX64, serial);
flog::error("Could not open Airspy HF+ {0}", buf);
flog::error("Could not open Airspy HF+ {}", buf);
}
selectedSerial = serial;

View File

@@ -35,6 +35,10 @@ public:
AudioSourceModule(std::string name) {
this->name = name;
#if RTAUDIO_VERSION_MAJOR >= 6
audio.setErrorCallback(&errorCallback);
#endif
sampleRate = 48000.0;
handler.ctx = this;
@@ -83,21 +87,28 @@ public:
void refresh() {
devices.clear();
#if RTAUDIO_VERSION_MAJOR >= 6
for (int i : audio.getDeviceIds()) {
#else
int count = audio.getDeviceCount();
for (int i = 0; i < count; i++) {
#endif
try {
// Get info
auto info = audio.getDeviceInfo(i);
#if !defined(RTAUDIO_VERSION_MAJOR) || RTAUDIO_VERSION_MAJOR < 6
if (!info.probed) { continue; }
#endif
// Check that it has a stereo input
if (info.probed && info.inputChannels < 2) { continue; }
if (info.inputChannels < 2) { continue; }
// Save info
DeviceInfo dinfo = { info, i };
devices.define(info.name, info.name, dinfo);
}
catch (std::exception e) {
flog::error("Error getting audio device info: {0}", e.what());
catch (const std::exception& e) {
flog::error("Error getting audio device ({}) info: {}", i, e.what());
}
}
}
@@ -189,11 +200,11 @@ private:
_this->audio.startStream();
_this->running = true;
}
catch (std::exception e) {
flog::error("Error opening audio device: {0}", e.what());
catch (const std::exception& e) {
flog::error("Error opening audio device: {}", e.what());
}
flog::info("AudioSourceModule '{0}': Start!", _this->name);
flog::info("AudioSourceModule '{}': Start!", _this->name);
}
static void stop(void* ctx) {
@@ -254,6 +265,22 @@ private:
return 0;
}
#if RTAUDIO_VERSION_MAJOR >= 6
static void errorCallback(RtAudioErrorType type, const std::string& errorText) {
switch (type) {
case RtAudioErrorType::RTAUDIO_NO_ERROR:
return;
case RtAudioErrorType::RTAUDIO_WARNING:
case RtAudioErrorType::RTAUDIO_NO_DEVICES_FOUND:
case RtAudioErrorType::RTAUDIO_DEVICE_DISCONNECT:
flog::warn("AudioSourceModule Warning: {} ({})", errorText, (int)type);
break;
default:
throw std::runtime_error(errorText);
}
}
#endif
std::string name;
bool enabled = true;
dsp::stream<dsp::complex_t> stream;
@@ -290,4 +317,4 @@ MOD_EXPORT void _DELETE_INSTANCE_(ModuleManager::Instance* instance) {
MOD_EXPORT void _END_() {
config.disableAutoSave();
config.save();
}
}

View File

@@ -139,8 +139,8 @@ private:
//gui::freqSelect.maxFreq = _this->centerFreq + (_this->sampleRate/2);
//gui::freqSelect.limitFreq = true;
}
catch (std::exception& e) {
flog::error("Error: {0}", e.what());
catch (const std::exception& e) {
flog::error("Error: {}", e.what());
}
config.acquire();
config.conf["path"] = _this->fileSelect.path;

View File

@@ -2,6 +2,13 @@
#include <utils/flog.h>
namespace hermes {
const int SAMPLERATE_LIST[] = {
48000,
96000,
192000,
384000
};
Client::Client(std::shared_ptr<net::Socket> sock) {
this->sock = sock;
@@ -33,6 +40,7 @@ namespace hermes {
void Client::setSamplerate(HermesLiteSamplerate samplerate) {
writeReg(0, (uint32_t)samplerate << 24);
blockSize = SAMPLERATE_LIST[samplerate] / 200;
}
void Client::setFrequency(double freq) {
@@ -157,12 +165,15 @@ namespace hermes {
void Client::worker() {
uint8_t rbuf[2048];
MetisUSBPacket* pkt = (MetisUSBPacket*)rbuf;
int sampleCount = 0;
while (true) {
// Wait for a packet or exit if connection closed
int len = sock->recv(rbuf, 2048);
if (len <= 0) { break; }
// Ignore anything that's not a USB packet
// TODO: Gotta check the endpoint
if (htons(pkt->hdr.signature) != HERMES_METIS_SIGNATURE || pkt->hdr.type != METIS_PKT_USB) {
continue;
}
@@ -183,9 +194,10 @@ namespace hermes {
flog::warn("Got response! Reg={0}, Seq={1}", reg, (uint32_t)htonl(pkt->seq));
}
// Decode and send IQ to stream
// Decode and save IQ to buffer
uint8_t* iq = &frame[8];
for (int i = 0; i < 63; i++) {
dsp::complex_t* writeBuf = &out.writeBuf[sampleCount];
for (int i = 0; i < HERMES_SAMPLES_PER_FRAME; i++) {
// Convert to 32bit
int32_t si = ((uint32_t)iq[(i*8) + 0] << 16) | ((uint32_t)iq[(i*8) + 1] << 8) | (uint32_t)iq[(i*8) + 2];
int32_t sq = ((uint32_t)iq[(i*8) + 3] << 16) | ((uint32_t)iq[(i*8) + 4] << 8) | (uint32_t)iq[(i*8) + 5];
@@ -195,18 +207,23 @@ namespace hermes {
sq = (sq << 8) >> 8;
// Convert to float (IQ swapped for some reason)
out.writeBuf[i].im = (float)si / (float)0x1000000;
out.writeBuf[i].re = (float)sq / (float)0x1000000;
writeBuf[i].im = (float)si / (float)0x1000000;
writeBuf[i].re = (float)sq / (float)0x1000000;
}
sampleCount += HERMES_SAMPLES_PER_FRAME;
// If enough samples are in the buffer, send to stream
if (sampleCount >= blockSize) {
out.swap(sampleCount);
sampleCount = 0;
}
out.swap(63);
// TODO: Buffer the data to avoid having a very high DSP frame rate
}
}
}
std::vector<Info> discover() {
// TODO: Maybe try to instead detect on each interface as a work around for 0.0.0.0 not receiving anything?
auto sock = net::openudp("0.0.0.0", 1024);
// Open a UDP broadcast socket (TODO: Figure out why 255.255.255.255 doesn't work on windows with local = 0.0.0.0)
auto sock = net::openudp("255.255.255.255", 1024, "0.0.0.0", 0, true);
// Build discovery packet
uint8_t discoveryPkt[64];
@@ -225,6 +242,7 @@ namespace hermes {
}
}
// Await all responses
std::vector<Info> devices;
while (true) {
// Wait for a response
@@ -258,7 +276,9 @@ namespace hermes {
devices.push_back(info);
}
// Close broadcast socket
sock->close();
return devices;
}

View File

@@ -7,11 +7,12 @@
#include <string>
#include <thread>
#define HERMES_METIS_REPEAT 5
#define HERMES_METIS_TIMEOUT 1000
#define HERMES_METIS_SIGNATURE 0xEFFE
#define HERMES_HPSDR_USB_SYNC 0x7F
#define HERMES_I2C_DELAY 50
#define HERMES_METIS_REPEAT 5
#define HERMES_METIS_TIMEOUT 1000
#define HERMES_METIS_SIGNATURE 0xEFFE
#define HERMES_HPSDR_USB_SYNC 0x7F
#define HERMES_I2C_DELAY 50
#define HERMES_SAMPLES_PER_FRAME 63
namespace hermes {
enum MetisPacketType {
@@ -140,7 +141,7 @@ namespace hermes {
dsp::stream<dsp::complex_t> out;
//private:
private:
void sendMetisUSB(uint8_t endpoint, void* frame0, void* frame1 = NULL);
void sendMetisControl(MetisControl ctrl);
@@ -149,12 +150,12 @@ namespace hermes {
void writeI2C(I2CPort port, uint8_t addr, uint8_t reg, uint8_t data);
void worker();
double freq = 0;
int blockSize = 63;
std::thread workerThread;
std::shared_ptr<net::Socket> sock;
uint32_t usbSeq = 0;

View File

@@ -17,7 +17,7 @@ SDRPP_MOD_INFO{
/* Name: */ "hermes_source",
/* Description: */ "Hermes Lite 2 source module for SDR++",
/* Author: */ "Ryzerth",
/* Version: */ 0, 1, 0,
/* Version: */ 0, 1, 1,
/* Max instances */ 1
};

View File

@@ -7,7 +7,9 @@
#include <gui/smgui.h>
#include <iio.h>
#include <ad9361.h>
#include <utils/optionlist.h>
#include <algorithm>
#include <regex>
#define CONCAT(a, b) ((std::string(a) + b).c_str())
@@ -15,16 +17,10 @@ SDRPP_MOD_INFO{
/* Name: */ "plutosdr_source",
/* Description: */ "PlutoSDR source module for SDR++",
/* Author: */ "Ryzerth",
/* Version: */ 0, 1, 0,
/* Version: */ 0, 2, 0,
/* Max instances */ 1
};
const char* gainModes[] = {
"manual", "fast_attack", "slow_attack", "hybrid"
};
const char* gainModesTxt = "Manual\0Fast Attack\0Slow Attack\0Hybrid\0";
ConfigManager config;
class PlutoSDRSourceModule : public ModuleManager::Instance {
@@ -32,34 +28,34 @@ public:
PlutoSDRSourceModule(std::string name) {
this->name = name;
// Define valid samplerates
for (int sr = 1000000; sr <= 61440000; sr += 500000) {
samplerates.define(sr, getBandwdithScaled(sr), sr);
}
samplerates.define(61440000, getBandwdithScaled(61440000.0), 61440000.0);
// Define valid bandwidths
bandwidths.define(0, "Auto", 0);
for (int bw = 1000000.0; bw <= 52000000; bw += 500000) {
bandwidths.define(bw, getBandwdithScaled(bw), bw);
}
// Define gain modes
gainModes.define("manual", "Manual", "manual");
gainModes.define("fast_attack", "Fast Attack", "fast_attack");
gainModes.define("slow_attack", "Slow Attack", "slow_attack");
gainModes.define("hybrid", "Hybrid", "hybrid");
// Enumerate devices
refresh();
// Select device
config.acquire();
std::string _ip = config.conf["IP"];
strcpy(&ip[3], _ip.c_str());
sampleRate = config.conf["sampleRate"];
gainMode = config.conf["gainMode"];
gain = config.conf["gain"];
devDesc = config.conf["device"];
config.release();
select(devDesc);
// Generate the samplerate list and find srId
bool found = false;
int id = 0;
for (double sr = 1000000; sr <= 20000000; sr += 500000) {
sampleRates.push_back(sr);
sampleRatesTxt += getBandwdithScaled(sr);
sampleRatesTxt += '\0';
if (sr == sampleRate) {
found = true;
srId = id;
}
id++;
}
if (!found) {
srId = 0;
sampleRate = sampleRates[0];
}
// Register source
handler.ctx = this;
handler.selectHandler = menuSelected;
handler.deselectHandler = menuDeselected;
@@ -105,9 +101,157 @@ private:
return std::string(buf);
}
void refresh() {
// Clear device list
devices.clear();
// Create scan context
iio_scan_context* sctx = iio_create_scan_context(NULL, 0);
if (!sctx) {
flog::error("Failed get scan context");
return;
}
// Create parsing regexes
std::regex backendRgx(".+(?=:)", std::regex::ECMAScript);
std::regex modelRgx("\\(.+(?=\\),)", std::regex::ECMAScript);
std::regex serialRgx("serial=[0-9A-Za-z]+", std::regex::ECMAScript);
// Enumerate devices
iio_context_info** ctxInfoList;
ssize_t count = iio_scan_context_get_info_list(sctx, &ctxInfoList);
if (count < 0) {
flog::error("Failed to enumerate contexts");
return;
}
for (ssize_t i = 0; i < count; i++) {
iio_context_info* info = ctxInfoList[i];
std::string desc = iio_context_info_get_description(info);
std::string duri = iio_context_info_get_uri(info);
// If the device is not a plutosdr, don't include it
if (desc.find("PlutoSDR") == std::string::npos) {
flog::warn("Ignored IIO device: [{}] {}", duri, desc);
continue;
}
// Extract the backend
std::string backend = "unknown";
std::smatch backendMatch;
if (std::regex_search(duri, backendMatch, backendRgx)) {
backend = backendMatch[0];
}
// Extract the model
std::string model = "Unknown";
std::smatch modelMatch;
if (std::regex_search(desc, modelMatch, modelRgx)) {
model = modelMatch[0];
int parenthPos = model.find('(');
if (parenthPos != std::string::npos) {
model = model.substr(parenthPos+1);
}
}
// Extract the serial
std::string serial = "unknown";
std::smatch serialMatch;
if (std::regex_search(desc, serialMatch, serialRgx)) {
serial = serialMatch[0].str().substr(7);
}
// Construct the device name
std::string devName = '(' + backend + ") " + model + " [" + serial + ']';
// Save device
devices.define(desc, devName, duri);
}
iio_context_info_list_free(ctxInfoList);
// Destroy scan context
iio_scan_context_destroy(sctx);
#ifdef __ANDROID__
// On Android, a default IP entry must be made (TODO: This is not ideal since the IP cannot be changed)
const char* androidURI = "ip:192.168.2.1";
const char* androidName = "Default (192.168.2.1)";
devices.define(androidName, androidName, androidURI);
#endif
}
void select(const std::string& desc) {
// If no device is available, give up
if (devices.empty()) {
devDesc.clear();
return;
}
// If the device is not available, select the first one
if (!devices.keyExists(desc)) {
select(devices.key(0));
}
// Update URI
devDesc = desc;
uri = devices.value(devices.keyId(desc));
// TODO: Enumerate capabilities
// Load defaults
samplerate = 4000000;
bandwidth = 0;
gmId = 0;
gain = -1.0f;
// Load device config
config.acquire();
if (config.conf["devices"][devDesc].contains("samplerate")) {
samplerate = config.conf["devices"][devDesc]["samplerate"];
}
if (config.conf["devices"][devDesc].contains("bandwidth")) {
bandwidth = config.conf["devices"][devDesc]["bandwidth"];
}
if (config.conf["devices"][devDesc].contains("gainMode")) {
// Select given gain mode or default if invalid
std::string gm = config.conf["devices"][devDesc]["gainMode"];
if (gainModes.keyExists(gm)) {
gmId = gainModes.keyId(gm);
}
else {
gmId = 0;
}
}
if (config.conf["devices"][devDesc].contains("gain")) {
gain = config.conf["devices"][devDesc]["gain"];
gain = std::clamp<int>(gain, -1.0f, 73.0f);
}
config.release();
// Update samplerate ID
if (samplerates.keyExists(samplerate)) {
srId = samplerates.keyId(samplerate);
}
else {
srId = 0;
samplerate = samplerates.value(srId);
}
// Update bandwidth ID
if (bandwidths.keyExists(bandwidth)) {
bwId = bandwidths.keyId(bandwidth);
}
else {
bwId = 0;
bandwidth = bandwidths.value(bwId);
}
// Update core samplerate
core::setInputSampleRate(samplerate);
}
static void menuSelected(void* ctx) {
PlutoSDRSourceModule* _this = (PlutoSDRSourceModule*)ctx;
core::setInputSampleRate(_this->sampleRate);
core::setInputSampleRate(_this->samplerate);
flog::info("PlutoSDRSourceModule '{0}': Menu Select!", _this->name);
}
@@ -120,12 +264,17 @@ private:
PlutoSDRSourceModule* _this = (PlutoSDRSourceModule*)ctx;
if (_this->running) { return; }
// TODO: INIT CONTEXT HERE
_this->ctx = iio_create_context_from_uri(_this->ip);
// If no device is selected, give up
if (_this->devDesc.empty() || _this->uri.empty()) { return; }
// Open context
_this->ctx = iio_create_context_from_uri(_this->uri.c_str());
if (_this->ctx == NULL) {
flog::error("Could not open pluto");
flog::error("Could not open pluto ({})", _this->uri);
return;
}
// Get phy and device handle
_this->phy = iio_context_find_device(_this->ctx, "ad9361-phy");
if (_this->phy == NULL) {
flog::error("Could not connect to pluto phy");
@@ -139,17 +288,27 @@ private:
return;
}
// Configure pluto
// Get RX channels
_this->rxChan = iio_device_find_channel(_this->phy, "voltage0", false);
_this->rxLO = iio_device_find_channel(_this->phy, "altvoltage0", true);
// Enable RX LO and disable TX
iio_channel_attr_write_bool(iio_device_find_channel(_this->phy, "altvoltage1", true), "powerdown", true);
iio_channel_attr_write_bool(iio_device_find_channel(_this->phy, "altvoltage0", true), "powerdown", false);
iio_channel_attr_write_bool(_this->rxLO, "powerdown", false);
iio_channel_attr_write(iio_device_find_channel(_this->phy, "voltage0", false), "rf_port_select", "A_BALANCED");
iio_channel_attr_write_longlong(iio_device_find_channel(_this->phy, "altvoltage0", true), "frequency", round(_this->freq)); // Freq
iio_channel_attr_write_longlong(iio_device_find_channel(_this->phy, "voltage0", false), "sampling_frequency", round(_this->sampleRate)); // Sample rate
iio_channel_attr_write(iio_device_find_channel(_this->phy, "voltage0", false), "gain_control_mode", gainModes[_this->gainMode]); // manual gain
iio_channel_attr_write_longlong(iio_device_find_channel(_this->phy, "voltage0", false), "hardwaregain", round(_this->gain)); // gain
ad9361_set_bb_rate(_this->phy, round(_this->sampleRate));
// Configure RX channel
iio_channel_attr_write(_this->rxChan, "rf_port_select", "A_BALANCED");
iio_channel_attr_write_longlong(_this->rxLO, "frequency", round(_this->freq)); // Freq
iio_channel_attr_write_bool(_this->rxChan, "filter_fir_en", true); // Digital filter
iio_channel_attr_write_longlong(_this->rxChan, "sampling_frequency", round(_this->samplerate)); // Sample rate
iio_channel_attr_write_double(_this->rxChan, "hardwaregain", _this->gain); // Gain
iio_channel_attr_write(_this->rxChan, "gain_control_mode", _this->gainModes.value(_this->gmId).c_str()); // Gain mode
_this->setBandwidth(_this->bandwidth);
// Configure the ADC filters
ad9361_set_bb_rate(_this->phy, round(_this->samplerate));
// Start worker thread
_this->running = true;
_this->workerThread = std::thread(worker, _this);
flog::info("PlutoSDRSourceModule '{0}': Start!", _this->name);
@@ -158,12 +317,14 @@ private:
static void stop(void* ctx) {
PlutoSDRSourceModule* _this = (PlutoSDRSourceModule*)ctx;
if (!_this->running) { return; }
// Stop worker thread
_this->running = false;
_this->stream.stopWriter();
_this->workerThread.join();
_this->stream.clearWriteStop();
// DESTROY CONTEXT HERE
// Close device
if (_this->ctx != NULL) {
iio_context_destroy(_this->ctx);
_this->ctx = NULL;
@@ -176,8 +337,8 @@ private:
PlutoSDRSourceModule* _this = (PlutoSDRSourceModule*)ctx;
_this->freq = freq;
if (_this->running) {
// SET PLUTO FREQ HERE
iio_channel_attr_write_longlong(iio_device_find_channel(_this->phy, "altvoltage0", true), "frequency", round(freq));
// Tune device
iio_channel_attr_write_longlong(_this->rxLO, "frequency", round(freq));
}
flog::info("PlutoSDRSourceModule '{0}': Tune: {1}!", _this->name, freq);
}
@@ -186,120 +347,184 @@ private:
PlutoSDRSourceModule* _this = (PlutoSDRSourceModule*)ctx;
if (_this->running) { SmGui::BeginDisabled(); }
SmGui::LeftLabel("IP");
SmGui::FillWidth();
if (SmGui::InputText(CONCAT("##_pluto_ip_", _this->name), &_this->ip[3], 16)) {
SmGui::ForceSync();
if (SmGui::Combo("##plutosdr_dev_sel", &_this->devId, _this->devices.txt)) {
_this->select(_this->devices.key(_this->devId));
config.acquire();
config.conf["IP"] = &_this->ip[3];
config.conf["device"] = _this->devices.key(_this->devId);
config.release(true);
}
SmGui::LeftLabel("Samplerate");
if (SmGui::Combo(CONCAT("##_pluto_sr_", _this->name), &_this->srId, _this->samplerates.txt)) {
_this->samplerate = _this->samplerates.value(_this->srId);
core::setInputSampleRate(_this->samplerate);
if (!_this->devDesc.empty()) {
config.acquire();
config.conf["devices"][_this->devDesc]["samplerate"] = _this->samplerate;
config.release(true);
}
}
// Refresh button
SmGui::SameLine();
SmGui::FillWidth();
if (SmGui::Combo(CONCAT("##_pluto_sr_", _this->name), &_this->srId, _this->sampleRatesTxt.c_str())) {
_this->sampleRate = _this->sampleRates[_this->srId];
core::setInputSampleRate(_this->sampleRate);
config.acquire();
config.conf["sampleRate"] = _this->sampleRate;
config.release(true);
SmGui::ForceSync();
if (SmGui::Button(CONCAT("Refresh##_pluto_refr_", _this->name))) {
_this->refresh();
_this->select(_this->devDesc);
}
if (_this->running) { SmGui::EndDisabled(); }
SmGui::LeftLabel("Bandwidth");
SmGui::FillWidth();
if (SmGui::Combo(CONCAT("##_pluto_bw_", _this->name), &_this->bwId, _this->bandwidths.txt)) {
_this->bandwidth = _this->bandwidths.value(_this->bwId);
if (_this->running) {
_this->setBandwidth(_this->bandwidth);
}
if (!_this->devDesc.empty()) {
config.acquire();
config.conf["devices"][_this->devDesc]["bandwidth"] = _this->bandwidth;
config.release(true);
}
}
SmGui::LeftLabel("Gain Mode");
SmGui::FillWidth();
SmGui::ForceSync();
if (SmGui::Combo(CONCAT("##_gainmode_select_", _this->name), &_this->gainMode, gainModesTxt)) {
if (SmGui::Combo(CONCAT("##_pluto_gainmode_select_", _this->name), &_this->gmId, _this->gainModes.txt)) {
if (_this->running) {
iio_channel_attr_write(iio_device_find_channel(_this->phy, "voltage0", false), "gain_control_mode", gainModes[_this->gainMode]);
iio_channel_attr_write(_this->rxChan, "gain_control_mode", _this->gainModes.value(_this->gmId).c_str());
}
if (!_this->devDesc.empty()) {
config.acquire();
config.conf["devices"][_this->devDesc]["gainMode"] = _this->gainModes.key(_this->gmId);
config.release(true);
}
config.acquire();
config.conf["gainMode"] = _this->gainMode;
config.release(true);
}
SmGui::LeftLabel("PGA Gain");
if (_this->gainMode) { SmGui::BeginDisabled(); }
SmGui::LeftLabel("Gain");
if (_this->gmId) { SmGui::BeginDisabled(); }
SmGui::FillWidth();
if (SmGui::SliderFloat(CONCAT("##_gain_select_", _this->name), &_this->gain, 0, 76)) {
if (SmGui::SliderFloatWithSteps(CONCAT("##_pluto_gain__", _this->name), &_this->gain, -1.0f, 73.0f, 1.0f, SmGui::FMT_STR_FLOAT_DB_NO_DECIMAL)) {
if (_this->running) {
iio_channel_attr_write_longlong(iio_device_find_channel(_this->phy, "voltage0", false), "hardwaregain", round(_this->gain));
iio_channel_attr_write_double(_this->rxChan, "hardwaregain", _this->gain);
}
if (!_this->devDesc.empty()) {
config.acquire();
config.conf["devices"][_this->devDesc]["gain"] = _this->gain;
config.release(true);
}
config.acquire();
config.conf["gain"] = _this->gain;
config.release(true);
}
if (_this->gainMode) { SmGui::EndDisabled(); }
if (_this->gmId) { SmGui::EndDisabled(); }
}
void setBandwidth(int bw) {
if (bw > 0) {
iio_channel_attr_write_longlong(rxChan, "rf_bandwidth", bw);
}
else {
iio_channel_attr_write_longlong(rxChan, "rf_bandwidth", std::min<int>(samplerate, 52000000));
}
}
static void worker(void* ctx) {
PlutoSDRSourceModule* _this = (PlutoSDRSourceModule*)ctx;
int blockSize = _this->sampleRate / 200.0f;
int blockSize = _this->samplerate / 200.0f;
struct iio_channel *rx0_i, *rx0_q;
struct iio_buffer* rxbuf;
rx0_i = iio_device_find_channel(_this->dev, "voltage0", 0);
rx0_q = iio_device_find_channel(_this->dev, "voltage1", 0);
// Acquire channels
iio_channel* rx0_i = iio_device_find_channel(_this->dev, "voltage0", 0);
iio_channel* rx0_q = iio_device_find_channel(_this->dev, "voltage1", 0);
if (!rx0_i || !rx0_q) {
flog::error("Failed to acquire RX channels");
return;
}
// Start streaming
iio_channel_enable(rx0_i);
iio_channel_enable(rx0_q);
rxbuf = iio_device_create_buffer(_this->dev, blockSize, false);
// Allocate buffer
iio_buffer* rxbuf = iio_device_create_buffer(_this->dev, blockSize, false);
if (!rxbuf) {
flog::error("Could not create RX buffer");
return;
}
// Receive loop
while (true) {
// Read samples here
// TODO: RECEIVE HERE
// Read samples
iio_buffer_refill(rxbuf);
// Get buffer pointer
int16_t* buf = (int16_t*)iio_buffer_first(rxbuf, rx0_i);
if (!buf) { break; }
for (int i = 0; i < blockSize; i++) {
_this->stream.writeBuf[i].re = (float)buf[i * 2] / 32768.0f;
_this->stream.writeBuf[i].im = (float)buf[(i * 2) + 1] / 32768.0f;
}
// Convert samples to CF32
volk_16i_s32f_convert_32f((float*)_this->stream.writeBuf, buf, 32768.0f, blockSize * 2);
// Send out the samples
if (!_this->stream.swap(blockSize)) { break; };
}
// Stop streaming
iio_channel_disable(rx0_i);
iio_channel_disable(rx0_q);
// Free buffer
iio_buffer_destroy(rxbuf);
}
std::string name;
bool enabled = true;
dsp::stream<dsp::complex_t> stream;
float sampleRate;
SourceManager::SourceHandler handler;
std::thread workerThread;
struct iio_context* ctx = NULL;
struct iio_device* phy = NULL;
struct iio_device* dev = NULL;
iio_context* ctx = NULL;
iio_device* phy = NULL;
iio_device* dev = NULL;
iio_channel* rxLO = NULL;
iio_channel* rxChan = NULL;
bool running = false;
bool ipMode = true;
double freq;
char ip[1024] = "ip:192.168.2.1";
int gainMode = 0;
float gain = 0;
int srId = 0;
std::vector<double> sampleRates;
std::string sampleRatesTxt;
std::string devDesc = "";
std::string uri = "";
double freq;
int samplerate = 4000000;
int bandwidth = 0;
float gain = -1;
int devId = 0;
int srId = 0;
int bwId = 0;
int gmId = 0;
OptionList<std::string, std::string> devices;
OptionList<int, double> samplerates;
OptionList<int, double> bandwidths;
OptionList<std::string, std::string> gainModes;
};
MOD_EXPORT void _INIT_() {
json defConf;
defConf["IP"] = "192.168.2.1";
defConf["sampleRate"] = 4000000.0f;
defConf["gainMode"] = 0;
defConf["gain"] = 0.0f;
json defConf = {};
defConf["device"] = "";
defConf["devices"] = {};
config.setPath(core::args["root"].s() + "/plutosdr_source_config.json");
config.load(defConf);
config.enableAutoSave();
// Reset the configuration if the old format is still used
config.acquire();
if (!config.conf.contains("device") || !config.conf.contains("devices")) {
config.conf = defConf;
config.release(true);
}
else {
config.release();
}
}
MOD_EXPORT ModuleManager::Instance* _CREATE_INSTANCE_(std::string name) {

View File

@@ -17,7 +17,7 @@ SDRPP_MOD_INFO{
/* Name: */ "rfspace_source",
/* Description: */ "RFspace source module for SDR++",
/* Author: */ "Ryzerth",
/* Version: */ 0, 1, 0,
/* Version: */ 0, 1, 1,
/* Max instances */ 1
};
@@ -154,8 +154,8 @@ private:
_this->client = rfspace::connect(_this->hostname, _this->port, &_this->stream);
_this->deviceInit();
}
catch (std::exception e) {
flog::error("Could not connect to SDR: {0}", e.what());
catch (const std::exception& e) {
flog::error("Could not connect to SDR: {}", e.what());
}
}
else if (connected && SmGui::Button("Disconnect##rfspace_source")) {
@@ -231,7 +231,7 @@ private:
}
// Create samplerate list
auto srs = client->getValidSampleRates();
auto srs = client->getSamplerates();
sampleRates.clear();
for (auto& sr : srs) {
sampleRates.define(sr, getBandwdithScaled(sr), sr);
@@ -317,7 +317,7 @@ private:
dsp::stream<dsp::complex_t> stream;
SourceManager::SourceHandler handler;
rfspace::RFspaceClient client;
std::shared_ptr<rfspace::Client> client;
};
MOD_EXPORT void _INIT_() {

View File

@@ -6,15 +6,13 @@
using namespace std::chrono_literals;
namespace rfspace {
RFspaceClientClass::RFspaceClientClass(net::Conn conn, net::Conn udpConn, dsp::stream<dsp::complex_t>* out) {
client = std::move(conn);
udpClient = std::move(udpConn);
Client::Client(std::shared_ptr<net::Socket> tcp, std::shared_ptr<net::Socket> udp, dsp::stream<dsp::complex_t>* out) {
this->tcp = tcp;
this->udp = udp;
output = out;
// Allocate buffers
rbuffer = new uint8_t[RFSPACE_MAX_SIZE];
sbuffer = new uint8_t[RFSPACE_MAX_SIZE];
ubuffer = new uint8_t[RFSPACE_MAX_SIZE];
// Clear write stop of stream just in case
output->clearWriteStop();
@@ -22,9 +20,9 @@ namespace rfspace {
// Send UDP packet so that a router opens the port
sendDummyUDP();
// Start readers
client->readAsync(sizeof(tcpHeader), (uint8_t*)&tcpHeader, tcpHandler, this);
udpClient->readAsync(RFSPACE_MAX_SIZE, ubuffer, udpHandler, this);
// Start workers
tcpWorkerThread = std::thread(&Client::tcpWorker, this);
udpWorkerThread = std::thread(&Client::udpWorker, this);
// Get device ID and wait for response
getControlItem(RFSPACE_CTRL_ITEM_PROD_ID, NULL, 0);
@@ -43,22 +41,20 @@ namespace rfspace {
setPort(RFSPACE_RF_PORT_1);
// Start heartbeat
heartBeatThread = std::thread(&RFspaceClientClass::heartBeatWorker, this);
heartBeatThread = std::thread(&Client::heartBeatWorker, this);
}
RFspaceClientClass::~RFspaceClientClass() {
Client::~Client() {
close();
delete[] rbuffer;
delete[] sbuffer;
delete[] ubuffer;
}
void RFspaceClientClass::sendDummyUDP() {
void Client::sendDummyUDP() {
uint8_t dummy = 0x5A;
udpClient->write(1, &dummy);
udp->send(&dummy, 1);
}
int RFspaceClientClass::getControlItem(ControlItem item, void* param, int len) {
int Client::getControlItem(ControlItem item, void* param, int len) {
// Build packet
uint16_t* header = (uint16_t*)&sbuffer[0];
uint16_t* item_val = (uint16_t*)&sbuffer[2];
@@ -66,12 +62,12 @@ namespace rfspace {
*item_val = item;
// Send packet
client->write(4, sbuffer);
tcp->send(sbuffer, 4);
return -1;
}
void RFspaceClientClass::setControlItem(ControlItem item, void* param, int len) {
void Client::setControlItem(ControlItem item, void* param, int len) {
// Build packet
uint16_t* header = (uint16_t*)&sbuffer[0];
uint16_t* item_val = (uint16_t*)&sbuffer[2];
@@ -80,10 +76,10 @@ namespace rfspace {
memcpy(&sbuffer[4], param, len);
// Send packet
client->write(len + 4, sbuffer);
tcp->send(sbuffer, len + 4);
}
void RFspaceClientClass::setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len) {
void Client::setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len) {
// Build packet
uint16_t* header = (uint16_t*)&sbuffer[0];
uint16_t* item_val = (uint16_t*)&sbuffer[2];
@@ -94,10 +90,10 @@ namespace rfspace {
memcpy(&sbuffer[5], param, len);
// Send packet
client->write(len + 5, sbuffer);
tcp->send(sbuffer, len + 5);
}
std::vector<uint32_t> RFspaceClientClass::getValidSampleRates() {
std::vector<uint32_t> Client::getSamplerates() {
std::vector<uint32_t> sr;
switch (deviceId) {
@@ -119,92 +115,145 @@ namespace rfspace {
return sr;
}
void RFspaceClientClass::setFrequency(uint64_t freq) {
void Client::setFrequency(uint64_t freq) {
setControlItemWithChanID(RFSPACE_CTRL_ITEM_NCO_FREQUENCY, 0, &freq, 5);
}
void RFspaceClientClass::setPort(RFPort port) {
void Client::setPort(RFPort port) {
uint8_t value = port;
setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_PORT, 0, &value, sizeof(value));
}
void RFspaceClientClass::setGain(int8_t gain) {
void Client::setGain(int8_t gain) {
setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_GAIN, 0, &gain, sizeof(gain));
}
void RFspaceClientClass::setSampleRate(uint32_t sampleRate) {
void Client::setSampleRate(uint32_t sampleRate) {
// Acquire the buffer variables
std::lock_guard<std::mutex> lck(bufferMtx);
// Update block size
blockSize = sampleRate / 200;
// Send samplerate to device
setControlItemWithChanID(RFSPACE_CTRL_ITEM_IQ_SAMP_RATE, 0, &sampleRate, sizeof(sampleRate));
}
void RFspaceClientClass::start(SampleFormat sampleFormat, SampleDepth sampleDepth) {
void Client::start(SampleFormat sampleFormat, SampleDepth sampleDepth) {
// Acquire the buffer variables
std::lock_guard<std::mutex> lck(bufferMtx);
// Reset buffer
inBuffer = 0;
// Start device
uint8_t args[4] = { (uint8_t)sampleFormat, (uint8_t)RFSPACE_STATE_RUN, (uint8_t)sampleDepth, 0 };
setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args));
}
void RFspaceClientClass::stop() {
void Client::stop() {
uint8_t args[4] = { 0, RFSPACE_STATE_IDLE, 0, 0 };
setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args));
}
void RFspaceClientClass::close() {
void Client::close() {
// Stop UDP worker
output->stopWriter();
udp->close();
if (udpWorkerThread.joinable()) { udpWorkerThread.join(); }
output->clearWriteStop();
// Stop heartbeat worker
stopHeartBeat = true;
heartBeatCnd.notify_all();
if (heartBeatThread.joinable()) { heartBeatThread.join(); }
client->close();
udpClient->close();
output->clearWriteStop();
// Stop TCP worker
tcp->close();
if (tcpWorkerThread.joinable()) { tcpWorkerThread.join(); }
}
bool RFspaceClientClass::isOpen() {
return client->isOpen();
bool Client::isOpen() {
return tcp->isOpen() || udp->isOpen();
}
void RFspaceClientClass::tcpHandler(int count, uint8_t* buf, void* ctx) {
RFspaceClientClass* _this = (RFspaceClientClass*)ctx;
uint8_t type = _this->tcpHeader >> 13;
uint16_t size = _this->tcpHeader & 0b1111111111111;
void Client::tcpWorker() {
// Allocate receive buffer
uint8_t* buffer = new uint8_t[RFSPACE_MAX_SIZE];
// Read the rest of the data
if (size > 2) {
_this->client->read(size - 2, &_this->rbuffer[2]);
}
// Receive loop
while (true) {
// Receive header
uint16_t header;
if (tcp->recv((uint8_t*)&header, sizeof(uint16_t), true) <= 0) { break; }
// flog::warn("TCP received: {0} {1}", type, size);
// Decode header
uint8_t type = header >> 13;
uint16_t size = header & 0b1111111111111;
// Check for a device ID
uint16_t* controlItem = (uint16_t*)&_this->rbuffer[2];
if (type == RFSPACE_MSG_TYPE_T2H_SET_CTRL_ITEM_RESP && *controlItem == RFSPACE_CTRL_ITEM_PROD_ID) {
{
std::lock_guard<std::mutex> lck(_this->devIdMtx);
_this->deviceId = (DeviceID)*(uint32_t*)&_this->rbuffer[4];
_this->devIdAvailable = true;
// Receive data
if (tcp->recv(buffer, size - 2, true, RFSPACE_TIMEOUT_MS) <= 0) { break; }
// Check for a device ID
uint16_t* controlItem = (uint16_t*)&buffer[0];
if (type == RFSPACE_MSG_TYPE_T2H_SET_CTRL_ITEM_RESP && *controlItem == RFSPACE_CTRL_ITEM_PROD_ID) {
{
std::lock_guard<std::mutex> lck(devIdMtx);
deviceId = (DeviceID)*(uint32_t*)&buffer[2];
devIdAvailable = true;
}
devIdCnd.notify_all();
}
_this->devIdCnd.notify_all();
}
// Restart an async read
_this->client->readAsync(sizeof(_this->tcpHeader), (uint8_t*)&_this->tcpHeader, tcpHandler, _this);
// Free receive buffer
delete[] buffer;
}
void RFspaceClientClass::udpHandler(int count, uint8_t* buf, void* ctx) {
RFspaceClientClass* _this = (RFspaceClientClass*)ctx;
uint16_t hdr = (uint16_t)buf[0] | ((uint16_t)buf[1] << 8);
uint8_t type = hdr >> 13;
uint16_t size = hdr & 0b1111111111111;
void Client::udpWorker() {
// Allocate receive buffer
uint8_t* buffer = new uint8_t[RFSPACE_MAX_SIZE];
uint16_t* header = (uint16_t*)&buffer[0];
if (type == RFSPACE_MSG_TYPE_T2H_DATA_ITEM_0) {
int16_t* samples = (int16_t*)&buf[4];
int sampCount = (size - 4) / (2 * sizeof(int16_t));
volk_16i_s32f_convert_32f((float*)_this->output->writeBuf, samples, 32768.0f, sampCount * 2);
_this->output->swap(sampCount);
// Receive loop
while (true) {
// Receive datagram
int rsize = udp->recv(buffer, RFSPACE_MAX_SIZE);
if (rsize <= 0) { break; }
// Decode header
uint8_t type = (*header) >> 13;
uint16_t size = (*header) & 0b1111111111111;
if (rsize != size) {
flog::error("Datagram size mismatch: {} vs {}", rsize, size);
continue;
}
// Check for a sample packet
if (type == RFSPACE_MSG_TYPE_T2H_DATA_ITEM_0) {
// Acquire the buffer variables
std::lock_guard<std::mutex> lck(bufferMtx);
// Convert samples to complex float
int16_t* samples = (int16_t*)&buffer[4];
int sampCount = (size - 4) / (2 * sizeof(int16_t));
volk_16i_s32f_convert_32f((float*)&output->writeBuf[inBuffer], samples, 32768.0f, sampCount * 2);
inBuffer += sampCount;
// Send out samples if enough are buffered
if (inBuffer >= blockSize) {
if (!output->swap(inBuffer)) { break; };
inBuffer = 0;
}
}
}
// Restart an async read
_this->udpClient->readAsync(RFSPACE_MAX_SIZE, _this->ubuffer, udpHandler, _this);
// Free receive buffer
delete[] buffer;
}
void RFspaceClientClass::heartBeatWorker() {
void Client::heartBeatWorker() {
uint8_t dummy[4];
while (true) {
getControlItem(RFSPACE_CTRL_ITEM_STATE, dummy, sizeof(dummy));
@@ -216,11 +265,9 @@ namespace rfspace {
}
}
RFspaceClient connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) {
net::Conn conn = net::connect(host, port);
if (!conn) { return NULL; }
net::Conn udpConn = net::openUDP("0.0.0.0", port, host, port, true);
if (!udpConn) { return NULL; }
return RFspaceClient(new RFspaceClientClass(std::move(conn), std::move(udpConn), out));
std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) {
auto tcp = net::connect(host, port);
auto udp = net::openudp(host, port, "0.0.0.0", port);
return std::make_shared<Client>(tcp, udp, out);
}
}

View File

@@ -1,9 +1,10 @@
#pragma once
#include <utils/networking.h>
#include <utils/net.h>
#include <dsp/stream.h>
#include <dsp/types.h>
#include <atomic>
#include <queue>
#include <thread>
#include <vector>
#include <mutex>
#define RFSPACE_MAX_SIZE 8192
#define RFSPACE_HEARTBEAT_INTERVAL_MS 1000
@@ -96,10 +97,10 @@ namespace rfspace {
RFSPACE_CTRL_ITEM_ERROR_LOG = 0x0410
};
class RFspaceClientClass {
class Client {
public:
RFspaceClientClass(net::Conn conn, net::Conn udpConn, dsp::stream<dsp::complex_t>* out);
~RFspaceClientClass();
Client(std::shared_ptr<net::Socket> tcp, std::shared_ptr<net::Socket> udp, dsp::stream<dsp::complex_t>* out);
~Client();
void sendDummyUDP();
@@ -107,7 +108,7 @@ namespace rfspace {
void setControlItem(ControlItem item, void* param, int len);
void setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len);
std::vector<uint32_t> getValidSampleRates();
std::vector<uint32_t> getSamplerates();
void setFrequency(uint64_t freq);
void setPort(RFPort port);
@@ -123,21 +124,22 @@ namespace rfspace {
DeviceID deviceId;
private:
static void tcpHandler(int count, uint8_t* buf, void* ctx);
static void udpHandler(int count, uint8_t* buf, void* ctx);
void tcpWorker();
void udpWorker();
void heartBeatWorker();
net::Conn client;
net::Conn udpClient;
std::shared_ptr<net::Socket> tcp;
std::shared_ptr<net::Socket> udp;
dsp::stream<dsp::complex_t>* output;
uint16_t tcpHeader;
uint16_t udpHeader;
uint8_t* rbuffer = NULL;
uint8_t* sbuffer = NULL;
uint8_t* ubuffer = NULL;
std::thread tcpWorkerThread;
std::thread udpWorkerThread;
std::thread heartBeatThread;
std::mutex heartBeatMtx;
@@ -147,10 +149,12 @@ namespace rfspace {
bool devIdAvailable = false;
std::condition_variable devIdCnd;
std::mutex devIdMtx;
std::mutex bufferMtx;
int blockSize = 256;
int inBuffer = 0;
};
typedef std::unique_ptr<RFspaceClientClass> RFspaceClient;
RFspaceClient connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
}

View File

@@ -171,7 +171,7 @@ public:
#ifndef __ANDROID__
int oret = rtlsdr_open(&openDev, id);
#else
int oret = rtlsdr_open_fd(&openDev, devFd);
int oret = rtlsdr_open_sys_dev(&openDev, devFd);
#endif
if (oret < 0) {
@@ -285,7 +285,7 @@ private:
#ifndef __ANDROID__
int oret = rtlsdr_open(&_this->openDev, _this->devId);
#else
int oret = rtlsdr_open_fd(&_this->openDev, _this->devFd);
int oret = rtlsdr_open_sys_dev(&_this->openDev, _this->devFd);
#endif
if (oret < 0) {

View File

@@ -132,8 +132,8 @@ private:
try {
_this->client = rtltcp::connect(&_this->stream, _this->ip, _this->port);
}
catch (std::exception e) {
flog::error("Could connect to RTL-TCP server: {0}", e.what());
catch (const std::exception& e) {
flog::error("Could connect to RTL-TCP server: {}", e.what());
return;
}

View File

@@ -17,7 +17,7 @@ SDRPP_MOD_INFO{
/* Name: */ "sdrpp_server_source",
/* Description: */ "SDR++ Server source module for SDR++",
/* Author: */ "Ryzerth",
/* Version: */ 0, 1, 0,
/* Version: */ 0, 2, 0,
/* Max instances */ 1
};
@@ -109,10 +109,10 @@ private:
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
if (_this->running) { return; }
// Try to connect if not already connected
if (!_this->client) {
// Try to connect if not already connected (Play button is locked anyway so not sure why I put this here)
if (!_this->connected()) {
_this->tryConnect();
if (!_this->client) { return; }
if (!_this->connected()) { return; }
}
// Set configuration
@@ -127,7 +127,7 @@ private:
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
if (!_this->running) { return; }
if (_this->client) { _this->client->stop(); }
if (_this->connected()) { _this->client->stop(); }
_this->running = false;
flog::info("SDRPPServerSourceModule '{0}': Stop!", _this->name);
@@ -135,7 +135,7 @@ private:
static void tune(double freq, void* ctx) {
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
if (_this->running && _this->client) {
if (_this->running && _this->connected()) {
_this->client->setFrequency(freq);
}
_this->freq = freq;
@@ -146,7 +146,7 @@ private:
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
float menuWidth = ImGui::GetContentRegionAvail().x;
bool connected = (_this->client && _this->client->isOpen());
bool connected = _this->connected();
gui::mainWindow.playButtonLocked = !connected;
ImGui::GenericDialog("##sdrpp_srv_src_err_dialog", _this->serverBusy, GENERIC_DIALOG_BUTTONS_OK, [=](){
@@ -227,14 +227,18 @@ private:
}
}
bool connected() {
return client && client->isOpen();
}
void tryConnect() {
try {
if (client) { client.reset(); }
client = server::connect(hostname, port, &stream);
deviceInit();
}
catch (std::exception e) {
flog::error("Could not connect to SDR: {0}", e.what());
catch (const std::exception& e) {
flog::error("Could not connect to SDR: {}", e.what());
if (!strcmp(e.what(), "Server busy")) { serverBusy = true; }
}
}
@@ -281,7 +285,7 @@ private:
int sampleTypeId;
bool compression = false;
server::Client client;
std::shared_ptr<server::Client> client;
};
MOD_EXPORT void _INIT_() {

View File

@@ -7,8 +7,8 @@
using namespace std::chrono_literals;
namespace server {
ClientClass::ClientClass(net::Conn conn, dsp::stream<dsp::complex_t>* out) {
client = std::move(conn);
Client::Client(std::shared_ptr<net::Socket> sock, dsp::stream<dsp::complex_t>* out) {
this->sock = sock;
output = out;
// Allocate buffers
@@ -37,8 +37,8 @@ namespace server {
decomp.start();
link.start();
// Start readers
client->readAsync(sizeof(PacketHeader), rbuffer, tcpHandler, this);
// Start worker thread
workerThread = std::thread(&Client::worker, this);
// Ask for a UI
int res = getUI();
@@ -46,14 +46,14 @@ namespace server {
else if (res == -2) { throw std::runtime_error("Server busy"); }
}
ClientClass::~ClientClass() {
Client::~Client() {
close();
ZSTD_freeDCtx(dctx);
delete[] rbuffer;
delete[] sbuffer;
}
void ClientClass::showMenu() {
void Client::showMenu() {
std::string diffId = "";
SmGui::DrawListElem diffValue;
bool syncRequired = false;
@@ -96,8 +96,8 @@ namespace server {
}
}
void ClientClass::setFrequency(double freq) {
if (!client || !client->isOpen()) { return; }
void Client::setFrequency(double freq) {
if (!isOpen()) { return; }
*(double*)s_cmd_data = freq;
sendCommand(COMMAND_SET_FREQUENCY, sizeof(double));
auto waiter = awaitCommandAck(COMMAND_SET_FREQUENCY);
@@ -105,119 +105,126 @@ namespace server {
waiter->handled();
}
double ClientClass::getSampleRate() {
double Client::getSampleRate() {
return currentSampleRate;
}
void ClientClass::setSampleType(dsp::compression::PCMType type) {
void Client::setSampleType(dsp::compression::PCMType type) {
if (!isOpen()) { return; }
s_cmd_data[0] = type;
sendCommand(COMMAND_SET_SAMPLE_TYPE, 1);
}
void ClientClass::setCompression(bool enabled) {
void Client::setCompression(bool enabled) {
if (!isOpen()) { return; }
s_cmd_data[0] = enabled;
sendCommand(COMMAND_SET_COMPRESSION, 1);
}
void ClientClass::start() {
if (!client || !client->isOpen()) { return; }
void Client::start() {
if (!isOpen()) { return; }
sendCommand(COMMAND_START, 0);
getUI();
}
void ClientClass::stop() {
if (!client || !client->isOpen()) { return; }
void Client::stop() {
if (!isOpen()) { return; }
sendCommand(COMMAND_STOP, 0);
getUI();
}
void ClientClass::close() {
void Client::close() {
// Stop worker
decompIn.stopWriter();
if (sock) { sock->close(); }
if (workerThread.joinable()) { workerThread.join(); }
decompIn.clearWriteStop();
// Stop DSP
decomp.stop();
link.stop();
decompIn.stopWriter();
client->close();
decompIn.clearWriteStop();
}
bool ClientClass::isOpen() {
return client->isOpen();
bool Client::isOpen() {
return sock && sock->isOpen();
}
void ClientClass::tcpHandler(int count, uint8_t* buf, void* ctx) {
ClientClass* _this = (ClientClass*)ctx;
// Read the rest of the data (TODO: CHECK SIZE OR SHIT WILL BE FUCKED)
int len = 0;
int read = 0;
int goal = _this->r_pkt_hdr->size - sizeof(PacketHeader);
while (len < goal) {
read = _this->client->read(goal - len, &buf[sizeof(PacketHeader) + len]);
if (read < 0) {
return;
};
len += read;
}
_this->bytes += _this->r_pkt_hdr->size;
if (_this->r_pkt_hdr->type == PACKET_TYPE_COMMAND) {
// TODO: Move to command handler
if (_this->r_cmd_hdr->cmd == COMMAND_SET_SAMPLERATE && _this->r_pkt_hdr->size == sizeof(PacketHeader) + sizeof(CommandHeader) + sizeof(double)) {
_this->currentSampleRate = *(double*)_this->r_cmd_data;
core::setInputSampleRate(_this->currentSampleRate);
void Client::worker() {
while (true) {
// Receive header
if (sock->recv(rbuffer, sizeof(PacketHeader), true) <= 0) {
break;
}
else if (_this->r_cmd_hdr->cmd == COMMAND_DISCONNECT) {
flog::error("Asked to disconnect by the server");
_this->serverBusy = true;
// Cancel waiters
// Receive remaining data
if (sock->recv(&rbuffer[sizeof(PacketHeader)], r_pkt_hdr->size - sizeof(PacketHeader), true, PROTOCOL_TIMEOUT_MS) <= 0) {
break;
}
// Increment data counter
bytes += r_pkt_hdr->size;
// Decode packet
if (r_pkt_hdr->type == PACKET_TYPE_COMMAND) {
// TODO: Move to command handler
if (r_cmd_hdr->cmd == COMMAND_SET_SAMPLERATE && r_pkt_hdr->size == sizeof(PacketHeader) + sizeof(CommandHeader) + sizeof(double)) {
currentSampleRate = *(double*)r_cmd_data;
core::setInputSampleRate(currentSampleRate);
}
else if (r_cmd_hdr->cmd == COMMAND_DISCONNECT) {
flog::error("Asked to disconnect by the server");
serverBusy = true;
// Cancel waiters
std::vector<PacketWaiter*> toBeRemoved;
for (auto& [waiter, cmd] : commandAckWaiters) {
waiter->cancel();
toBeRemoved.push_back(waiter);
}
// Remove handled waiters
for (auto& waiter : toBeRemoved) {
commandAckWaiters.erase(waiter);
delete waiter;
}
}
}
else if (r_pkt_hdr->type == PACKET_TYPE_COMMAND_ACK) {
// Notify waiters
std::vector<PacketWaiter*> toBeRemoved;
for (auto& [waiter, cmd] : _this->commandAckWaiters) {
waiter->cancel();
for (auto& [waiter, cmd] : commandAckWaiters) {
if (cmd != r_cmd_hdr->cmd) { continue; }
waiter->notify();
toBeRemoved.push_back(waiter);
}
// Remove handled waiters
for (auto& waiter : toBeRemoved) {
_this->commandAckWaiters.erase(waiter);
commandAckWaiters.erase(waiter);
delete waiter;
}
}
}
else if (_this->r_pkt_hdr->type == PACKET_TYPE_COMMAND_ACK) {
// Notify waiters
std::vector<PacketWaiter*> toBeRemoved;
for (auto& [waiter, cmd] : _this->commandAckWaiters) {
if (cmd != _this->r_cmd_hdr->cmd) { continue; }
waiter->notify();
toBeRemoved.push_back(waiter);
else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND) {
memcpy(decompIn.writeBuf, &rbuffer[sizeof(PacketHeader)], r_pkt_hdr->size - sizeof(PacketHeader));
if (!decompIn.swap(r_pkt_hdr->size - sizeof(PacketHeader))) { break; }
}
// Remove handled waiters
for (auto& waiter : toBeRemoved) {
_this->commandAckWaiters.erase(waiter);
delete waiter;
else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) {
size_t outCount = ZSTD_decompressDCtx(dctx, decompIn.writeBuf, STREAM_BUFFER_SIZE, r_pkt_data, r_pkt_hdr->size - sizeof(PacketHeader));
if (outCount) {
if (!decompIn.swap(outCount)) { break; }
};
}
else if (r_pkt_hdr->type == PACKET_TYPE_ERROR) {
flog::error("SDR++ Server Error: {0}", rbuffer[sizeof(PacketHeader)]);
}
else {
flog::error("Invalid packet type: {0}", r_pkt_hdr->type);
}
}
else if (_this->r_pkt_hdr->type == PACKET_TYPE_BASEBAND) {
memcpy(_this->decompIn.writeBuf, &buf[sizeof(PacketHeader)], _this->r_pkt_hdr->size - sizeof(PacketHeader));
_this->decompIn.swap(_this->r_pkt_hdr->size - sizeof(PacketHeader));
}
else if (_this->r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) {
size_t outCount = ZSTD_decompressDCtx(_this->dctx, _this->decompIn.writeBuf, STREAM_BUFFER_SIZE, _this->r_pkt_data, _this->r_pkt_hdr->size - sizeof(PacketHeader));
if (outCount) { _this->decompIn.swap(outCount); };
}
else if (_this->r_pkt_hdr->type == PACKET_TYPE_ERROR) {
flog::error("SDR++ Server Error: {0}", buf[sizeof(PacketHeader)]);
}
else {
flog::error("Invalid packet type: {0}", _this->r_pkt_hdr->type);
}
// Restart an async read
_this->client->readAsync(sizeof(PacketHeader), _this->rbuffer, tcpHandler, _this);
}
int ClientClass::getUI() {
int Client::getUI() {
if (!isOpen()) { return -1; }
auto waiter = awaitCommandAck(COMMAND_GET_UI);
sendCommand(COMMAND_GET_UI, 0);
if (waiter->await(PROTOCOL_TIMEOUT_MS)) {
@@ -233,37 +240,35 @@ namespace server {
return 0;
}
void ClientClass::sendPacket(PacketType type, int len) {
void Client::sendPacket(PacketType type, int len) {
s_pkt_hdr->type = type;
s_pkt_hdr->size = sizeof(PacketHeader) + len;
client->write(s_pkt_hdr->size, sbuffer);
sock->send(sbuffer, s_pkt_hdr->size);
}
void ClientClass::sendCommand(Command cmd, int len) {
void Client::sendCommand(Command cmd, int len) {
s_cmd_hdr->cmd = cmd;
sendPacket(PACKET_TYPE_COMMAND, sizeof(CommandHeader) + len);
}
void ClientClass::sendCommandAck(Command cmd, int len) {
void Client::sendCommandAck(Command cmd, int len) {
s_cmd_hdr->cmd = cmd;
sendPacket(PACKET_TYPE_COMMAND_ACK, sizeof(CommandHeader) + len);
}
PacketWaiter* ClientClass::awaitCommandAck(Command cmd) {
PacketWaiter* Client::awaitCommandAck(Command cmd) {
PacketWaiter* waiter = new PacketWaiter;
commandAckWaiters[waiter] = cmd;
return waiter;
}
void ClientClass::dHandler(dsp::complex_t *data, int count, void *ctx) {
ClientClass* _this = (ClientClass*)ctx;
void Client::dHandler(dsp::complex_t *data, int count, void *ctx) {
Client* _this = (Client*)ctx;
memcpy(_this->output->writeBuf, data, count * sizeof(dsp::complex_t));
_this->output->swap(count);
}
Client connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) {
net::Conn conn = net::connect(host, port);
if (!conn) { return NULL; }
return Client(new ClientClass(std::move(conn), out));
std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) {
return std::make_shared<Client>(net::connect(host, port), out);
}
}

View File

@@ -1,5 +1,5 @@
#pragma once
#include <utils/networking.h>
#include <utils/net.h>
#include <dsp/stream.h>
#include <dsp/types.h>
#include <atomic>
@@ -13,10 +13,6 @@
#include <dsp/routing/stream_link.h>
#include <zstd.h>
#define RFSPACE_MAX_SIZE 8192
#define RFSPACE_HEARTBEAT_INTERVAL_MS 1000
#define RFSPACE_TIMEOUT_MS 3000
#define PROTOCOL_TIMEOUT_MS 10000
namespace server {
@@ -75,10 +71,10 @@ namespace server {
std::mutex handledMtx;
};
class ClientClass {
class Client {
public:
ClientClass(net::Conn conn, dsp::stream<dsp::complex_t>* out);
~ClientClass();
Client(std::shared_ptr<net::Socket> sock, dsp::stream<dsp::complex_t>* out);
~Client();
void showMenu();
@@ -98,7 +94,7 @@ namespace server {
bool serverBusy = false;
private:
static void tcpHandler(int count, uint8_t* buf, void* ctx);
void worker();
int getUI();
@@ -112,7 +108,7 @@ namespace server {
static void dHandler(dsp::complex_t *data, int count, void *ctx);
net::Conn client;
std::shared_ptr<net::Socket> sock;
dsp::stream<uint8_t> decompIn;
dsp::compression::SampleStreamDecompressor decomp;
@@ -137,10 +133,10 @@ namespace server {
ZSTD_DCtx* dctx;
std::thread workerThread;
double currentSampleRate = 1000000.0;
};
typedef std::unique_ptr<ClientClass> Client;
Client connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
}

View File

@@ -1,5 +1,6 @@
#include "spectran_http_client.h"
#include <utils/flog.h>
#include <inttypes.h>
SpectranHTTPClient::SpectranHTTPClient(std::string host, int port, dsp::stream<dsp::complex_t>* stream) {
this->stream = stream;
@@ -50,10 +51,10 @@ void SpectranHTTPClient::setCenterFrequency(uint64_t freq) {
// Make request
net::http::RequestHeader rqhdr(net::http::METHOD_PUT, "/control", host);
char buf[1024];
sprintf(buf, "{\"frequencyCenter\":%d,\"frequencySpan\":%d,\"type\":\"capture\"}", freq, _samplerate);
sprintf(buf, "{\"frequencyCenter\":%" PRIu64 ",\"frequencySpan\":%" PRIu64 ",\"type\":\"capture\"}", freq, _samplerate);
std::string data = buf;
char lenBuf[16];
sprintf(lenBuf, "%d", data.size());
sprintf(lenBuf, "%" PRIu64, (uint64_t)data.size());
rqhdr.setField("Content-Length", lenBuf);
controlHttp.sendRequestHeader(rqhdr);
controlSock->sendstr(data);

View File

@@ -283,8 +283,8 @@ private:
flog::info("Connected to server");
}
}
catch (std::exception e) {
flog::error("Could not connect to spyserver {0}", e.what());
catch (const std::exception& e) {
flog::error("Could not connect to spyserver {}", e.what());
}
}