Work on the spectran HTTP source

This commit is contained in:
AlexandreRouma 2023-04-19 02:25:44 +02:00
parent 604f95fd96
commit 9c0b57a036
4 changed files with 140 additions and 5 deletions

View File

@ -0,0 +1,51 @@
#pragma once
#include <functional>
#include <stdexcept>
#include <mutex>
#include <map>
typedef int HandlerID;
template <typename... Args>
class NewEvent {
using Handler = std::function<void(Args...)>;
public:
HandlerID bind(const Handler& handler) {
std::lock_guard<std::mutex> lck(mtx);
HandlerID id = genID();
handlers[id] = handler;
return id;
}
template<typename MHandler, class T>
HandlerID bind(MHandler handler, T* ctx) {
return bind([=](Args... args){
(ctx->*handler)(args...);
});
}
void unbind(HandlerID id) {
std::lock_guard<std::mutex> lck(mtx);
if (handlers.find(id) == handlers.end()) {
throw std::runtime_error("Could not unbind handler, unknown ID");
}
handlers.erase(id);
}
void operator()(Args... args) {
std::lock_guard<std::mutex> lck(mtx);
for (const auto& [desc, handler] : handlers) {
handler(args...);
}
}
private:
HandlerID genID() {
int id;
for (id = 1; handlers.find(id) != handlers.end(); id++);
return id;
}
std::map<HandlerID, Handler> handlers;
std::mutex mtx;
};

View File

@ -28,7 +28,7 @@ public:
this->name = name; this->name = name;
strcpy(hostname, "localhost"); strcpy(hostname, "localhost");
sampleRate = 41000000.0; sampleRate = 5750000.0;
handler.ctx = this; handler.ctx = this;
handler.selectHandler = menuSelected; handler.selectHandler = menuSelected;
@ -103,8 +103,14 @@ private:
static void tune(double freq, void* ctx) { static void tune(double freq, void* ctx) {
SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx;
if (_this->running) { bool connected = (_this->client && _this->client->isOpen());
// TODO if (connected) {
int64_t newfreq = round(freq);
if (newfreq != _this->lastReportedFreq && _this->gotReport) {
flog::debug("Sending tuning command");
_this->lastReportedFreq = newfreq;
_this->client->setCenterFrequency(newfreq);
}
} }
_this->freq = freq; _this->freq = freq;
flog::info("SpectranHTTPSourceModule '{0}': Tune: {1}!", _this->name, freq); flog::info("SpectranHTTPSourceModule '{0}': Tune: {1}!", _this->name, freq);
@ -138,6 +144,7 @@ private:
_this->tryConnect(); _this->tryConnect();
} }
else if (connected && SmGui::Button("Disconnect##spectran_http_source")) { else if (connected && SmGui::Button("Disconnect##spectran_http_source")) {
_this->client->onCenterFrequencyChanged.unbind(_this->onFreqChangedId);
_this->client->close(); _this->client->close();
} }
if (_this->running) { style::endDisabled(); } if (_this->running) { style::endDisabled(); }
@ -154,13 +161,23 @@ private:
void tryConnect() { void tryConnect() {
try { try {
gotReport = false;
client = std::make_shared<SpectranHTTPClient>(hostname, port, &stream); client = std::make_shared<SpectranHTTPClient>(hostname, port, &stream);
onFreqChangedId = client->onCenterFrequencyChanged.bind(&SpectranHTTPSourceModule::onFreqChanged, this);
client->startWorker();
} }
catch (std::runtime_error e) { catch (std::runtime_error e) {
flog::error("Could not connect: {0}", e.what()); flog::error("Could not connect: {0}", e.what());
} }
} }
void onFreqChanged(double newFreq) {
if (lastReportedFreq == newFreq) { return; }
lastReportedFreq = newFreq;
tuner::tune(tuner::TUNER_MODE_IQ_ONLY, "", newFreq);
gotReport = true;
}
std::string name; std::string name;
bool enabled = true; bool enabled = true;
double sampleRate; double sampleRate;
@ -168,11 +185,15 @@ private:
bool running = false; bool running = false;
std::shared_ptr<SpectranHTTPClient> client; std::shared_ptr<SpectranHTTPClient> client;
HandlerID onFreqChangedId;
double freq; double freq;
int64_t lastReportedFreq = 0;
bool gotReport;
char hostname[1024]; char hostname[1024];
int port = 80; int port = 54664;
dsp::stream<dsp::complex_t> stream; dsp::stream<dsp::complex_t> stream;
}; };

View File

