From 859af77bd3123e41fb1665fd6942c541cb42b790 Mon Sep 17 00:00:00 2001 From: AlexandreRouma Date: Fri, 26 Jan 2024 16:57:11 +0100 Subject: [PATCH] revamp RFspace source to use new networking library and fix buffering --- source_modules/hermes_source/src/hermes.cpp | 1 + source_modules/rfspace_source/src/main.cpp | 4 +- .../rfspace_source/src/rfspace_client.cpp | 189 +++++++++++------- .../rfspace_source/src/rfspace_client.h | 35 ++-- 4 files changed, 140 insertions(+), 89 deletions(-) diff --git a/source_modules/hermes_source/src/hermes.cpp b/source_modules/hermes_source/src/hermes.cpp index a96c7fdb..b7d967a9 100644 --- a/source_modules/hermes_source/src/hermes.cpp +++ b/source_modules/hermes_source/src/hermes.cpp @@ -173,6 +173,7 @@ namespace hermes { if (len <= 0) { break; } // Ignore anything that's not a USB packet + // TODO: Gotta check the endpoint if (htons(pkt->hdr.signature) != HERMES_METIS_SIGNATURE || pkt->hdr.type != METIS_PKT_USB) { continue; } diff --git a/source_modules/rfspace_source/src/main.cpp b/source_modules/rfspace_source/src/main.cpp index e0078efb..f2e74661 100644 --- a/source_modules/rfspace_source/src/main.cpp +++ b/source_modules/rfspace_source/src/main.cpp @@ -231,7 +231,7 @@ private: } // Create samplerate list - auto srs = client->getValidSampleRates(); + auto srs = client->getSamplerates(); sampleRates.clear(); for (auto& sr : srs) { sampleRates.define(sr, getBandwdithScaled(sr), sr); @@ -317,7 +317,7 @@ private: dsp::stream stream; SourceManager::SourceHandler handler; - rfspace::RFspaceClient client; + std::shared_ptr client; }; MOD_EXPORT void _INIT_() { diff --git a/source_modules/rfspace_source/src/rfspace_client.cpp b/source_modules/rfspace_source/src/rfspace_client.cpp index fce0b4e3..71c01345 100644 --- a/source_modules/rfspace_source/src/rfspace_client.cpp +++ b/source_modules/rfspace_source/src/rfspace_client.cpp @@ -6,15 +6,13 @@ using namespace std::chrono_literals; namespace rfspace { - RFspaceClientClass::RFspaceClientClass(net::Conn conn, net::Conn udpConn, dsp::stream* out) { - client = std::move(conn); - udpClient = std::move(udpConn); + Client::Client(std::shared_ptr tcp, std::shared_ptr udp, dsp::stream* out) { + this->tcp = tcp; + this->udp = udp; output = out; // Allocate buffers - rbuffer = new uint8_t[RFSPACE_MAX_SIZE]; sbuffer = new uint8_t[RFSPACE_MAX_SIZE]; - ubuffer = new uint8_t[RFSPACE_MAX_SIZE]; // Clear write stop of stream just in case output->clearWriteStop(); @@ -22,9 +20,9 @@ namespace rfspace { // Send UDP packet so that a router opens the port sendDummyUDP(); - // Start readers - client->readAsync(sizeof(tcpHeader), (uint8_t*)&tcpHeader, tcpHandler, this); - udpClient->readAsync(RFSPACE_MAX_SIZE, ubuffer, udpHandler, this); + // Start workers + tcpWorkerThread = std::thread(&Client::tcpWorker, this); + udpWorkerThread = std::thread(&Client::udpWorker, this); // Get device ID and wait for response getControlItem(RFSPACE_CTRL_ITEM_PROD_ID, NULL, 0); @@ -43,22 +41,20 @@ namespace rfspace { setPort(RFSPACE_RF_PORT_1); // Start heartbeat - heartBeatThread = std::thread(&RFspaceClientClass::heartBeatWorker, this); + heartBeatThread = std::thread(&Client::heartBeatWorker, this); } - RFspaceClientClass::~RFspaceClientClass() { + Client::~Client() { close(); - delete[] rbuffer; delete[] sbuffer; - delete[] ubuffer; } - void RFspaceClientClass::sendDummyUDP() { + void Client::sendDummyUDP() { uint8_t dummy = 0x5A; - udpClient->write(1, &dummy); + udp->send(&dummy, 1); } - int RFspaceClientClass::getControlItem(ControlItem item, void* param, int len) { + int Client::getControlItem(ControlItem item, void* param, int len) { // Build packet uint16_t* header = (uint16_t*)&sbuffer[0]; uint16_t* item_val = (uint16_t*)&sbuffer[2]; @@ -66,12 +62,12 @@ namespace rfspace { *item_val = item; // Send packet - client->write(4, sbuffer); + tcp->send(sbuffer, 4); return -1; } - void RFspaceClientClass::setControlItem(ControlItem item, void* param, int len) { + void Client::setControlItem(ControlItem item, void* param, int len) { // Build packet uint16_t* header = (uint16_t*)&sbuffer[0]; uint16_t* item_val = (uint16_t*)&sbuffer[2]; @@ -80,10 +76,10 @@ namespace rfspace { memcpy(&sbuffer[4], param, len); // Send packet - client->write(len + 4, sbuffer); + tcp->send(sbuffer, len + 4); } - void RFspaceClientClass::setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len) { + void Client::setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len) { // Build packet uint16_t* header = (uint16_t*)&sbuffer[0]; uint16_t* item_val = (uint16_t*)&sbuffer[2]; @@ -94,10 +90,10 @@ namespace rfspace { memcpy(&sbuffer[5], param, len); // Send packet - client->write(len + 5, sbuffer); + tcp->send(sbuffer, len + 5); } - std::vector RFspaceClientClass::getValidSampleRates() { + std::vector Client::getSamplerates() { std::vector sr; switch (deviceId) { @@ -119,92 +115,145 @@ namespace rfspace { return sr; } - void RFspaceClientClass::setFrequency(uint64_t freq) { + void Client::setFrequency(uint64_t freq) { setControlItemWithChanID(RFSPACE_CTRL_ITEM_NCO_FREQUENCY, 0, &freq, 5); } - void RFspaceClientClass::setPort(RFPort port) { + void Client::setPort(RFPort port) { uint8_t value = port; setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_PORT, 0, &value, sizeof(value)); } - void RFspaceClientClass::setGain(int8_t gain) { + void Client::setGain(int8_t gain) { setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_GAIN, 0, &gain, sizeof(gain)); } - void RFspaceClientClass::setSampleRate(uint32_t sampleRate) { + void Client::setSampleRate(uint32_t sampleRate) { + // Acquire the buffer variables + std::lock_guard lck(bufferMtx); + + // Update block size + blockSize = sampleRate / 200; + + // Send samplerate to device setControlItemWithChanID(RFSPACE_CTRL_ITEM_IQ_SAMP_RATE, 0, &sampleRate, sizeof(sampleRate)); } - void RFspaceClientClass::start(SampleFormat sampleFormat, SampleDepth sampleDepth) { + void Client::start(SampleFormat sampleFormat, SampleDepth sampleDepth) { + // Acquire the buffer variables + std::lock_guard lck(bufferMtx); + + // Reset buffer + inBuffer = 0; + + // Start device uint8_t args[4] = { (uint8_t)sampleFormat, (uint8_t)RFSPACE_STATE_RUN, (uint8_t)sampleDepth, 0 }; setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args)); } - void RFspaceClientClass::stop() { + void Client::stop() { uint8_t args[4] = { 0, RFSPACE_STATE_IDLE, 0, 0 }; setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args)); } - void RFspaceClientClass::close() { + void Client::close() { + // Stop UDP worker output->stopWriter(); + udp->close(); + if (udpWorkerThread.joinable()) { udpWorkerThread.join(); } + output->clearWriteStop(); + + // Stop heartbeat worker stopHeartBeat = true; heartBeatCnd.notify_all(); if (heartBeatThread.joinable()) { heartBeatThread.join(); } - client->close(); - udpClient->close(); - output->clearWriteStop(); + + // Stop TCP worker + tcp->close(); + if (tcpWorkerThread.joinable()) { tcpWorkerThread.join(); } } - bool RFspaceClientClass::isOpen() { - return client->isOpen(); + bool Client::isOpen() { + return tcp->isOpen() || udp->isOpen(); } - void RFspaceClientClass::tcpHandler(int count, uint8_t* buf, void* ctx) { - RFspaceClientClass* _this = (RFspaceClientClass*)ctx; - uint8_t type = _this->tcpHeader >> 13; - uint16_t size = _this->tcpHeader & 0b1111111111111; + void Client::tcpWorker() { + // Allocate receive buffer + uint8_t* buffer = new uint8_t[RFSPACE_MAX_SIZE]; - // Read the rest of the data - if (size > 2) { - _this->client->read(size - 2, &_this->rbuffer[2]); - } + // Receive loop + while (true) { + // Receive header + uint16_t header; + if (tcp->recv((uint8_t*)&header, sizeof(uint16_t), true) <= 0) { break; } - // flog::warn("TCP received: {0} {1}", type, size); + // Decode header + uint8_t type = header >> 13; + uint16_t size = header & 0b1111111111111; - // Check for a device ID - uint16_t* controlItem = (uint16_t*)&_this->rbuffer[2]; - if (type == RFSPACE_MSG_TYPE_T2H_SET_CTRL_ITEM_RESP && *controlItem == RFSPACE_CTRL_ITEM_PROD_ID) { - { - std::lock_guard lck(_this->devIdMtx); - _this->deviceId = (DeviceID)*(uint32_t*)&_this->rbuffer[4]; - _this->devIdAvailable = true; + // Receive data + if (tcp->recv(buffer, size - 2, true, RFSPACE_TIMEOUT_MS) <= 0) { break; } + + // Check for a device ID + uint16_t* controlItem = (uint16_t*)&buffer[0]; + if (type == RFSPACE_MSG_TYPE_T2H_SET_CTRL_ITEM_RESP && *controlItem == RFSPACE_CTRL_ITEM_PROD_ID) { + { + std::lock_guard lck(devIdMtx); + deviceId = (DeviceID)*(uint32_t*)&buffer[2]; + devIdAvailable = true; + } + devIdCnd.notify_all(); } - _this->devIdCnd.notify_all(); } - // Restart an async read - _this->client->readAsync(sizeof(_this->tcpHeader), (uint8_t*)&_this->tcpHeader, tcpHandler, _this); + // Free receive buffer + delete[] buffer; } - void RFspaceClientClass::udpHandler(int count, uint8_t* buf, void* ctx) { - RFspaceClientClass* _this = (RFspaceClientClass*)ctx; - uint16_t hdr = (uint16_t)buf[0] | ((uint16_t)buf[1] << 8); - uint8_t type = hdr >> 13; - uint16_t size = hdr & 0b1111111111111; + void Client::udpWorker() { + // Allocate receive buffer + uint8_t* buffer = new uint8_t[RFSPACE_MAX_SIZE]; + uint16_t* header = (uint16_t*)&buffer[0]; - if (type == RFSPACE_MSG_TYPE_T2H_DATA_ITEM_0) { - int16_t* samples = (int16_t*)&buf[4]; - int sampCount = (size - 4) / (2 * sizeof(int16_t)); - volk_16i_s32f_convert_32f((float*)_this->output->writeBuf, samples, 32768.0f, sampCount * 2); - _this->output->swap(sampCount); + // Receive loop + while (true) { + // Receive datagram + int rsize = udp->recv(buffer, RFSPACE_MAX_SIZE); + if (rsize <= 0) { break; } + + // Decode header + uint8_t type = (*header) >> 13; + uint16_t size = (*header) & 0b1111111111111; + + if (rsize != size) { + flog::error("Datagram size mismatch: {} vs {}", rsize, size); + continue; + } + + // Check for a sample packet + if (type == RFSPACE_MSG_TYPE_T2H_DATA_ITEM_0) { + // Acquire the buffer variables + std::lock_guard lck(bufferMtx); + + // Convert samples to complex float + int16_t* samples = (int16_t*)&buffer[4]; + int sampCount = (size - 4) / (2 * sizeof(int16_t)); + volk_16i_s32f_convert_32f((float*)&output->writeBuf[inBuffer], samples, 32768.0f, sampCount * 2); + inBuffer += sampCount; + + // Send out samples if enough are buffered + if (inBuffer >= blockSize) { + if (!output->swap(inBuffer)) { break; }; + inBuffer = 0; + } + } } - // Restart an async read - _this->udpClient->readAsync(RFSPACE_MAX_SIZE, _this->ubuffer, udpHandler, _this); + // Free receive buffer + delete[] buffer; } - void RFspaceClientClass::heartBeatWorker() { + void Client::heartBeatWorker() { uint8_t dummy[4]; while (true) { getControlItem(RFSPACE_CTRL_ITEM_STATE, dummy, sizeof(dummy)); @@ -216,11 +265,9 @@ namespace rfspace { } } - RFspaceClient connect(std::string host, uint16_t port, dsp::stream* out) { - net::Conn conn = net::connect(host, port); - if (!conn) { return NULL; } - net::Conn udpConn = net::openUDP("0.0.0.0", port, host, port, true); - if (!udpConn) { return NULL; } - return RFspaceClient(new RFspaceClientClass(std::move(conn), std::move(udpConn), out)); + std::shared_ptr connect(std::string host, uint16_t port, dsp::stream* out) { + auto tcp = net::connect(host, port); + auto udp = net::openudp(host, port, "0.0.0.0", port); + return std::make_shared(tcp, udp, out); } } diff --git a/source_modules/rfspace_source/src/rfspace_client.h b/source_modules/rfspace_source/src/rfspace_client.h index fc27fbc1..04aa2dc0 100644 --- a/source_modules/rfspace_source/src/rfspace_client.h +++ b/source_modules/rfspace_source/src/rfspace_client.h @@ -1,9 +1,9 @@ #pragma once -#include +#include #include #include -#include -#include +#include +#include #define RFSPACE_MAX_SIZE 8192 #define RFSPACE_HEARTBEAT_INTERVAL_MS 1000 @@ -96,10 +96,10 @@ namespace rfspace { RFSPACE_CTRL_ITEM_ERROR_LOG = 0x0410 }; - class RFspaceClientClass { + class Client { public: - RFspaceClientClass(net::Conn conn, net::Conn udpConn, dsp::stream* out); - ~RFspaceClientClass(); + Client(std::shared_ptr tcp, std::shared_ptr udp, dsp::stream* out); + ~Client(); void sendDummyUDP(); @@ -107,7 +107,7 @@ namespace rfspace { void setControlItem(ControlItem item, void* param, int len); void setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len); - std::vector getValidSampleRates(); + std::vector getSamplerates(); void setFrequency(uint64_t freq); void setPort(RFPort port); @@ -123,21 +123,22 @@ namespace rfspace { DeviceID deviceId; private: - static void tcpHandler(int count, uint8_t* buf, void* ctx); - static void udpHandler(int count, uint8_t* buf, void* ctx); + void tcpWorker(); + void udpWorker(); void heartBeatWorker(); - net::Conn client; - net::Conn udpClient; + std::shared_ptr tcp; + std::shared_ptr udp; dsp::stream* output; uint16_t tcpHeader; uint16_t udpHeader; - uint8_t* rbuffer = NULL; uint8_t* sbuffer = NULL; - uint8_t* ubuffer = NULL; + + std::thread tcpWorkerThread; + std::thread udpWorkerThread; std::thread heartBeatThread; std::mutex heartBeatMtx; @@ -147,10 +148,12 @@ namespace rfspace { bool devIdAvailable = false; std::condition_variable devIdCnd; std::mutex devIdMtx; + + std::mutex bufferMtx; + int blockSize = 256; + int inBuffer = 0; }; - typedef std::unique_ptr RFspaceClient; - - RFspaceClient connect(std::string host, uint16_t port, dsp::stream* out); + std::shared_ptr connect(std::string host, uint16_t port, dsp::stream* out); }