revamp sdr++ server source networking code

This commit is contained in:
AlexandreRouma 2024-01-28 21:46:54 +01:00
parent 11a7c382e8
commit be8edbfa9e
3 changed files with 117 additions and 112 deletions

View File

@ -17,7 +17,7 @@ SDRPP_MOD_INFO{
/* Name: */ "sdrpp_server_source", /* Name: */ "sdrpp_server_source",
/* Description: */ "SDR++ Server source module for SDR++", /* Description: */ "SDR++ Server source module for SDR++",
/* Author: */ "Ryzerth", /* Author: */ "Ryzerth",
/* Version: */ 0, 1, 0, /* Version: */ 0, 2, 0,
/* Max instances */ 1 /* Max instances */ 1
}; };
@ -109,10 +109,10 @@ private:
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
if (_this->running) { return; } if (_this->running) { return; }
// Try to connect if not already connected // Try to connect if not already connected (Play button is locked anyway so not sure why I put this here)
if (!_this->client) { if (!_this->connected()) {
_this->tryConnect(); _this->tryConnect();
if (!_this->client) { return; } if (!_this->connected()) { return; }
} }
// Set configuration // Set configuration
@ -127,7 +127,7 @@ private:
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
if (!_this->running) { return; } if (!_this->running) { return; }
if (_this->client) { _this->client->stop(); } if (_this->connected()) { _this->client->stop(); }
_this->running = false; _this->running = false;
flog::info("SDRPPServerSourceModule '{0}': Stop!", _this->name); flog::info("SDRPPServerSourceModule '{0}': Stop!", _this->name);
@ -135,7 +135,7 @@ private:
static void tune(double freq, void* ctx) { static void tune(double freq, void* ctx) {
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
if (_this->running && _this->client) { if (_this->running && _this->connected()) {
_this->client->setFrequency(freq); _this->client->setFrequency(freq);
} }
_this->freq = freq; _this->freq = freq;
@ -146,7 +146,7 @@ private:
SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx;
float menuWidth = ImGui::GetContentRegionAvail().x; float menuWidth = ImGui::GetContentRegionAvail().x;
bool connected = (_this->client && _this->client->isOpen()); bool connected = _this->connected();
gui::mainWindow.playButtonLocked = !connected; gui::mainWindow.playButtonLocked = !connected;
ImGui::GenericDialog("##sdrpp_srv_src_err_dialog", _this->serverBusy, GENERIC_DIALOG_BUTTONS_OK, [=](){ ImGui::GenericDialog("##sdrpp_srv_src_err_dialog", _this->serverBusy, GENERIC_DIALOG_BUTTONS_OK, [=](){
@ -227,6 +227,10 @@ private:
} }
} }
bool connected() {
return client && client->isOpen();
}
void tryConnect() { void tryConnect() {
try { try {
if (client) { client.reset(); } if (client) { client.reset(); }
@ -281,7 +285,7 @@ private:
int sampleTypeId; int sampleTypeId;
bool compression = false; bool compression = false;
server::Client client; std::shared_ptr<server::Client> client;
}; };
MOD_EXPORT void _INIT_() { MOD_EXPORT void _INIT_() {

View File

@ -7,8 +7,8 @@
using namespace std::chrono_literals; using namespace std::chrono_literals;
namespace server { namespace server {
ClientClass::ClientClass(net::Conn conn, dsp::stream<dsp::complex_t>* out) { Client::Client(std::shared_ptr<net::Socket> sock, dsp::stream<dsp::complex_t>* out) {
client = std::move(conn); this->sock = sock;
output = out; output = out;
// Allocate buffers // Allocate buffers
@ -37,8 +37,8 @@ namespace server {
decomp.start(); decomp.start();
link.start(); link.start();
// Start readers // Start worker thread
client->readAsync(sizeof(PacketHeader), rbuffer, tcpHandler, this); workerThread = std::thread(&Client::worker, this);
// Ask for a UI // Ask for a UI
int res = getUI(); int res = getUI();
@ -46,14 +46,14 @@ namespace server {
else if (res == -2) { throw std::runtime_error("Server busy"); } else if (res == -2) { throw std::runtime_error("Server busy"); }
} }
ClientClass::~ClientClass() { Client::~Client() {
close(); close();
ZSTD_freeDCtx(dctx); ZSTD_freeDCtx(dctx);
delete[] rbuffer; delete[] rbuffer;
delete[] sbuffer; delete[] sbuffer;
} }
void ClientClass::showMenu() { void Client::showMenu() {
std::string diffId = ""; std::string diffId = "";
SmGui::DrawListElem diffValue; SmGui::DrawListElem diffValue;
bool syncRequired = false; bool syncRequired = false;
@ -96,8 +96,8 @@ namespace server {
} }
} }
void ClientClass::setFrequency(double freq) { void Client::setFrequency(double freq) {
if (!client || !client->isOpen()) { return; } if (!isOpen()) { return; }
*(double*)s_cmd_data = freq; *(double*)s_cmd_data = freq;
sendCommand(COMMAND_SET_FREQUENCY, sizeof(double)); sendCommand(COMMAND_SET_FREQUENCY, sizeof(double));
auto waiter = awaitCommandAck(COMMAND_SET_FREQUENCY); auto waiter = awaitCommandAck(COMMAND_SET_FREQUENCY);
@ -105,119 +105,126 @@ namespace server {
waiter->handled(); waiter->handled();
} }
double ClientClass::getSampleRate() { double Client::getSampleRate() {
return currentSampleRate; return currentSampleRate;
} }
void ClientClass::setSampleType(dsp::compression::PCMType type) { void Client::setSampleType(dsp::compression::PCMType type) {
if (!isOpen()) { return; }
s_cmd_data[0] = type; s_cmd_data[0] = type;
sendCommand(COMMAND_SET_SAMPLE_TYPE, 1); sendCommand(COMMAND_SET_SAMPLE_TYPE, 1);
} }
void ClientClass::setCompression(bool enabled) { void Client::setCompression(bool enabled) {
if (!isOpen()) { return; }
s_cmd_data[0] = enabled; s_cmd_data[0] = enabled;
sendCommand(COMMAND_SET_COMPRESSION, 1); sendCommand(COMMAND_SET_COMPRESSION, 1);
} }
void ClientClass::start() { void Client::start() {
if (!client || !client->isOpen()) { return; } if (!isOpen()) { return; }
sendCommand(COMMAND_START, 0); sendCommand(COMMAND_START, 0);
getUI(); getUI();
} }
void ClientClass::stop() { void Client::stop() {
if (!client || !client->isOpen()) { return; } if (!isOpen()) { return; }
sendCommand(COMMAND_STOP, 0); sendCommand(COMMAND_STOP, 0);
getUI(); getUI();
} }
void ClientClass::close() { void Client::close() {
// Stop worker
decompIn.stopWriter();
if (sock) { sock->close(); }
if (workerThread.joinable()) { workerThread.join(); }
decompIn.clearWriteStop();
// Stop DSP
decomp.stop(); decomp.stop();
link.stop(); link.stop();
decompIn.stopWriter();
client->close();
decompIn.clearWriteStop();
} }
bool ClientClass::isOpen() { bool Client::isOpen() {
return client->isOpen(); return sock && sock->isOpen();
} }
void ClientClass::tcpHandler(int count, uint8_t* buf, void* ctx) { void Client::worker() {
ClientClass* _this = (ClientClass*)ctx; while (true) {
// Receive header
// Read the rest of the data (TODO: CHECK SIZE OR SHIT WILL BE FUCKED) if (sock->recv(rbuffer, sizeof(PacketHeader), true) <= 0) {
int len = 0; break;
int read = 0;
int goal = _this->r_pkt_hdr->size - sizeof(PacketHeader);
while (len < goal) {
read = _this->client->read(goal - len, &buf[sizeof(PacketHeader) + len]);
if (read < 0) {
return;
};
len += read;
}
_this->bytes += _this->r_pkt_hdr->size;
if (_this->r_pkt_hdr->type == PACKET_TYPE_COMMAND) {
// TODO: Move to command handler
if (_this->r_cmd_hdr->cmd == COMMAND_SET_SAMPLERATE && _this->r_pkt_hdr->size == sizeof(PacketHeader) + sizeof(CommandHeader) + sizeof(double)) {
_this->currentSampleRate = *(double*)_this->r_cmd_data;
core::setInputSampleRate(_this->currentSampleRate);
} }
else if (_this->r_cmd_hdr->cmd == COMMAND_DISCONNECT) {
flog::error("Asked to disconnect by the server");
_this->serverBusy = true;
// Cancel waiters // Receive remaining data
if (sock->recv(&rbuffer[sizeof(PacketHeader)], r_pkt_hdr->size - sizeof(PacketHeader), true, PROTOCOL_TIMEOUT_MS) <= 0) {
break;
}
// Increment data counter
bytes += r_pkt_hdr->size;
// Decode packet
if (r_pkt_hdr->type == PACKET_TYPE_COMMAND) {
// TODO: Move to command handler
if (r_cmd_hdr->cmd == COMMAND_SET_SAMPLERATE && r_pkt_hdr->size == sizeof(PacketHeader) + sizeof(CommandHeader) + sizeof(double)) {
currentSampleRate = *(double*)r_cmd_data;
core::setInputSampleRate(currentSampleRate);
}
else if (r_cmd_hdr->cmd == COMMAND_DISCONNECT) {
flog::error("Asked to disconnect by the server");
serverBusy = true;
// Cancel waiters
std::vector<PacketWaiter*> toBeRemoved;
for (auto& [waiter, cmd] : commandAckWaiters) {
waiter->cancel();
toBeRemoved.push_back(waiter);
}
// Remove handled waiters
for (auto& waiter : toBeRemoved) {
commandAckWaiters.erase(waiter);
delete waiter;
}
}
}
else if (r_pkt_hdr->type == PACKET_TYPE_COMMAND_ACK) {
// Notify waiters
std::vector<PacketWaiter*> toBeRemoved; std::vector<PacketWaiter*> toBeRemoved;
for (auto& [waiter, cmd] : _this->commandAckWaiters) { for (auto& [waiter, cmd] : commandAckWaiters) {
waiter->cancel(); if (cmd != r_cmd_hdr->cmd) { continue; }
waiter->notify();
toBeRemoved.push_back(waiter); toBeRemoved.push_back(waiter);
} }
// Remove handled waiters // Remove handled waiters
for (auto& waiter : toBeRemoved) { for (auto& waiter : toBeRemoved) {
_this->commandAckWaiters.erase(waiter); commandAckWaiters.erase(waiter);
delete waiter; delete waiter;
} }
} }
} else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND) {
else if (_this->r_pkt_hdr->type == PACKET_TYPE_COMMAND_ACK) { memcpy(decompIn.writeBuf, &rbuffer[sizeof(PacketHeader)], r_pkt_hdr->size - sizeof(PacketHeader));
// Notify waiters if (!decompIn.swap(r_pkt_hdr->size - sizeof(PacketHeader))) { break; }
std::vector<PacketWaiter*> toBeRemoved;
for (auto& [waiter, cmd] : _this->commandAckWaiters) {
if (cmd != _this->r_cmd_hdr->cmd) { continue; }
waiter->notify();
toBeRemoved.push_back(waiter);
} }
else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) {
// Remove handled waiters size_t outCount = ZSTD_decompressDCtx(dctx, decompIn.writeBuf, STREAM_BUFFER_SIZE, r_pkt_data, r_pkt_hdr->size - sizeof(PacketHeader));
for (auto& waiter : toBeRemoved) { if (outCount) {
_this->commandAckWaiters.erase(waiter); if (!decompIn.swap(outCount)) { break; }
delete waiter; };
}
else if (r_pkt_hdr->type == PACKET_TYPE_ERROR) {
flog::error("SDR++ Server Error: {0}", rbuffer[sizeof(PacketHeader)]);
}
else {
flog::error("Invalid packet type: {0}", r_pkt_hdr->type);
} }
} }
else if (_this->r_pkt_hdr->type == PACKET_TYPE_BASEBAND) {
memcpy(_this->decompIn.writeBuf, &buf[sizeof(PacketHeader)], _this->r_pkt_hdr->size - sizeof(PacketHeader));
_this->decompIn.swap(_this->r_pkt_hdr->size - sizeof(PacketHeader));
}
else if (_this->r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) {
size_t outCount = ZSTD_decompressDCtx(_this->dctx, _this->decompIn.writeBuf, STREAM_BUFFER_SIZE, _this->r_pkt_data, _this->r_pkt_hdr->size - sizeof(PacketHeader));
if (outCount) { _this->decompIn.swap(outCount); };
}
else if (_this->r_pkt_hdr->type == PACKET_TYPE_ERROR) {
flog::error("SDR++ Server Error: {0}", buf[sizeof(PacketHeader)]);
}
else {
flog::error("Invalid packet type: {0}", _this->r_pkt_hdr->type);
}
// Restart an async read
_this->client->readAsync(sizeof(PacketHeader), _this->rbuffer, tcpHandler, _this);
} }
int ClientClass::getUI() { int Client::getUI() {
if (!isOpen()) { return -1; }
auto waiter = awaitCommandAck(COMMAND_GET_UI); auto waiter = awaitCommandAck(COMMAND_GET_UI);
sendCommand(COMMAND_GET_UI, 0); sendCommand(COMMAND_GET_UI, 0);
if (waiter->await(PROTOCOL_TIMEOUT_MS)) { if (waiter->await(PROTOCOL_TIMEOUT_MS)) {
@ -233,37 +240,35 @@ namespace server {
return 0; return 0;
} }
void ClientClass::sendPacket(PacketType type, int len) { void Client::sendPacket(PacketType type, int len) {
s_pkt_hdr->type = type; s_pkt_hdr->type = type;
s_pkt_hdr->size = sizeof(PacketHeader) + len; s_pkt_hdr->size = sizeof(PacketHeader) + len;
client->write(s_pkt_hdr->size, sbuffer); sock->send(sbuffer, s_pkt_hdr->size);
} }
void ClientClass::sendCommand(Command cmd, int len) { void Client::sendCommand(Command cmd, int len) {
s_cmd_hdr->cmd = cmd; s_cmd_hdr->cmd = cmd;
sendPacket(PACKET_TYPE_COMMAND, sizeof(CommandHeader) + len); sendPacket(PACKET_TYPE_COMMAND, sizeof(CommandHeader) + len);
} }
void ClientClass::sendCommandAck(Command cmd, int len) { void Client::sendCommandAck(Command cmd, int len) {
s_cmd_hdr->cmd = cmd; s_cmd_hdr->cmd = cmd;
sendPacket(PACKET_TYPE_COMMAND_ACK, sizeof(CommandHeader) + len); sendPacket(PACKET_TYPE_COMMAND_ACK, sizeof(CommandHeader) + len);
} }
PacketWaiter* ClientClass::awaitCommandAck(Command cmd) { PacketWaiter* Client::awaitCommandAck(Command cmd) {
PacketWaiter* waiter = new PacketWaiter; PacketWaiter* waiter = new PacketWaiter;
commandAckWaiters[waiter] = cmd; commandAckWaiters[waiter] = cmd;
return waiter; return waiter;
} }
void ClientClass::dHandler(dsp::complex_t *data, int count, void *ctx) { void Client::dHandler(dsp::complex_t *data, int count, void *ctx) {
ClientClass* _this = (ClientClass*)ctx; Client* _this = (Client*)ctx;
memcpy(_this->output->writeBuf, data, count * sizeof(dsp::complex_t)); memcpy(_this->output->writeBuf, data, count * sizeof(dsp::complex_t));
_this->output->swap(count); _this->output->swap(count);
} }
Client connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) { std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) {
net::Conn conn = net::connect(host, port); return std::make_shared<Client>(net::connect(host, port), out);
if (!conn) { return NULL; }
return Client(new ClientClass(std::move(conn), out));
} }
} }

View File

@ -1,5 +1,5 @@
#pragma once #pragma once
#include <utils/networking.h> #include <utils/net.h>
#include <dsp/stream.h> #include <dsp/stream.h>
#include <dsp/types.h> #include <dsp/types.h>
#include <atomic> #include <atomic>
@ -13,10 +13,6 @@
#include <dsp/routing/stream_link.h> #include <dsp/routing/stream_link.h>
#include <zstd.h> #include <zstd.h>
#define RFSPACE_MAX_SIZE 8192
#define RFSPACE_HEARTBEAT_INTERVAL_MS 1000
#define RFSPACE_TIMEOUT_MS 3000
#define PROTOCOL_TIMEOUT_MS 10000 #define PROTOCOL_TIMEOUT_MS 10000
namespace server { namespace server {
@ -75,10 +71,10 @@ namespace server {
std::mutex handledMtx; std::mutex handledMtx;
}; };
class ClientClass { class Client {
public: public:
ClientClass(net::Conn conn, dsp::stream<dsp::complex_t>* out); Client(std::shared_ptr<net::Socket> sock, dsp::stream<dsp::complex_t>* out);
~ClientClass(); ~Client();
void showMenu(); void showMenu();
@ -98,7 +94,7 @@ namespace server {
bool serverBusy = false; bool serverBusy = false;
private: private:
static void tcpHandler(int count, uint8_t* buf, void* ctx); void worker();
int getUI(); int getUI();
@ -112,7 +108,7 @@ namespace server {
static void dHandler(dsp::complex_t *data, int count, void *ctx); static void dHandler(dsp::complex_t *data, int count, void *ctx);
net::Conn client; std::shared_ptr<net::Socket> sock;
dsp::stream<uint8_t> decompIn; dsp::stream<uint8_t> decompIn;
dsp::compression::SampleStreamDecompressor decomp; dsp::compression::SampleStreamDecompressor decomp;
@ -137,10 +133,10 @@ namespace server {
ZSTD_DCtx* dctx; ZSTD_DCtx* dctx;
std::thread workerThread;
double currentSampleRate = 1000000.0; double currentSampleRate = 1000000.0;
}; };
typedef std::unique_ptr<ClientClass> Client; std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
Client connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
} }