@ -5,6 +5,8 @@ SpectranHTTPClient::SpectranHTTPClient(std::string host, int port, dsp::stream<d
this->stream = stream; this->stream = stream;
// Connect to server // Connect to server
this->host = host;
this->port = port;
sock = net::connect(host, port); sock = net::connect(host, port);
http = net::http::Client(sock); http = net::http::Client(sock);
@ -14,6 +16,13 @@ SpectranHTTPClient::SpectranHTTPClient(std::string host, int port, dsp::stream<d
net::http::ResponseHeader rshdr; net::http::ResponseHeader rshdr;
http.recvResponseHeader(rshdr, 5000); http.recvResponseHeader(rshdr, 5000);
if (rshdr.getStatusCode() != net::http::STATUS_CODE_OK) {
flog::error("HTTP request did not return ok: {}", rshdr.getStatusString());
throw std::runtime_error("HTTP request did not return ok");
}
}
void SpectranHTTPClient::startWorker() {
// Start chunk worker // Start chunk worker
workerThread = std::thread(&SpectranHTTPClient::worker, this); workerThread = std::thread(&SpectranHTTPClient::worker, this);
} }
@ -33,6 +42,27 @@ void SpectranHTTPClient::close() {
stream->clearWriteStop(); stream->clearWriteStop();
} }
void SpectranHTTPClient::setCenterFrequency(uint64_t freq) {
// Connect to control endpoint (TODO: Switch to an always connected endpoint)
auto controlSock = net::connect(host, port);
auto controlHttp = net::http::Client(controlSock);
// Make request
net::http::RequestHeader rqhdr(net::http::METHOD_PUT, "/control", host);
char buf[1024];
sprintf(buf, "{\"frequencyCenter\":%d,\"frequencySpan\":%d,\"type\":\"capture\"}", freq, _span);
std::string data = buf;
char lenBuf[16];
sprintf(lenBuf, "%d", data.size());
rqhdr.setField("Content-Length", lenBuf);
controlHttp.sendRequestHeader(rqhdr);
controlSock->sendstr(data);
net::http::ResponseHeader rshdr;
controlHttp.recvResponseHeader(rshdr, 5000);
flog::debug("Response: {}", rshdr.getStatusString());
}
void SpectranHTTPClient::worker() { void SpectranHTTPClient::worker() {
while (sock->isOpen()) { while (sock->isOpen()) {
// Get chunk header // Get chunk header
@ -52,6 +82,26 @@ void SpectranHTTPClient::worker() {
return; return;
} }
// Decode JSON (yes, this is hacky, but it must be extremely fast)
auto startFreqBegin = jsonData.find("\"startFrequency\":");
auto startFreqEnd = jsonData.find(',', startFreqBegin);
std::string startFreqStr = jsonData.substr(startFreqBegin + 17, startFreqEnd - startFreqBegin - 17);
int64_t startFreq = std::stoll(startFreqStr);
auto endFreqBegin = jsonData.find("\"endFrequency\":");
auto endFreqEnd = jsonData.find(',', endFreqBegin);
std::string endFreqStr = jsonData.substr(endFreqBegin + 15, endFreqEnd - endFreqBegin - 15);
int64_t endFreq = std::stoll(endFreqStr);
// Calculate and update center freq
_span = endFreq - startFreq;
int64_t centerFreq = round(((double)endFreq + (double)startFreq) / 2.0);
if (centerFreq != _centerFreq) {
flog::debug("{}, {}, {}", _span, centerFreq);
_centerFreq = centerFreq;
onCenterFrequencyChanged(centerFreq);
}
// Read (and check for) record separator // Read (and check for) record separator
uint8_t rs; uint8_t rs;
int rslen = sock->recv(&rs, 1, true, 5000); int rslen = sock->recv(&rs, 1, true, 5000);
@ -72,10 +122,11 @@ void SpectranHTTPClient::worker() {
i += read; i += read;
sampLen += read; sampLen += read;
} }
int sampCount = sampLen / 8;
// Swap to stream // Swap to stream
if (streamingEnabled) { if (streamingEnabled) {
if (!stream->swap(sampLen / 8)) { return; } if (!stream->swap(sampCount)) { return; }
} }
// Read trailing CRLF // Read trailing CRLF

View File

@ -4,22 +4,34 @@
#include <string> #include <string>
#include <thread> #include <thread>
#include <utils/proto/http.h> #include <utils/proto/http.h>
#include <utils/new_event.h>
#include <stdint.h>
class SpectranHTTPClient { class SpectranHTTPClient {
public: public:
SpectranHTTPClient(std::string host, int port, dsp::stream<dsp::complex_t>* stream); SpectranHTTPClient(std::string host, int port, dsp::stream<dsp::complex_t>* stream);
void startWorker();
void streaming(bool enabled); void streaming(bool enabled);
bool isOpen(); bool isOpen();
void close(); void close();
void setCenterFrequency(uint64_t freq);
NewEvent<uint64_t> onCenterFrequencyChanged;
private: private:
void worker(); void worker();
std::string host;
int port;
std::shared_ptr<net::Socket> sock; std::shared_ptr<net::Socket> sock;
net::http::Client http; net::http::Client http;
dsp::stream<dsp::complex_t>* stream; dsp::stream<dsp::complex_t>* stream;
std::thread workerThread; std::thread workerThread;
bool streamingEnabled = false; bool streamingEnabled = false;
int64_t _centerFreq = 0;
uint64_t _span = 5000000;
}; };