From 4c0220a2284f0f9cdf39ef6c0bd2f441b74b0cee Mon Sep 17 00:00:00 2001 From: AlexandreRouma Date: Wed, 26 Jan 2022 13:23:55 +0100 Subject: [PATCH] compression --- core/CMakeLists.txt | 4 +++ core/src/dsp/compression.h | 7 +++- core/src/dsp/stream.h | 7 ++++ core/src/server.cpp | 32 +++++++++++++------ core/src/server_protocol.h | 2 ++ .../sdrpp_server_source/CMakeLists.txt | 4 +++ .../sdrpp_server_source/src/main.cpp | 18 +++++++++-- .../src/sdrpp_server_client.cpp | 14 ++++++++ .../src/sdrpp_server_client.h | 4 +++ 9 files changed, 79 insertions(+), 13 deletions(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 04709668..9be5af2a 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -71,6 +71,10 @@ if (MSVC) # WinSock2 target_link_libraries(sdrpp_core PUBLIC wsock32 ws2_32) + # ZSTD + find_package(zstd CONFIG REQUIRED) + target_link_libraries(sdrpp_core PRIVATE zstd::libzstd_shared) + else() find_package(PkgConfig) find_package(OpenGL REQUIRED) diff --git a/core/src/dsp/compression.h b/core/src/dsp/compression.h index 5e882119..b3f4d34c 100644 --- a/core/src/dsp/compression.h +++ b/core/src/dsp/compression.h @@ -12,11 +12,16 @@ namespace dsp { public: DynamicRangeCompressor() {} - DynamicRangeCompressor(stream* in, PCMType pcmType) { init(in, pcmType); } + DynamicRangeCompressor(stream* in, PCMType pcmType) { + init(in, pcmType); + } void init(stream* in, PCMType pcmType) { _in = in; _pcmType = pcmType; + + out.setBufferSize((sizeof(dsp::complex_t) * STREAM_BUFFER_SIZE) + 8); + generic_block::registerInput(_in); generic_block::registerOutput(&out); generic_block::_block_init = true; diff --git a/core/src/dsp/stream.h b/core/src/dsp/stream.h index abe5dd5b..f186d83f 100644 --- a/core/src/dsp/stream.h +++ b/core/src/dsp/stream.h @@ -31,6 +31,13 @@ namespace dsp { volk_free(readBuf); } + void setBufferSize(int samples) { + volk_free(writeBuf); + volk_free(readBuf); + writeBuf = (T*)volk_malloc(samples * sizeof(T), volk_get_alignment()); + readBuf = (T*)volk_malloc(samples * sizeof(T), volk_get_alignment()); + } + bool swap(int size) { { // Wait to either swap or stop diff --git a/core/src/server.cpp b/core/src/server.cpp index 91074259..f53991af 100644 --- a/core/src/server.cpp +++ b/core/src/server.cpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace server { dsp::stream dummyInput; @@ -35,11 +36,14 @@ namespace server { SmGui::DrawListElem dummyElem; + ZSTD_CCtx* cctx; + net::Listener listener; OptionList sourceList; int sourceId = 0; bool running = false; + bool compression = false; double sampleRate = 1000000.0; int main() { @@ -68,9 +72,9 @@ namespace server { bb_pkt_hdr = (PacketHeader*)bbuf; bb_pkt_data = &bbuf[sizeof(PacketHeader)]; - // Terminate config manager - core::configManager.disableAutoSave(); - core::configManager.save(); + // Initialize compressor + cctx = ZSTD_createCCtx(); + ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 1); core::configManager.acquire(); std::string modulesDir = core::configManager.conf["modulesDirectory"]; @@ -183,6 +187,7 @@ namespace server { // Perform settings reset sigpath::sourceManager.stop(); comp.setPCMType(dsp::PCM_TYPE_I16); + compression = false; sendSampleRate(sampleRate); @@ -218,14 +223,20 @@ namespace server { } void _testServerHandler(uint8_t* data, int count, void* ctx) { - // Build data packet - PacketHeader* hdr = (PacketHeader*)bbuf; - hdr->type = PACKET_TYPE_BASEBAND; - hdr->size = sizeof(PacketHeader) + count; - memcpy(&bbuf[sizeof(PacketHeader)], data, count); + // Compress data if needed and fill out header fields + if (compression) { + bb_pkt_hdr->type = PACKET_TYPE_BASEBAND_COMPRESSED; + bb_pkt_hdr->size = sizeof(PacketHeader) + (uint32_t)ZSTD_compress2(cctx, &bbuf[sizeof(PacketHeader)], SERVER_MAX_PACKET_SIZE, data, count); + + } + else { + bb_pkt_hdr->type = PACKET_TYPE_BASEBAND; + bb_pkt_hdr->size = sizeof(PacketHeader) + count; + memcpy(&bbuf[sizeof(PacketHeader)], data, count); + } // Write to network - if (client && client->isOpen()) { client->write(hdr->size, bbuf); } + if (client && client->isOpen()) { client->write(bb_pkt_hdr->size, bbuf); } } void setInput(dsp::stream* stream) { @@ -281,6 +292,9 @@ namespace server { dsp::PCMType type = (dsp::PCMType)*(uint8_t*)data; comp.setPCMType(type); } + else if (cmd == COMMAND_SET_COMPRESSION && len == 1) { + compression = *(uint8_t*)data; + } else { spdlog::error("Invalid Command: {0} (len = {1})", cmd, len); sendError(ERROR_INVALID_COMMAND); diff --git a/core/src/server_protocol.h b/core/src/server_protocol.h index cec747d7..80e4378a 100644 --- a/core/src/server_protocol.h +++ b/core/src/server_protocol.h @@ -11,6 +11,7 @@ namespace server { PACKET_TYPE_COMMAND, PACKET_TYPE_COMMAND_ACK, PACKET_TYPE_BASEBAND, + PACKET_TYPE_BASEBAND_COMPRESSED, PACKET_TYPE_VFO, PACKET_TYPE_FFT, PACKET_TYPE_ERROR @@ -25,6 +26,7 @@ namespace server { COMMAND_SET_FREQUENCY, COMMAND_GET_SAMPLERATE, COMMAND_SET_SAMPLE_TYPE, + COMMAND_SET_COMPRESSION, // Server to client COMMAND_SET_SAMPLERATE = 0x80, diff --git a/source_modules/sdrpp_server_source/CMakeLists.txt b/source_modules/sdrpp_server_source/CMakeLists.txt index 477e6484..09905371 100644 --- a/source_modules/sdrpp_server_source/CMakeLists.txt +++ b/source_modules/sdrpp_server_source/CMakeLists.txt @@ -19,6 +19,10 @@ endif () if(WIN32) target_link_libraries(sdrpp_server_source PRIVATE wsock32 ws2_32) + + # ZSTD + find_package(zstd CONFIG REQUIRED) + target_link_libraries(sdrpp_server_source PRIVATE zstd::libzstd_shared) endif() # Install directives diff --git a/source_modules/sdrpp_server_source/src/main.cpp b/source_modules/sdrpp_server_source/src/main.cpp index 85552824..f45bddcc 100644 --- a/source_modules/sdrpp_server_source/src/main.cpp +++ b/source_modules/sdrpp_server_source/src/main.cpp @@ -194,11 +194,18 @@ private: config.conf["servers"][_this->devConfName]["sampleType"] = _this->sampleTypeList.key(_this->sampleTypeId); config.release(true); } + + if (ImGui::Checkbox("Compression", &_this->compression)) { + _this->client->setCompression(_this->compression); - bool dummy = false; + // Save config + config.acquire(); + config.conf["servers"][_this->devConfName]["compression"] = _this->compression; + config.release(true); + } + + bool dummy = true; style::beginDisabled(); - ImGui::Checkbox("Compression", &dummy); - dummy = true; ImGui::Checkbox("Full IQ", &dummy); style::endDisabled(); @@ -237,9 +244,13 @@ private: std::string key = config.conf["servers"][devConfName]["sampleType"]; if (sampleTypeList.keyExists(key)) { sampleTypeId = sampleTypeList.keyId(key); } } + if (config.conf["servers"][devConfName].contains("compression")) { + compression = config.conf["servers"][devConfName]["compression"]; + } // Set settings client->setSampleType(sampleTypeList[sampleTypeId]); + client->setCompression(compression); } std::string name; @@ -261,6 +272,7 @@ private: OptionList sampleTypeList; int sampleTypeId; + bool compression = false; server::Client client; }; diff --git a/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp b/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp index 09c66f2b..bcfacc7f 100644 --- a/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp +++ b/source_modules/sdrpp_server_source/src/sdrpp_server_client.cpp @@ -26,7 +26,11 @@ namespace server { s_cmd_hdr = (CommandHeader*)s_pkt_data; s_cmd_data = &sbuffer[sizeof(PacketHeader) + sizeof(CommandHeader)]; + // Initialize decompressor + dctx = ZSTD_createDCtx(); + // Initialize DSP + decompIn.setBufferSize((sizeof(dsp::complex_t) * STREAM_BUFFER_SIZE) + 8); decomp.init(&decompIn); link.init(&decomp.out, output); decomp.start(); @@ -43,6 +47,7 @@ namespace server { ClientClass::~ClientClass() { close(); + ZSTD_freeDCtx(dctx); delete[] rbuffer; delete[] sbuffer; } @@ -108,6 +113,11 @@ namespace server { sendCommand(COMMAND_SET_SAMPLE_TYPE, 1); } + void ClientClass::setCompression(bool enabled) { + s_cmd_data[0] = enabled; + sendCommand(COMMAND_SET_COMPRESSION, 1); + } + void ClientClass::start() { if (!client || !client->isOpen()) { return; } sendCommand(COMMAND_START, 0); @@ -189,6 +199,10 @@ namespace server { memcpy(_this->decompIn.writeBuf, &buf[sizeof(PacketHeader)], _this->r_pkt_hdr->size - sizeof(PacketHeader)); _this->decompIn.swap(_this->r_pkt_hdr->size - sizeof(PacketHeader)); } + else if (_this->r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) { + size_t outCount = ZSTD_decompressDCtx(_this->dctx, _this->decompIn.writeBuf, STREAM_BUFFER_SIZE, _this->r_pkt_data, _this->r_pkt_hdr->size - sizeof(PacketHeader)); + if (outCount) { _this->decompIn.swap(outCount); }; + } else if (_this->r_pkt_hdr->type == PACKET_TYPE_ERROR) { spdlog::error("SDR++ Server Error: {0}", buf[sizeof(PacketHeader)]); } diff --git a/source_modules/sdrpp_server_source/src/sdrpp_server_client.h b/source_modules/sdrpp_server_source/src/sdrpp_server_client.h index 172138b5..afdba726 100644 --- a/source_modules/sdrpp_server_source/src/sdrpp_server_client.h +++ b/source_modules/sdrpp_server_source/src/sdrpp_server_client.h @@ -11,6 +11,7 @@ #include #include #include +#include #define RFSPACE_MAX_SIZE 8192 #define RFSPACE_HEARTBEAT_INTERVAL_MS 1000 @@ -85,6 +86,7 @@ namespace server { double getSampleRate(); void setSampleType(dsp::PCMType type); + void setCompression(bool enabled); void start(); void stop(); @@ -133,6 +135,8 @@ namespace server { SmGui::DrawList dl; std::mutex dlMtx; + ZSTD_DCtx* dctx; + double currentSampleRate = 1000000.0; };