revamp RFspace source to use new networking library and fix buffering

This commit is contained in:
AlexandreRouma 2024-01-26 16:57:11 +01:00
parent f5adc7c587
commit 859af77bd3
4 changed files with 140 additions and 89 deletions

View File

@ -173,6 +173,7 @@ namespace hermes {
if (len <= 0) { break; } if (len <= 0) { break; }
// Ignore anything that's not a USB packet // Ignore anything that's not a USB packet
// TODO: Gotta check the endpoint
if (htons(pkt->hdr.signature) != HERMES_METIS_SIGNATURE || pkt->hdr.type != METIS_PKT_USB) { if (htons(pkt->hdr.signature) != HERMES_METIS_SIGNATURE || pkt->hdr.type != METIS_PKT_USB) {
continue; continue;
} }

View File

@ -231,7 +231,7 @@ private:
} }
// Create samplerate list // Create samplerate list
auto srs = client->getValidSampleRates(); auto srs = client->getSamplerates();
sampleRates.clear(); sampleRates.clear();
for (auto& sr : srs) { for (auto& sr : srs) {
sampleRates.define(sr, getBandwdithScaled(sr), sr); sampleRates.define(sr, getBandwdithScaled(sr), sr);
@ -317,7 +317,7 @@ private:
dsp::stream<dsp::complex_t> stream; dsp::stream<dsp::complex_t> stream;
SourceManager::SourceHandler handler; SourceManager::SourceHandler handler;
rfspace::RFspaceClient client; std::shared_ptr<rfspace::Client> client;
}; };
MOD_EXPORT void _INIT_() { MOD_EXPORT void _INIT_() {

View File

@ -6,15 +6,13 @@
using namespace std::chrono_literals; using namespace std::chrono_literals;
namespace rfspace { namespace rfspace {
RFspaceClientClass::RFspaceClientClass(net::Conn conn, net::Conn udpConn, dsp::stream<dsp::complex_t>* out) { Client::Client(std::shared_ptr<net::Socket> tcp, std::shared_ptr<net::Socket> udp, dsp::stream<dsp::complex_t>* out) {
client = std::move(conn); this->tcp = tcp;
udpClient = std::move(udpConn); this->udp = udp;
output = out; output = out;
// Allocate buffers // Allocate buffers
rbuffer = new uint8_t[RFSPACE_MAX_SIZE];
sbuffer = new uint8_t[RFSPACE_MAX_SIZE]; sbuffer = new uint8_t[RFSPACE_MAX_SIZE];
ubuffer = new uint8_t[RFSPACE_MAX_SIZE];
// Clear write stop of stream just in case // Clear write stop of stream just in case
output->clearWriteStop(); output->clearWriteStop();
@ -22,9 +20,9 @@ namespace rfspace {
// Send UDP packet so that a router opens the port // Send UDP packet so that a router opens the port
sendDummyUDP(); sendDummyUDP();
// Start readers // Start workers
client->readAsync(sizeof(tcpHeader), (uint8_t*)&tcpHeader, tcpHandler, this); tcpWorkerThread = std::thread(&Client::tcpWorker, this);
udpClient->readAsync(RFSPACE_MAX_SIZE, ubuffer, udpHandler, this); udpWorkerThread = std::thread(&Client::udpWorker, this);
// Get device ID and wait for response // Get device ID and wait for response
getControlItem(RFSPACE_CTRL_ITEM_PROD_ID, NULL, 0); getControlItem(RFSPACE_CTRL_ITEM_PROD_ID, NULL, 0);
@ -43,22 +41,20 @@ namespace rfspace {
setPort(RFSPACE_RF_PORT_1); setPort(RFSPACE_RF_PORT_1);
// Start heartbeat // Start heartbeat
heartBeatThread = std::thread(&RFspaceClientClass::heartBeatWorker, this); heartBeatThread = std::thread(&Client::heartBeatWorker, this);
} }
RFspaceClientClass::~RFspaceClientClass() { Client::~Client() {
close(); close();
delete[] rbuffer;
delete[] sbuffer; delete[] sbuffer;
delete[] ubuffer;
} }
void RFspaceClientClass::sendDummyUDP() { void Client::sendDummyUDP() {
uint8_t dummy = 0x5A; uint8_t dummy = 0x5A;
udpClient->write(1, &dummy); udp->send(&dummy, 1);
} }
int RFspaceClientClass::getControlItem(ControlItem item, void* param, int len) { int Client::getControlItem(ControlItem item, void* param, int len) {
// Build packet // Build packet
uint16_t* header = (uint16_t*)&sbuffer[0]; uint16_t* header = (uint16_t*)&sbuffer[0];
uint16_t* item_val = (uint16_t*)&sbuffer[2]; uint16_t* item_val = (uint16_t*)&sbuffer[2];
@ -66,12 +62,12 @@ namespace rfspace {
*item_val = item; *item_val = item;
// Send packet // Send packet
client->write(4, sbuffer); tcp->send(sbuffer, 4);
return -1; return -1;
} }
void RFspaceClientClass::setControlItem(ControlItem item, void* param, int len) { void Client::setControlItem(ControlItem item, void* param, int len) {
// Build packet // Build packet
uint16_t* header = (uint16_t*)&sbuffer[0]; uint16_t* header = (uint16_t*)&sbuffer[0];
uint16_t* item_val = (uint16_t*)&sbuffer[2]; uint16_t* item_val = (uint16_t*)&sbuffer[2];
@ -80,10 +76,10 @@ namespace rfspace {
memcpy(&sbuffer[4], param, len); memcpy(&sbuffer[4], param, len);
// Send packet // Send packet
client->write(len + 4, sbuffer); tcp->send(sbuffer, len + 4);
} }
void RFspaceClientClass::setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len) { void Client::setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len) {
// Build packet // Build packet
uint16_t* header = (uint16_t*)&sbuffer[0]; uint16_t* header = (uint16_t*)&sbuffer[0];
uint16_t* item_val = (uint16_t*)&sbuffer[2]; uint16_t* item_val = (uint16_t*)&sbuffer[2];
@ -94,10 +90,10 @@ namespace rfspace {
memcpy(&sbuffer[5], param, len); memcpy(&sbuffer[5], param, len);
// Send packet // Send packet
client->write(len + 5, sbuffer); tcp->send(sbuffer, len + 5);
} }
std::vector<uint32_t> RFspaceClientClass::getValidSampleRates() { std::vector<uint32_t> Client::getSamplerates() {
std::vector<uint32_t> sr; std::vector<uint32_t> sr;
switch (deviceId) { switch (deviceId) {
@ -119,92 +115,145 @@ namespace rfspace {
return sr; return sr;
} }
void RFspaceClientClass::setFrequency(uint64_t freq) { void Client::setFrequency(uint64_t freq) {
setControlItemWithChanID(RFSPACE_CTRL_ITEM_NCO_FREQUENCY, 0, &freq, 5); setControlItemWithChanID(RFSPACE_CTRL_ITEM_NCO_FREQUENCY, 0, &freq, 5);
} }
void RFspaceClientClass::setPort(RFPort port) { void Client::setPort(RFPort port) {
uint8_t value = port; uint8_t value = port;
setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_PORT, 0, &value, sizeof(value)); setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_PORT, 0, &value, sizeof(value));
} }
void RFspaceClientClass::setGain(int8_t gain) { void Client::setGain(int8_t gain) {
setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_GAIN, 0, &gain, sizeof(gain)); setControlItemWithChanID(RFSPACE_CTRL_ITEM_RF_GAIN, 0, &gain, sizeof(gain));
} }
void RFspaceClientClass::setSampleRate(uint32_t sampleRate) { void Client::setSampleRate(uint32_t sampleRate) {
// Acquire the buffer variables
std::lock_guard<std::mutex> lck(bufferMtx);
// Update block size
blockSize = sampleRate / 200;
// Send samplerate to device
setControlItemWithChanID(RFSPACE_CTRL_ITEM_IQ_SAMP_RATE, 0, &sampleRate, sizeof(sampleRate)); setControlItemWithChanID(RFSPACE_CTRL_ITEM_IQ_SAMP_RATE, 0, &sampleRate, sizeof(sampleRate));
} }
void RFspaceClientClass::start(SampleFormat sampleFormat, SampleDepth sampleDepth) { void Client::start(SampleFormat sampleFormat, SampleDepth sampleDepth) {
// Acquire the buffer variables
std::lock_guard<std::mutex> lck(bufferMtx);
// Reset buffer
inBuffer = 0;
// Start device
uint8_t args[4] = { (uint8_t)sampleFormat, (uint8_t)RFSPACE_STATE_RUN, (uint8_t)sampleDepth, 0 }; uint8_t args[4] = { (uint8_t)sampleFormat, (uint8_t)RFSPACE_STATE_RUN, (uint8_t)sampleDepth, 0 };
setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args)); setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args));
} }
void RFspaceClientClass::stop() { void Client::stop() {
uint8_t args[4] = { 0, RFSPACE_STATE_IDLE, 0, 0 }; uint8_t args[4] = { 0, RFSPACE_STATE_IDLE, 0, 0 };
setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args)); setControlItem(RFSPACE_CTRL_ITEM_STATE, args, sizeof(args));
} }
void RFspaceClientClass::close() { void Client::close() {
// Stop UDP worker
output->stopWriter(); output->stopWriter();
udp->close();
if (udpWorkerThread.joinable()) { udpWorkerThread.join(); }
output->clearWriteStop();
// Stop heartbeat worker
stopHeartBeat = true; stopHeartBeat = true;
heartBeatCnd.notify_all(); heartBeatCnd.notify_all();
if (heartBeatThread.joinable()) { heartBeatThread.join(); } if (heartBeatThread.joinable()) { heartBeatThread.join(); }
client->close();
udpClient->close(); // Stop TCP worker
output->clearWriteStop(); tcp->close();
if (tcpWorkerThread.joinable()) { tcpWorkerThread.join(); }
} }
bool RFspaceClientClass::isOpen() { bool Client::isOpen() {
return client->isOpen(); return tcp->isOpen() || udp->isOpen();
} }
void RFspaceClientClass::tcpHandler(int count, uint8_t* buf, void* ctx) { void Client::tcpWorker() {
RFspaceClientClass* _this = (RFspaceClientClass*)ctx; // Allocate receive buffer
uint8_t type = _this->tcpHeader >> 13; uint8_t* buffer = new uint8_t[RFSPACE_MAX_SIZE];
uint16_t size = _this->tcpHeader & 0b1111111111111;
// Read the rest of the data // Receive loop
if (size > 2) { while (true) {
_this->client->read(size - 2, &_this->rbuffer[2]); // Receive header
} uint16_t header;
if (tcp->recv((uint8_t*)&header, sizeof(uint16_t), true) <= 0) { break; }
// flog::warn("TCP received: {0} {1}", type, size); // Decode header
uint8_t type = header >> 13;
uint16_t size = header & 0b1111111111111;
// Receive data
if (tcp->recv(buffer, size - 2, true, RFSPACE_TIMEOUT_MS) <= 0) { break; }
// Check for a device ID // Check for a device ID
uint16_t* controlItem = (uint16_t*)&_this->rbuffer[2]; uint16_t* controlItem = (uint16_t*)&buffer[0];
if (type == RFSPACE_MSG_TYPE_T2H_SET_CTRL_ITEM_RESP && *controlItem == RFSPACE_CTRL_ITEM_PROD_ID) { if (type == RFSPACE_MSG_TYPE_T2H_SET_CTRL_ITEM_RESP && *controlItem == RFSPACE_CTRL_ITEM_PROD_ID) {
{ {
std::lock_guard<std::mutex> lck(_this->devIdMtx); std::lock_guard<std::mutex> lck(devIdMtx);
_this->deviceId = (DeviceID)*(uint32_t*)&_this->rbuffer[4]; deviceId = (DeviceID)*(uint32_t*)&buffer[2];
_this->devIdAvailable = true; devIdAvailable = true;
}
devIdCnd.notify_all();
} }
_this->devIdCnd.notify_all();
} }
// Restart an async read // Free receive buffer
_this->client->readAsync(sizeof(_this->tcpHeader), (uint8_t*)&_this->tcpHeader, tcpHandler, _this); delete[] buffer;
} }
void RFspaceClientClass::udpHandler(int count, uint8_t* buf, void* ctx) { void Client::udpWorker() {
RFspaceClientClass* _this = (RFspaceClientClass*)ctx; // Allocate receive buffer
uint16_t hdr = (uint16_t)buf[0] | ((uint16_t)buf[1] << 8); uint8_t* buffer = new uint8_t[RFSPACE_MAX_SIZE];
uint8_t type = hdr >> 13; uint16_t* header = (uint16_t*)&buffer[0];
uint16_t size = hdr & 0b1111111111111;
// Receive loop
while (true) {
// Receive datagram
int rsize = udp->recv(buffer, RFSPACE_MAX_SIZE);
if (rsize <= 0) { break; }
// Decode header
uint8_t type = (*header) >> 13;
uint16_t size = (*header) & 0b1111111111111;
if (rsize != size) {
flog::error("Datagram size mismatch: {} vs {}", rsize, size);
continue;
}
// Check for a sample packet
if (type == RFSPACE_MSG_TYPE_T2H_DATA_ITEM_0) { if (type == RFSPACE_MSG_TYPE_T2H_DATA_ITEM_0) {
int16_t* samples = (int16_t*)&buf[4]; // Acquire the buffer variables
std::lock_guard<std::mutex> lck(bufferMtx);
// Convert samples to complex float
int16_t* samples = (int16_t*)&buffer[4];
int sampCount = (size - 4) / (2 * sizeof(int16_t)); int sampCount = (size - 4) / (2 * sizeof(int16_t));
volk_16i_s32f_convert_32f((float*)_this->output->writeBuf, samples, 32768.0f, sampCount * 2); volk_16i_s32f_convert_32f((float*)&output->writeBuf[inBuffer], samples, 32768.0f, sampCount * 2);
_this->output->swap(sampCount); inBuffer += sampCount;
// Send out samples if enough are buffered
if (inBuffer >= blockSize) {
if (!output->swap(inBuffer)) { break; };
inBuffer = 0;
}
}
} }
// Restart an async read // Free receive buffer
_this->udpClient->readAsync(RFSPACE_MAX_SIZE, _this->ubuffer, udpHandler, _this); delete[] buffer;
} }
void RFspaceClientClass::heartBeatWorker() { void Client::heartBeatWorker() {
uint8_t dummy[4]; uint8_t dummy[4];
while (true) { while (true) {
getControlItem(RFSPACE_CTRL_ITEM_STATE, dummy, sizeof(dummy)); getControlItem(RFSPACE_CTRL_ITEM_STATE, dummy, sizeof(dummy));
@ -216,11 +265,9 @@ namespace rfspace {
} }
} }
RFspaceClient connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) { std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) {
net::Conn conn = net::connect(host, port); auto tcp = net::connect(host, port);
if (!conn) { return NULL; } auto udp = net::openudp(host, port, "0.0.0.0", port);
net::Conn udpConn = net::openUDP("0.0.0.0", port, host, port, true); return std::make_shared<Client>(tcp, udp, out);
if (!udpConn) { return NULL; }
return RFspaceClient(new RFspaceClientClass(std::move(conn), std::move(udpConn), out));
} }
} }

