From be8edbfa9e9568f260a694250d8b1e4274a8c624 Mon Sep 17 00:00:00 2001 From: AlexandreRouma Date: Sun, 28 Jan 2024 21:46:54 +0100 Subject: [PATCH] revamp sdr++ server source networking code --- .../sdrpp_server_source/src/main.cpp | 20 +- .../src/sdrpp_server_client.cpp | 187 +++++++++--------- .../src/sdrpp_server_client.h | 22 +-- 3 files changed, 117 insertions(+), 112 deletions(-) diff --git a/source_modules/sdrpp_server_source/src/main.cpp b/source_modules/sdrpp_server_source/src/main.cpp index 38c187de..08c7e063 100644 --- a/source_modules/sdrpp_server_source/src/main.cpp +++ b/source_modules/sdrpp_server_source/src/main.cpp @@ -17,7 +17,7 @@ SDRPP_MOD_INFO{ /* Name: */ "sdrpp_server_source", /* Description: */ "SDR++ Server source module for SDR++", /* Author: */ "Ryzerth", - /* Version: */ 0, 1, 0, + /* Version: */ 0, 2, 0, /* Max instances */ 1 }; @@ -109,10 +109,10 @@ private: SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; if (_this->running) { return; } - // Try to connect if not already connected - if (!_this->client) { + // Try to connect if not already connected (Play button is locked anyway so not sure why I put this here) + if (!_this->connected()) { _this->tryConnect(); - if (!_this->client) { return; } + if (!_this->connected()) { return; } } // Set configuration @@ -127,7 +127,7 @@ private: SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; if (!_this->running) { return; } - if (_this->client) { _this->client->stop(); } + if (_this->connected()) { _this->client->stop(); } _this->running = false; flog::info("SDRPPServerSourceModule '{0}': Stop!", _this->name); @@ -135,7 +135,7 @@ private: static void tune(double freq, void* ctx) { SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; - if (_this->running && _this->client) { + if (_this->running && _this->connected()) { _this->client->setFrequency(freq); } _this->freq = freq; @@ -146,7 +146,7 @@ private: SDRPPServerSourceModule* _this = (SDRPPServerSourceModule*)ctx; float menuWidth = ImGui::GetContentRegionAvail().x; - bool connected = (_this->client && _this->client->isOpen()); + bool connected = _this->connected(); gui::mainWindow.playButtonLocked = !connected; ImGui::GenericDialog("##sdrpp_srv_src_err_dialog", _this->serverBusy, GENERIC_DIALOG_BUTTONS_OK, [=](){ @@ -227,6 +227,10 @@ private: } } + bool connected() { + return client && client->isOpen(); + } + void tryConnect() { try { if (client) { client.reset(); } @@ -281,7 +285,7 @@ private: int sampleTypeId; bool compression = false; - server::Client client; + std::shared_ptr client; }; MOD_EXPORT void _INIT_() { diff --git a/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp b/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp index 738e504b..0e25efe7 100644 --- a/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp +++ b/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp @@ -7,8 +7,8 @@ using namespace std::chrono_literals; namespace server { - ClientClass::ClientClass(net::Conn conn, dsp::stream* out) { - client = std::move(conn); + Client::Client(std::shared_ptr sock, dsp::stream* out) { + this->sock = sock; output = out; // Allocate buffers @@ -37,8 +37,8 @@ namespace server { decomp.start(); link.start(); - // Start readers - client->readAsync(sizeof(PacketHeader), rbuffer, tcpHandler, this); + // Start worker thread + workerThread = std::thread(&Client::worker, this); // Ask for a UI int res = getUI(); @@ -46,14 +46,14 @@ namespace server { else if (res == -2) { throw std::runtime_error("Server busy"); } } - ClientClass::~ClientClass() { + Client::~Client() { close(); ZSTD_freeDCtx(dctx); delete[] rbuffer; delete[] sbuffer; } - void ClientClass::showMenu() { + void Client::showMenu() { std::string diffId = ""; SmGui::DrawListElem diffValue; bool syncRequired = false; @@ -96,8 +96,8 @@ namespace server { } } - void ClientClass::setFrequency(double freq) { - if (!client || !client->isOpen()) { return; } + void Client::setFrequency(double freq) { + if (!isOpen()) { return; } *(double*)s_cmd_data = freq; sendCommand(COMMAND_SET_FREQUENCY, sizeof(double)); auto waiter = awaitCommandAck(COMMAND_SET_FREQUENCY); @@ -105,119 +105,126 @@ namespace server { waiter->handled(); } - double ClientClass::getSampleRate() { + double Client::getSampleRate() { return currentSampleRate; } - void ClientClass::setSampleType(dsp::compression::PCMType type) { + void Client::setSampleType(dsp::compression::PCMType type) { + if (!isOpen()) { return; } s_cmd_data[0] = type; sendCommand(COMMAND_SET_SAMPLE_TYPE, 1); } - void ClientClass::setCompression(bool enabled) { + void Client::setCompression(bool enabled) { + if (!isOpen()) { return; } s_cmd_data[0] = enabled; sendCommand(COMMAND_SET_COMPRESSION, 1); } - void ClientClass::start() { - if (!client || !client->isOpen()) { return; } + void Client::start() { + if (!isOpen()) { return; } sendCommand(COMMAND_START, 0); getUI(); } - void ClientClass::stop() { - if (!client || !client->isOpen()) { return; } + void Client::stop() { + if (!isOpen()) { return; } sendCommand(COMMAND_STOP, 0); getUI(); } - void ClientClass::close() { + void Client::close() { + // Stop worker + decompIn.stopWriter(); + if (sock) { sock->close(); } + if (workerThread.joinable()) { workerThread.join(); } + decompIn.clearWriteStop(); + + // Stop DSP decomp.stop(); link.stop(); - decompIn.stopWriter(); - client->close(); - decompIn.clearWriteStop(); } - bool ClientClass::isOpen() { - return client->isOpen(); + bool Client::isOpen() { + return sock && sock->isOpen(); } - void ClientClass::tcpHandler(int count, uint8_t* buf, void* ctx) { - ClientClass* _this = (ClientClass*)ctx; - - // Read the rest of the data (TODO: CHECK SIZE OR SHIT WILL BE FUCKED) - int len = 0; - int read = 0; - int goal = _this->r_pkt_hdr->size - sizeof(PacketHeader); - while (len < goal) { - read = _this->client->read(goal - len, &buf[sizeof(PacketHeader) + len]); - if (read < 0) { - return; - }; - len += read; - } - _this->bytes += _this->r_pkt_hdr->size; - - if (_this->r_pkt_hdr->type == PACKET_TYPE_COMMAND) { - // TODO: Move to command handler - if (_this->r_cmd_hdr->cmd == COMMAND_SET_SAMPLERATE && _this->r_pkt_hdr->size == sizeof(PacketHeader) + sizeof(CommandHeader) + sizeof(double)) { - _this->currentSampleRate = *(double*)_this->r_cmd_data; - core::setInputSampleRate(_this->currentSampleRate); + void Client::worker() { + while (true) { + // Receive header + if (sock->recv(rbuffer, sizeof(PacketHeader), true) <= 0) { + break; } - else if (_this->r_cmd_hdr->cmd == COMMAND_DISCONNECT) { - flog::error("Asked to disconnect by the server"); - _this->serverBusy = true; - // Cancel waiters + // Receive remaining data + if (sock->recv(&rbuffer[sizeof(PacketHeader)], r_pkt_hdr->size - sizeof(PacketHeader), true, PROTOCOL_TIMEOUT_MS) <= 0) { + break; + } + + // Increment data counter + bytes += r_pkt_hdr->size; + + // Decode packet + if (r_pkt_hdr->type == PACKET_TYPE_COMMAND) { + // TODO: Move to command handler + if (r_cmd_hdr->cmd == COMMAND_SET_SAMPLERATE && r_pkt_hdr->size == sizeof(PacketHeader) + sizeof(CommandHeader) + sizeof(double)) { + currentSampleRate = *(double*)r_cmd_data; + core::setInputSampleRate(currentSampleRate); + } + else if (r_cmd_hdr->cmd == COMMAND_DISCONNECT) { + flog::error("Asked to disconnect by the server"); + serverBusy = true; + + // Cancel waiters + std::vector toBeRemoved; + for (auto& [waiter, cmd] : commandAckWaiters) { + waiter->cancel(); + toBeRemoved.push_back(waiter); + } + + // Remove handled waiters + for (auto& waiter : toBeRemoved) { + commandAckWaiters.erase(waiter); + delete waiter; + } + } + } + else if (r_pkt_hdr->type == PACKET_TYPE_COMMAND_ACK) { + // Notify waiters std::vector toBeRemoved; - for (auto& [waiter, cmd] : _this->commandAckWaiters) { - waiter->cancel(); + for (auto& [waiter, cmd] : commandAckWaiters) { + if (cmd != r_cmd_hdr->cmd) { continue; } + waiter->notify(); toBeRemoved.push_back(waiter); } // Remove handled waiters for (auto& waiter : toBeRemoved) { - _this->commandAckWaiters.erase(waiter); + commandAckWaiters.erase(waiter); delete waiter; } } - } - else if (_this->r_pkt_hdr->type == PACKET_TYPE_COMMAND_ACK) { - // Notify waiters - std::vector toBeRemoved; - for (auto& [waiter, cmd] : _this->commandAckWaiters) { - if (cmd != _this->r_cmd_hdr->cmd) { continue; } - waiter->notify(); - toBeRemoved.push_back(waiter); + else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND) { + memcpy(decompIn.writeBuf, &rbuffer[sizeof(PacketHeader)], r_pkt_hdr->size - sizeof(PacketHeader)); + if (!decompIn.swap(r_pkt_hdr->size - sizeof(PacketHeader))) { break; } } - - // Remove handled waiters - for (auto& waiter : toBeRemoved) { - _this->commandAckWaiters.erase(waiter); - delete waiter; + else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) { + size_t outCount = ZSTD_decompressDCtx(dctx, decompIn.writeBuf, STREAM_BUFFER_SIZE, r_pkt_data, r_pkt_hdr->size - sizeof(PacketHeader)); + if (outCount) { + if (!decompIn.swap(outCount)) { break; } + }; + } + else if (r_pkt_hdr->type == PACKET_TYPE_ERROR) { + flog::error("SDR++ Server Error: {0}", rbuffer[sizeof(PacketHeader)]); + } + else { + flog::error("Invalid packet type: {0}", r_pkt_hdr->type); } } - else if (_this->r_pkt_hdr->type == PACKET_TYPE_BASEBAND) { - memcpy(_this->decompIn.writeBuf, &buf[sizeof(PacketHeader)], _this->r_pkt_hdr->size - sizeof(PacketHeader)); - _this->decompIn.swap(_this->r_pkt_hdr->size - sizeof(PacketHeader)); - } - else if (_this->r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) { - size_t outCount = ZSTD_decompressDCtx(_this->dctx, _this->decompIn.writeBuf, STREAM_BUFFER_SIZE, _this->r_pkt_data, _this->r_pkt_hdr->size - sizeof(PacketHeader)); - if (outCount) { _this->decompIn.swap(outCount); }; - } - else if (_this->r_pkt_hdr->type == PACKET_TYPE_ERROR) { - flog::error("SDR++ Server Error: {0}", buf[sizeof(PacketHeader)]); - } - else { - flog::error("Invalid packet type: {0}", _this->r_pkt_hdr->type); - } - - // Restart an async read - _this->client->readAsync(sizeof(PacketHeader), _this->rbuffer, tcpHandler, _this); } - int ClientClass::getUI() { + int Client::getUI() { + if (!isOpen()) { return -1; } auto waiter = awaitCommandAck(COMMAND_GET_UI); sendCommand(COMMAND_GET_UI, 0); if (waiter->await(PROTOCOL_TIMEOUT_MS)) { @@ -233,37 +240,35 @@ namespace server { return 0; } - void ClientClass::sendPacket(PacketType type, int len) { + void Client::sendPacket(PacketType type, int len) { s_pkt_hdr->type = type; s_pkt_hdr->size = sizeof(PacketHeader) + len; - client->write(s_pkt_hdr->size, sbuffer); + sock->send(sbuffer, s_pkt_hdr->size); } - void ClientClass::sendCommand(Command cmd, int len) { + void Client::sendCommand(Command cmd, int len) { s_cmd_hdr->cmd = cmd; sendPacket(PACKET_TYPE_COMMAND, sizeof(CommandHeader) + len); } - void ClientClass::sendCommandAck(Command cmd, int len) { + void Client::sendCommandAck(Command cmd, int len) { s_cmd_hdr->cmd = cmd; sendPacket(PACKET_TYPE_COMMAND_ACK, sizeof(CommandHeader) + len); } - PacketWaiter* ClientClass::awaitCommandAck(Command cmd) { + PacketWaiter* Client::awaitCommandAck(Command cmd) { PacketWaiter* waiter = new PacketWaiter; commandAckWaiters[waiter] = cmd; return waiter; } - void ClientClass::dHandler(dsp::complex_t *data, int count, void *ctx) { - ClientClass* _this = (ClientClass*)ctx; + void Client::dHandler(dsp::complex_t *data, int count, void *ctx) { + Client* _this = (Client*)ctx; memcpy(_this->output->writeBuf, data, count * sizeof(dsp::complex_t)); _this->output->swap(count); } - Client connect(std::string host, uint16_t port, dsp::stream* out) { - net::Conn conn = net::connect(host, port); - if (!conn) { return NULL; } - return Client(new ClientClass(std::move(conn), out)); + std::shared_ptr connect(std::string host, uint16_t port, dsp::stream* out) { + return std::make_shared(net::connect(host, port), out); } } diff --git a/source_modules/sdrpp_server_source/src/sdrpp_server_client.h b/source_modules/sdrpp_server_source/src/sdrpp_server_client.h index d3b75fb2..d06c8499 100644 --- a/source_modules/sdrpp_server_source/src/sdrpp_server_client.h +++ b/source_modules/sdrpp_server_source/src/sdrpp_server_client.h @@ -1,5 +1,5 @@ #pragma once -#include +#include #include #include #include @@ -13,10 +13,6 @@ #include #include -#define RFSPACE_MAX_SIZE 8192 -#define RFSPACE_HEARTBEAT_INTERVAL_MS 1000 -#define RFSPACE_TIMEOUT_MS 3000 - #define PROTOCOL_TIMEOUT_MS 10000 namespace server { @@ -75,10 +71,10 @@ namespace server { std::mutex handledMtx; }; - class ClientClass { + class Client { public: - ClientClass(net::Conn conn, dsp::stream* out); - ~ClientClass(); + Client(std::shared_ptr sock, dsp::stream* out); + ~Client(); void showMenu(); @@ -98,7 +94,7 @@ namespace server { bool serverBusy = false; private: - static void tcpHandler(int count, uint8_t* buf, void* ctx); + void worker(); int getUI(); @@ -112,7 +108,7 @@ namespace server { static void dHandler(dsp::complex_t *data, int count, void *ctx); - net::Conn client; + std::shared_ptr sock; dsp::stream decompIn; dsp::compression::SampleStreamDecompressor decomp; @@ -137,10 +133,10 @@ namespace server { ZSTD_DCtx* dctx; + std::thread workerThread; + double currentSampleRate = 1000000.0; }; - typedef std::unique_ptr Client; - - Client connect(std::string host, uint16_t port, dsp::stream* out); + std::shared_ptr connect(std::string host, uint16_t port, dsp::stream* out); }