View File

@ -1,9 +1,9 @@
#pragma once #pragma once
#include <utils/networking.h> #include <utils/net.h>
#include <dsp/stream.h> #include <dsp/stream.h>
#include <dsp/types.h> #include <dsp/types.h>
#include <atomic> #include <vector>
#include <queue> #include <mutex>
#define RFSPACE_MAX_SIZE 8192 #define RFSPACE_MAX_SIZE 8192
#define RFSPACE_HEARTBEAT_INTERVAL_MS 1000 #define RFSPACE_HEARTBEAT_INTERVAL_MS 1000
@ -96,10 +96,10 @@ namespace rfspace {
RFSPACE_CTRL_ITEM_ERROR_LOG = 0x0410 RFSPACE_CTRL_ITEM_ERROR_LOG = 0x0410
}; };
class RFspaceClientClass { class Client {
public: public:
RFspaceClientClass(net::Conn conn, net::Conn udpConn, dsp::stream<dsp::complex_t>* out); Client(std::shared_ptr<net::Socket> tcp, std::shared_ptr<net::Socket> udp, dsp::stream<dsp::complex_t>* out);
~RFspaceClientClass(); ~Client();
void sendDummyUDP(); void sendDummyUDP();
@ -107,7 +107,7 @@ namespace rfspace {
void setControlItem(ControlItem item, void* param, int len); void setControlItem(ControlItem item, void* param, int len);
void setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len); void setControlItemWithChanID(ControlItem item, uint8_t chanId, void* param, int len);
std::vector<uint32_t> getValidSampleRates(); std::vector<uint32_t> getSamplerates();
void setFrequency(uint64_t freq); void setFrequency(uint64_t freq);
void setPort(RFPort port); void setPort(RFPort port);
@ -123,21 +123,22 @@ namespace rfspace {
DeviceID deviceId; DeviceID deviceId;
private: private:
static void tcpHandler(int count, uint8_t* buf, void* ctx); void tcpWorker();
static void udpHandler(int count, uint8_t* buf, void* ctx); void udpWorker();
void heartBeatWorker(); void heartBeatWorker();
net::Conn client; std::shared_ptr<net::Socket> tcp;
net::Conn udpClient; std::shared_ptr<net::Socket> udp;
dsp::stream<dsp::complex_t>* output; dsp::stream<dsp::complex_t>* output;
uint16_t tcpHeader; uint16_t tcpHeader;
uint16_t udpHeader; uint16_t udpHeader;
uint8_t* rbuffer = NULL;
uint8_t* sbuffer = NULL; uint8_t* sbuffer = NULL;
uint8_t* ubuffer = NULL;
std::thread tcpWorkerThread;
std::thread udpWorkerThread;
std::thread heartBeatThread; std::thread heartBeatThread;
std::mutex heartBeatMtx; std::mutex heartBeatMtx;
@ -147,10 +148,12 @@ namespace rfspace {
bool devIdAvailable = false; bool devIdAvailable = false;
std::condition_variable devIdCnd; std::condition_variable devIdCnd;
std::mutex devIdMtx; std::mutex devIdMtx;
std::mutex bufferMtx;
int blockSize = 256;
int inBuffer = 0;
}; };
typedef std::unique_ptr<RFspaceClientClass> RFspaceClient; std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
RFspaceClient connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out);
} }