From f163e926c71d87dc1da246260fc7be4ad86fda42 Mon Sep 17 00:00:00 2001 From: AlexandreRouma Date: Sun, 4 Dec 2022 02:10:34 +0100 Subject: [PATCH] spectran http source + hermes source cleanup --- CMakeLists.txt | 10 + decoder_modules/atv_decoder/CMakeLists.txt | 21 + decoder_modules/atv_decoder/src/main.cpp | 181 ++++++++ readme.md | 38 +- source_modules/hermes_source/src/hermes.cpp | 7 +- source_modules/hermes_source/src/hermes.h | 1 - .../spectran_http_source/CMakeLists.txt | 25 ++ .../spectran_http_source/src/main.cpp | 200 +++++++++ .../spectran_http_source/src/net.cpp | 402 ++++++++++++++++++ source_modules/spectran_http_source/src/net.h | 281 ++++++++++++ .../spectran_http_source/src/proto/http.cpp | 304 +++++++++++++ .../spectran_http_source/src/proto/http.h | 276 ++++++++++++ .../src/spectran_http_client.cpp | 89 ++++ .../src/spectran_http_client.h | 25 ++ 14 files changed, 1837 insertions(+), 23 deletions(-) create mode 100644 decoder_modules/atv_decoder/CMakeLists.txt create mode 100644 decoder_modules/atv_decoder/src/main.cpp create mode 100644 source_modules/spectran_http_source/CMakeLists.txt create mode 100644 source_modules/spectran_http_source/src/main.cpp create mode 100644 source_modules/spectran_http_source/src/net.cpp create mode 100644 source_modules/spectran_http_source/src/net.h create mode 100644 source_modules/spectran_http_source/src/proto/http.cpp create mode 100644 source_modules/spectran_http_source/src/proto/http.h create mode 100644 source_modules/spectran_http_source/src/spectran_http_client.cpp create mode 100644 source_modules/spectran_http_source/src/spectran_http_client.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 38e28fb4..aae70451 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ option(OPT_BUILD_RTL_SDR_SOURCE "Build RTL-SDR Source Module (Dependencies: libr option(OPT_BUILD_RTL_TCP_SOURCE "Build RTL-TCP Source Module (no dependencies required)" ON) option(OPT_BUILD_SDRPLAY_SOURCE "Build SDRplay Source Module (Dependencies: libsdrplay)" OFF) option(OPT_BUILD_SOAPY_SOURCE "Build SoapySDR Source Module (Dependencies: soapysdr)" ON) +option(OPT_BUILD_SPECTRAN_HTTP_SOURCE "Build Spectran HTTP Source Module (no dependencies required)" ON) option(OPT_BUILD_SPYSERVER_SOURCE "Build SpyServer Source Module (no dependencies required)" ON) option(OPT_BUILD_PLUTOSDR_SOURCE "Build PlutoSDR Source Module (Dependencies: libiio, libad9361)" ON) option(OPT_BUILD_USRP_SOURCE "Build USRP Source Module (libuhd)" OFF) @@ -52,6 +53,7 @@ option(OPT_BUILD_NETWORK_SINK "Build Audio Sink Module (no dependencies required option(OPT_BUILD_NEW_PORTAUDIO_SINK "Build the new PortAudio Sink Module (Dependencies: portaudio)" OFF) # Decoders +option(OPT_BUILD_ATV_DECODER "Build ATV decoder (no dependencies required)" OFF) option(OPT_BUILD_FALCON9_DECODER "Build the falcon9 live decoder (Dependencies: ffplay)" OFF) option(OPT_BUILD_KG_SSTV_DECODER "Build the M17 decoder module (no dependencies required)" OFF) option(OPT_BUILD_M17_DECODER "Build the M17 decoder module (Dependencies: codec2)" OFF) @@ -128,6 +130,10 @@ if (OPT_BUILD_SOAPY_SOURCE) add_subdirectory("source_modules/soapy_source") endif (OPT_BUILD_SOAPY_SOURCE) +if (OPT_BUILD_SPECTRAN_HTTP_SOURCE) +add_subdirectory("source_modules/spectran_http_source") +endif (OPT_BUILD_SPECTRAN_HTTP_SOURCE) + if (OPT_BUILD_SPYSERVER_SOURCE) add_subdirectory("source_modules/spyserver_source") endif (OPT_BUILD_SPYSERVER_SOURCE) @@ -164,6 +170,10 @@ endif (OPT_BUILD_NEW_PORTAUDIO_SINK) # Decoders +if (OPT_BUILD_ATV_DECODER) +add_subdirectory("decoder_modules/atv_decoder") +endif (OPT_BUILD_ATV_DECODER) + if (OPT_BUILD_FALCON9_DECODER) add_subdirectory("decoder_modules/falcon9_decoder") endif (OPT_BUILD_FALCON9_DECODER) diff --git a/decoder_modules/atv_decoder/CMakeLists.txt b/decoder_modules/atv_decoder/CMakeLists.txt new file mode 100644 index 00000000..db5e62d7 --- /dev/null +++ b/decoder_modules/atv_decoder/CMakeLists.txt @@ -0,0 +1,21 @@ +cmake_minimum_required(VERSION 3.13) +project(atv_decoder) + +file(GLOB_RECURSE SRC "src/*.cpp" "src/*.c") + +include_directories("src/") + +add_library(atv_decoder SHARED ${SRC}) +target_link_libraries(atv_decoder PRIVATE sdrpp_core) +set_target_properties(atv_decoder PROPERTIES PREFIX "") + +if (MSVC) + target_compile_options(atv_decoder PRIVATE /O2 /Ob2 /std:c++17 /EHsc) +elseif (CMAKE_CXX_COMPILER_ID MATCHES "Clang") + target_compile_options(atv_decoder PRIVATE -O3 -std=c++17 -Wno-unused-command-line-argument -undefined dynamic_lookup) +else () + target_compile_options(atv_decoder PRIVATE -O3 -std=c++17) +endif () + +# Install directives +install(TARGETS atv_decoder DESTINATION lib/sdrpp/plugins) \ No newline at end of file diff --git a/decoder_modules/atv_decoder/src/main.cpp b/decoder_modules/atv_decoder/src/main.cpp new file mode 100644 index 00000000..c5938f34 --- /dev/null +++ b/decoder_modules/atv_decoder/src/main.cpp @@ -0,0 +1,181 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#define CONCAT(a, b) ((std::string(a) + b).c_str()) + +SDRPP_MOD_INFO{/* Name: */ "atv_decoder", + /* Description: */ "ATV decoder for SDR++", + /* Author: */ "Ryzerth", + /* Version: */ 0, 1, 0, + /* Max instances */ -1}; + +#define SAMPLE_RATE (625.0f * 720.0f * 25.0f) + +class ATVDecoderModule : public ModuleManager::Instance { + public: + ATVDecoderModule(std::string name) : img(720, 625) { + this->name = name; + + vfo = sigpath::vfoManager.createVFO(name, ImGui::WaterfallVFO::REF_CENTER, 0, 8000000.0f, SAMPLE_RATE, SAMPLE_RATE, SAMPLE_RATE, true); + + demod.init(vfo->output, SAMPLE_RATE, SAMPLE_RATE / 2.0f); + sink.init(&demod.out, handler, this); + + demod.start(); + sink.start(); + + gui::menu.registerEntry(name, menuHandler, this, this); + } + + ~ATVDecoderModule() { + if (vfo) { + sigpath::vfoManager.deleteVFO(vfo); + } + demod.stop(); + gui::menu.removeEntry(name); + } + + void postInit() {} + + void enable() { enabled = true; } + + void disable() { enabled = false; } + + bool isEnabled() { return enabled; } + + private: + static void menuHandler(void *ctx) { + ATVDecoderModule *_this = (ATVDecoderModule *)ctx; + + if (!_this->enabled) { + style::beginDisabled(); + } + + ImGui::FillWidth(); + _this->img.draw(); + + ImGui::LeftLabel("Sync"); + ImGui::FillWidth(); + ImGui::SliderFloat("##syncLvl", &_this->sync_level, -2, 2); + + ImGui::LeftLabel("Min"); + ImGui::FillWidth(); + ImGui::SliderFloat("##minLvl", &_this->minLvl, -1.0, 1.0); + + ImGui::LeftLabel("Span"); + ImGui::FillWidth(); + ImGui::SliderFloat("##spanLvl", &_this->spanLvl, 0, 1.0); + + if (!_this->enabled) { + style::endDisabled(); + } + } + + static void handler(float *data, int count, void *ctx) { + ATVDecoderModule *_this = (ATVDecoderModule *)ctx; + + uint8_t *buf = (uint8_t *)_this->img.buffer; + float val; + float imval; + int pos = 0; + for (int i = 0; i < count; i++) { + val = data[i]; + // Sync + if (val < _this->sync_level) { + _this->sync_count++; + } + else { + if (_this->sync_count >= 300) { + _this->short_sync = 0; + } + else if (_this->sync_count >= 33) { + if (_this->short_sync == 5) { + _this->even_field = false; + _this->ypos = 0; + _this->img.swap(); + buf = (uint8_t *)_this->img.buffer; + } + else if (_this->short_sync == 4) { + _this->even_field = true; + _this->ypos = 0; + } + _this->xpos = 0; + _this->short_sync = 0; + } + else if (_this->sync_count >= 15) { + _this->short_sync++; + } + _this->sync_count = 0; + } + + + // Draw + imval = std::clamp((val - _this->minLvl) * 255.0 / _this->spanLvl, 0, 255); + if (_this->even_field) { + pos = ((720 * _this->ypos * 2) + _this->xpos) * 4; + } + else { + pos = ((720 * (_this->ypos * 2 + 1)) + _this->xpos) * 4; + } + + buf[pos] = imval; + buf[pos + 1] = imval; + buf[pos + 2] = imval; + buf[pos + 3] = imval; + + // Image logic + _this->xpos++; + if (_this->xpos >= 720) { + _this->ypos++; + _this->xpos = 0; + } + if (_this->ypos >= 312) { + _this->ypos = 0; + _this->xpos = 0; + _this->even_field = !_this->even_field; + if (_this->even_field) { + _this->img.swap(); + buf = (uint8_t *)_this->img.buffer; + } + } + } + } + + std::string name; + bool enabled = true; + + VFOManager::VFO *vfo = NULL; + dsp::demod::Quadrature demod; + dsp::sink::Handler sink; + + int xpos = 0; + int ypos = 0; + bool even_field = false; + + float sync_level = -0.3f; + int sync_count = 0; + int short_sync = 0; + + float minLvl = 0.0f; + float spanLvl = 1.0f; + + ImGui::ImageDisplay img; +}; + +MOD_EXPORT void _INIT_() {} + +MOD_EXPORT ModuleManager::Instance *_CREATE_INSTANCE_(std::string name) { return new ATVDecoderModule(name); } + +MOD_EXPORT void _DELETE_INSTANCE_(void *instance) { delete (ATVDecoderModule *)instance; } + +MOD_EXPORT void _END_() {} \ No newline at end of file diff --git a/readme.md b/readme.md index 55a5e00a..58ec42c4 100644 --- a/readme.md +++ b/readme.md @@ -323,24 +323,25 @@ Modules in beta are still included in releases for the most part but not enabled ## Sources -| Name | Stage | Dependencies | Option | Built by default| Built in Release | Enabled in SDR++ by default | -|---------------------|------------|-------------------|-------------------------------|:---------------:|:-----------------------:|:---------------------------:| -| airspy_source | Working | libairspy | OPT_BUILD_AIRSPY_SOURCE | ✅ | ✅ | ✅ | -| airspyhf_source | Working | libairspyhf | OPT_BUILD_AIRSPYHF_SOURCE | ✅ | ✅ | ✅ | -| bladerf_source | Working | libbladeRF | OPT_BUILD_BLADERF_SOURCE | ⛔ | ⚠️ (not Debian Buster) | ✅ | -| file_source | Working | - | OPT_BUILD_FILE_SOURCE | ✅ | ✅ | ✅ | -| hackrf_source | Working | libhackrf | OPT_BUILD_HACKRF_SOURCE | ✅ | ✅ | ✅ | -| hermes_source | Unfinished | - | OPT_BUILD_HERMES_SOURCE | ✅ | ✅ | ⛔ | -| limesdr_source | Working | liblimesuite | OPT_BUILD_LIMESDR_SOURCE | ⛔ | ✅ | ✅ | -| plutosdr_source | Working | libiio, libad9361 | OPT_BUILD_PLUTOSDR_SOURCE | ✅ | ✅ | ✅ | -| rfspace_source | Working | - | OPT_BUILD_RFSPACE_SOURCE | ✅ | ✅ | ✅ | -| rtl_sdr_source | Working | librtlsdr | OPT_BUILD_RTL_SDR_SOURCE | ✅ | ✅ | ✅ | -| rtl_tcp_source | Working | - | OPT_BUILD_RTL_TCP_SOURCE | ✅ | ✅ | ✅ | -| sdrplay_source | Working | SDRplay API | OPT_BUILD_SDRPLAY_SOURCE | ⛔ | ✅ | ✅ | -| sdrpp_server_source | Working | - | OPT_BUILD_SDRPP_SERVER_SOURCE | ✅ | ✅ | ✅ | -| soapy_source | Working | soapysdr | OPT_BUILD_SOAPY_SOURCE | ✅ | ✅ | ✅ | -| spyserver_source | Working | - | OPT_BUILD_SPYSERVER_SOURCE | ✅ | ✅ | ✅ | -| usrp_source | Beta | libuhd | OPT_BUILD_USRP_SOURCE | ⛔ | ⛔ | ⛔ | +| Name | Stage | Dependencies | Option | Built by default| Built in Release | Enabled in SDR++ by default | +|----------------------|------------|-------------------|--------------------------------|:---------------:|:-----------------------:|:---------------------------:| +| airspy_source | Working | libairspy | OPT_BUILD_AIRSPY_SOURCE | ✅ | ✅ | ✅ | +| airspyhf_source | Working | libairspyhf | OPT_BUILD_AIRSPYHF_SOURCE | ✅ | ✅ | ✅ | +| bladerf_source | Working | libbladeRF | OPT_BUILD_BLADERF_SOURCE | ⛔ | ⚠️ (not Debian Buster) | ✅ | +| file_source | Working | - | OPT_BUILD_FILE_SOURCE | ✅ | ✅ | ✅ | +| hackrf_source | Working | libhackrf | OPT_BUILD_HACKRF_SOURCE | ✅ | ✅ | ✅ | +| hermes_source | Beta | - | OPT_BUILD_HERMES_SOURCE | ✅ | ✅ | ⛔ | +| limesdr_source | Working | liblimesuite | OPT_BUILD_LIMESDR_SOURCE | ⛔ | ✅ | ✅ | +| plutosdr_source | Working | libiio, libad9361 | OPT_BUILD_PLUTOSDR_SOURCE | ✅ | ✅ | ✅ | +| rfspace_source | Working | - | OPT_BUILD_RFSPACE_SOURCE | ✅ | ✅ | ✅ | +| rtl_sdr_source | Working | librtlsdr | OPT_BUILD_RTL_SDR_SOURCE | ✅ | ✅ | ✅ | +| rtl_tcp_source | Working | - | OPT_BUILD_RTL_TCP_SOURCE | ✅ | ✅ | ✅ | +| sdrplay_source | Working | SDRplay API | OPT_BUILD_SDRPLAY_SOURCE | ⛔ | ✅ | ✅ | +| sdrpp_server_source | Working | - | OPT_BUILD_SDRPP_SERVER_SOURCE | ✅ | ✅ | ✅ | +| soapy_source | Working | soapysdr | OPT_BUILD_SOAPY_SOURCE | ✅ | ✅ | ✅ | +| spectran_http_source | Unfinished | - | OPT_BUILD_SPECTRAN_HTTP_SOURCE | ✅ | ✅ | ⛔ | +| spyserver_source | Working | - | OPT_BUILD_SPYSERVER_SOURCE | ✅ | ✅ | ✅ | +| usrp_source | Beta | libuhd | OPT_BUILD_USRP_SOURCE | ⛔ | ⛔ | ⛔ | ## Sinks @@ -356,6 +357,7 @@ Modules in beta are still included in releases for the most part but not enabled | Name | Stage | Dependencies | Option | Built by default| Built in Release | Enabled in SDR++ by default | |---------------------|------------|--------------|-------------------------------|:---------------:|:----------------:|:---------------------------:| +| atv_decoder | Unfinished | - | OPT_BUILD_ATV_DECODER | ⛔ | ⛔ | ⛔ | | dmr_decoder | Unfinished | - | OPT_BUILD_DMR_DECODER | ⛔ | ⛔ | ⛔ | | falcon9_decoder | Unfinished | ffplay | OPT_BUILD_FALCON9_DECODER | ⛔ | ⛔ | ⛔ | | kgsstv_decoder | Unfinished | - | OPT_BUILD_KGSSTV_DECODER | ⛔ | ⛔ | ⛔ | diff --git a/source_modules/hermes_source/src/hermes.cpp b/source_modules/hermes_source/src/hermes.cpp index 71baf5b1..2c1b23e9 100644 --- a/source_modules/hermes_source/src/hermes.cpp +++ b/source_modules/hermes_source/src/hermes.cpp @@ -10,13 +10,12 @@ namespace hermes { } void Client::close() { - if (!open) { return; } sock->close(); // Wait for worker to exit + out.stopWriter(); if (workerThread.joinable()) { workerThread.join(); } - - open = false; + out.clearWriteStop(); } void Client::start() { @@ -195,7 +194,7 @@ namespace hermes { si = (si << 8) >> 8; sq = (sq << 8) >> 8; - // Convert to float (IQ swapper for some reason... 'I' means in-phase... :facepalm:) + // Convert to float (IQ swapped for some reason) out.writeBuf[i].im = (float)si / (float)0x1000000; out.writeBuf[i].re = (float)sq / (float)0x1000000; } diff --git a/source_modules/hermes_source/src/hermes.h b/source_modules/hermes_source/src/hermes.h index 4d694e34..5b48bb02 100644 --- a/source_modules/hermes_source/src/hermes.h +++ b/source_modules/hermes_source/src/hermes.h @@ -153,7 +153,6 @@ namespace hermes { void worker(); - bool open = true; double freq = 0; std::thread workerThread; diff --git a/source_modules/spectran_http_source/CMakeLists.txt b/source_modules/spectran_http_source/CMakeLists.txt new file mode 100644 index 00000000..535878d8 --- /dev/null +++ b/source_modules/spectran_http_source/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.13) +project(spectran_http_source) + +file(GLOB_RECURSE SRC "src/*.cpp") + +add_library(spectran_http_source SHARED ${SRC}) +target_link_libraries(spectran_http_source PRIVATE sdrpp_core) +set_target_properties(spectran_http_source PROPERTIES PREFIX "") + +target_include_directories(spectran_http_source PRIVATE "src/") + +if (MSVC) + target_compile_options(spectran_http_source PRIVATE /O2 /Ob2 /std:c++17 /EHsc) +elseif (CMAKE_CXX_COMPILER_ID MATCHES "Clang") + target_compile_options(spectran_http_source PRIVATE -O3 -std=c++17 -Wno-unused-command-line-argument -undefined dynamic_lookup) +else () + target_compile_options(spectran_http_source PRIVATE -O3 -std=c++17) +endif () + +if(WIN32) + target_link_libraries(spectran_http_source PRIVATE wsock32 ws2_32 iphlpapi) +endif() + +# Install directives +install(TARGETS spectran_http_source DESTINATION lib/sdrpp/plugins) \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/main.cpp b/source_modules/spectran_http_source/src/main.cpp new file mode 100644 index 00000000..264b1b5d --- /dev/null +++ b/source_modules/spectran_http_source/src/main.cpp @@ -0,0 +1,200 @@ +#include "spectran_http_client.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define CONCAT(a, b) ((std::string(a) + b).c_str()) + +SDRPP_MOD_INFO{ + /* Name: */ "spectran_http_source", + /* Description: */ "Spectran V6 HTTP source module for SDR++", + /* Author: */ "Ryzerth", + /* Version: */ 0, 1, 0, + /* Max instances */ 1 +}; + +ConfigManager config; + +class SpectranHTTPSourceModule : public ModuleManager::Instance { +public: + SpectranHTTPSourceModule(std::string name) { + this->name = name; + + strcpy(hostname, "localhost"); + sampleRate = 5000000.0; + + handler.ctx = this; + handler.selectHandler = menuSelected; + handler.deselectHandler = menuDeselected; + handler.menuHandler = menuHandler; + handler.startHandler = start; + handler.stopHandler = stop; + handler.tuneHandler = tune; + handler.stream = &stream; + + sigpath::sourceManager.registerSource("Spectran HTTP", &handler); + } + + ~SpectranHTTPSourceModule() { + stop(this); + sigpath::sourceManager.unregisterSource("Spectran HTTP"); + } + + void postInit() {} + + void enable() { + enabled = true; + } + + void disable() { + enabled = false; + } + + bool isEnabled() { + return enabled; + } + + // TODO: Implement select functions + +private: + static void menuSelected(void* ctx) { + SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; + core::setInputSampleRate(_this->sampleRate); + spdlog::info("SpectranHTTPSourceModule '{0}': Menu Select!", _this->name); + } + + static void menuDeselected(void* ctx) { + SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; + gui::mainWindow.playButtonLocked = false; + spdlog::info("SpectranHTTPSourceModule '{0}': Menu Deselect!", _this->name); + } + + static void start(void* ctx) { + SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; + bool connected = (_this->client && _this->client->isOpen()); + if (_this->running && connected) { return; } + + // TODO: Start + _this->client->streaming(true); + + // TODO: Set options + + _this->running = true; + spdlog::info("SpectranHTTPSourceModule '{0}': Start!", _this->name); + } + + static void stop(void* ctx) { + SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; + if (!_this->running) { return; } + _this->running = false; + + // TODO: Implement stop + _this->client->streaming(false); + + spdlog::info("SpectranHTTPSourceModule '{0}': Stop!", _this->name); + } + + static void tune(double freq, void* ctx) { + SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; + if (_this->running) { + // TODO + } + _this->freq = freq; + spdlog::info("SpectranHTTPSourceModule '{0}': Tune: {1}!", _this->name, freq); + } + + static void menuHandler(void* ctx) { + SpectranHTTPSourceModule* _this = (SpectranHTTPSourceModule*)ctx; + float menuWidth = ImGui::GetContentRegionAvail().x; + bool connected = (_this->client && _this->client->isOpen()); + gui::mainWindow.playButtonLocked = !connected; + + if (connected) { SmGui::BeginDisabled(); } + + if (ImGui::InputText(CONCAT("##spectran_http_host_", _this->name), _this->hostname, 1023)) { + config.acquire(); + config.conf["hostname"] = _this->hostname; + config.release(true); + } + ImGui::SameLine(); + ImGui::SetNextItemWidth(menuWidth - ImGui::GetCursorPosX()); + if (ImGui::InputInt(CONCAT("##spectran_http_port_", _this->name), &_this->port, 0, 0)) { + config.acquire(); + config.conf["port"] = _this->port; + config.release(true); + } + + if (connected) { SmGui::EndDisabled(); } + + if (_this->running) { style::beginDisabled(); } + if (!connected && ImGui::Button("Connect##spectran_http_source", ImVec2(menuWidth, 0))) { + _this->tryConnect(); + } + else if (connected && ImGui::Button("Disconnect##spectran_http_source", ImVec2(menuWidth, 0))) { + _this->client->close(); + } + if (_this->running) { style::endDisabled(); } + + ImGui::TextUnformatted("Status:"); + ImGui::SameLine(); + if (connected) { + ImGui::TextColored(ImVec4(0.0f, 1.0f, 0.0f, 1.0f), "Connected"); + } + else { + ImGui::TextUnformatted("Not connected"); + } + } + + void tryConnect() { + try { + client = std::make_shared(hostname, port, &stream); + } + catch (std::runtime_error e) { + spdlog::error("Could not connect: {0}", e.what()); + } + } + + std::string name; + bool enabled = true; + double sampleRate; + SourceManager::SourceHandler handler; + bool running = false; + + std::shared_ptr client; + + double freq; + + char hostname[1024]; + int port = 80; + dsp::stream stream; + +}; + +MOD_EXPORT void _INIT_() { + json def = json({}); + def["devices"] = json({}); + def["device"] = ""; + config.setPath(core::args["root"].s() + "/spectran_http_config.json"); + config.load(def); + config.enableAutoSave(); +} + +MOD_EXPORT ModuleManager::Instance* _CREATE_INSTANCE_(std::string name) { + return new SpectranHTTPSourceModule(name); +} + +MOD_EXPORT void _DELETE_INSTANCE_(ModuleManager::Instance* instance) { + delete (SpectranHTTPSourceModule*)instance; +} + +MOD_EXPORT void _END_() { + config.disableAutoSave(); + config.save(); +} \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/net.cpp b/source_modules/spectran_http_source/src/net.cpp new file mode 100644 index 00000000..13f50c6e --- /dev/null +++ b/source_modules/spectran_http_source/src/net.cpp @@ -0,0 +1,402 @@ +#include "net.h" +#include +#include + +#ifdef _WIN32 +#define WOULD_BLOCK (WSAGetLastError() == WSAEWOULDBLOCK) +#else +#define WOULD_BLOCK (errno == EWOULDBLOCK) +#endif + +namespace net { + bool _init = false; + + // === Private functions === + + void init() { + if (_init) { return; } +#ifdef _WIN32 + // Initialize WinSock2 + WSADATA wsa; + if (WSAStartup(MAKEWORD(2, 2), &wsa)) { + throw std::runtime_error("Could not initialize WinSock2"); + return; + } +#else + // Disable SIGPIPE to avoid closing when the remote host disconnects + signal(SIGPIPE, SIG_IGN); +#endif + _init = true; + } + + bool queryHost(uint32_t* addr, std::string host) { + hostent* ent = gethostbyname(host.c_str()); + if (!ent || !ent->h_addr_list[0]) { return false; } + *addr = *(uint32_t*)ent->h_addr_list[0]; + return true; + } + + void closeSocket(SockHandle_t sock) { +#ifdef _WIN32 + shutdown(sock, SD_BOTH); + closesocket(sock); +#else + shutdown(sock, SHUT_RDWR); + close(sock); +#endif + } + + void setNonblocking(SockHandle_t sock) { +#ifdef _WIN32 + u_long enabled = 1; + ioctlsocket(sock, FIONBIO, &enabled); +#else + fcntl(sock, F_SETFL, O_NONBLOCK); +#endif + } + + // === Address functions === + + Address::Address() { + memset(&addr, 0, sizeof(addr)); + } + + Address::Address(const std::string& host, int port) { + // Initialize WSA if needed + init(); + + // Lookup host + hostent* ent = gethostbyname(host.c_str()); + if (!ent || !ent->h_addr_list[0]) { + throw std::runtime_error("Unknown host"); + } + + // Build address + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = *(uint32_t*)ent->h_addr_list[0]; + addr.sin_port = htons(port); + } + + Address::Address(IP_t ip, int port) { + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(ip); + addr.sin_port = htons(port); + } + + std::string Address::getIPStr() { + char buf[128]; + IP_t ip = getIP(); + sprintf(buf, "%d.%d.%d.%d", (ip >> 24) & 0xFF, (ip >> 16) & 0xFF, (ip >> 8) & 0xFF, ip & 0xFF); + return buf; + } + + IP_t Address::getIP() { + return htonl(addr.sin_addr.s_addr); + } + + void Address::setIP(IP_t ip) { + addr.sin_addr.s_addr = htonl(ip); + } + + int Address::getPort() { + return htons(addr.sin_port); + } + + void Address::setPort(int port) { + addr.sin_port = htons(port); + } + + // === Socket functions === + + Socket::Socket(SockHandle_t sock, const Address* raddr) { + this->sock = sock; + if (raddr) { + this->raddr = new Address(*raddr); + } + } + + Socket::~Socket() { + close(); + if (raddr) { delete raddr; } + } + + void Socket::close() { + if (!open) { return; } + open = false; + closeSocket(sock); + } + + bool Socket::isOpen() { + return open; + } + + SocketType Socket::type() { + return raddr ? SOCKET_TYPE_UDP : SOCKET_TYPE_TCP; + } + + int Socket::send(const uint8_t* data, size_t len, const Address* dest) { + return sendto(sock, (const char*)data, len, 0, (sockaddr*)(dest ? &dest->addr : (raddr ? &raddr->addr : NULL)), sizeof(sockaddr_in)); + } + + int Socket::sendstr(const std::string& str, const Address* dest) { + return send((const uint8_t*)str.c_str(), str.length(), dest); + } + + int Socket::recv(uint8_t* data, size_t maxLen, bool forceLen, int timeout, Address* dest) { + // Create FD set + fd_set set; + FD_ZERO(&set); + FD_SET(sock, &set); + + // Define timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = timeout * 1000; + + int read = 0; + bool blocking = (timeout != NONBLOCKING); + do { + // Wait for data or error if + if (blocking) { + int err = select(sock+1, &set, NULL, &set, (timeout > 0) ? &tv : NULL); + if (err <= 0) { return err; } + } + + // Receive + int addrLen = sizeof(sockaddr_in); + int err = ::recvfrom(sock, (char*)&data[read], maxLen - read, 0,(sockaddr*)(dest ? &dest->addr : NULL), (socklen_t*)(dest ? &addrLen : NULL)); + if (err <= 0 && !WOULD_BLOCK) { + close(); + return err; + } + read += err; + } + while (blocking && forceLen && read < maxLen); + return read; + } + + int Socket::recvline(std::string& str, int maxLen, int timeout, Address* dest) { + // Disallow nonblocking mode + if (!timeout) { return -1; } + + str.clear(); + int read = 0; + while (!maxLen || read < maxLen) { + char c; + int err = recv((uint8_t*)&c, 1, false, timeout, dest); + if (err <= 0) { return err; } + read++; + if (c == '\n') { break; } + str += c; + } + return read; + } + + // === Listener functions === + + Listener::Listener(SockHandle_t sock) { + this->sock = sock; + } + + Listener::~Listener() { + stop(); + } + + void Listener::stop() { + closeSocket(sock); + open = false; + } + + bool Listener::listening() { + return open; + } + + std::shared_ptr Listener::accept(Address* dest, int timeout) { + // Create FD set + fd_set set; + FD_ZERO(&set); + FD_SET(sock, &set); + + // Define timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = timeout * 1000; + + // Wait for data or error + if (timeout != NONBLOCKING) { + int err = select(sock+1, &set, NULL, &set, (timeout > 0) ? &tv : NULL); + if (err <= 0) { return NULL; } + } + + // Accept + int addrLen = sizeof(sockaddr_in); + SockHandle_t s = ::accept(sock, (sockaddr*)(dest ? &dest->addr : NULL), (socklen_t*)(dest ? &addrLen : NULL)); + if ((int)s < 0) { + if (!WOULD_BLOCK) { stop(); } + return NULL; + } + + // Enable nonblocking mode + setNonblocking(s); + + return std::make_shared(s); + } + + // === Creation functions === + + std::map listInterfaces() { + // Init library if needed + init(); + + std::map ifaces; +#ifdef _WIN32 + // Pre-allocate buffer + ULONG size = sizeof(IP_ADAPTER_ADDRESSES); + PIP_ADAPTER_ADDRESSES addresses = (PIP_ADAPTER_ADDRESSES)malloc(size); + + // Reallocate to real size + if (GetAdaptersAddresses(AF_INET, 0, NULL, addresses, &size) == ERROR_BUFFER_OVERFLOW) { + addresses = (PIP_ADAPTER_ADDRESSES)realloc(addresses, size); + if (GetAdaptersAddresses(AF_INET, 0, NULL, addresses, &size)) { + throw std::exception("Could not list network interfaces"); + } + } + + // Save data + std::wstring_convert> utfConv; + for (auto iface = addresses; iface; iface = iface->Next) { + InterfaceInfo info; + auto ip = iface->FirstUnicastAddress; + if (!ip || ip->Address.lpSockaddr->sa_family != AF_INET) { continue; } + info.address = ntohl(*(uint32_t*)&ip->Address.lpSockaddr->sa_data[2]); + info.netmask = ~((1 << (32 - ip->OnLinkPrefixLength)) - 1); + info.broadcast = info.address | (~info.netmask); + ifaces[utfConv.to_bytes(iface->FriendlyName)] = info; + } + + // Free tables + free(addresses); +#else + // Get iface list + struct ifaddrs* addresses = NULL; + getifaddrs(&addresses); + + // Save data + for (auto iface = addresses; iface; iface = iface->ifa_next) { + if (iface->ifa_addr->sa_family != AF_INET) { continue; } + InterfaceInfo info; + info.address = ntohl(*(uint32_t*)&iface->ifa_addr->sa_data[2]); + info.netmask = ntohl(*(uint32_t*)&iface->ifa_netmask->sa_data[2]); + info.broadcast = info.address | (~info.netmask); + ifaces[iface->ifa_name] = info; + } + + // Free iface list + freeifaddrs(addresses); +#endif + + return ifaces; + } + + std::shared_ptr listen(const Address& addr) { + // Init library if needed + init(); + + // Create socket + SockHandle_t s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + // TODO: Support non-blockign mode + +#ifndef _WIN32 + // Allow port reusing if the app was killed or crashed + // and the socket is stuck in TIME_WAIT state. + // This option has a different meaning on Windows, + // so we use it only for non-Windows systems + int enable = 1; + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) { + closeSocket(s); + throw std::runtime_error("Could not configure socket"); + return NULL; + } +#endif + + // Bind socket to the port + if (bind(s, (sockaddr*)&addr.addr, sizeof(sockaddr_in))) { + closeSocket(s); + throw std::runtime_error("Could not bind socket"); + return NULL; + } + + // Enable listening + if (::listen(s, SOMAXCONN) != 0) { + throw std::runtime_error("Could start listening for connections"); + return NULL; + } + + // Enable nonblocking mode + setNonblocking(s); + + // Return listener class + return std::make_shared(s); + } + + std::shared_ptr listen(std::string host, int port) { + return listen(Address(host, port)); + } + + std::shared_ptr connect(const Address& addr) { + // Init library if needed + init(); + + // Create socket + SockHandle_t s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + + // Connect to server + if (::connect(s, (sockaddr*)&addr.addr, sizeof(sockaddr_in))) { + closeSocket(s); + throw std::runtime_error("Could not connect"); + return NULL; + } + + // Enable nonblocking mode + setNonblocking(s); + + // Return socket class + return std::make_shared(s); + } + + std::shared_ptr connect(std::string host, int port) { + return connect(Address(host, port)); + } + + std::shared_ptr openudp(const Address& raddr, const Address& laddr) { + // Init library if needed + init(); + + // Create socket + SockHandle_t s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + // Bind socket to local port + if (bind(s, (sockaddr*)&laddr.addr, sizeof(sockaddr_in))) { + closeSocket(s); + throw std::runtime_error("Could not bind socket"); + return NULL; + } + + // Return socket class + return std::make_shared(s, &raddr); + } + + std::shared_ptr openudp(std::string rhost, int rport, const Address& laddr) { + return openudp(Address(rhost, rport), laddr); + } + + std::shared_ptr openudp(const Address& raddr, std::string lhost, int lport) { + return openudp(raddr, Address(lhost, lport)); + } + + std::shared_ptr openudp(std::string rhost, int rport, std::string lhost, int lport) { + return openudp(Address(rhost, rport), Address(lhost, lport)); + } +} \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/net.h b/source_modules/spectran_http_source/src/net.h new file mode 100644 index 00000000..a35a6fbb --- /dev/null +++ b/source_modules/spectran_http_source/src/net.h @@ -0,0 +1,281 @@ +#pragma once +#include +#include +#include +#include + +#ifdef _WIN32 +#include +#include +#include +#else +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +namespace net { +#ifdef _WIN32 + typedef SOCKET SockHandle_t; + typedef int socklen_t; +#else + typedef int SockHandle_t; +#endif + typedef uint32_t IP_t; + + class Socket; + class Listener; + + struct InterfaceInfo { + IP_t address; + IP_t netmask; + IP_t broadcast; + }; + + class Address { + friend Socket; + friend Listener; + public: + /** + * Default constructor. Corresponds to 0.0.0.0:0. + */ + Address(); + + /** + * Do not instantiate this class manually. Use the provided functions. + * @param host Hostname or IP. + * @param port TCP/UDP port. + */ + Address(const std::string& host, int port); + + /** + * Do not instantiate this class manually. Use the provided functions. + * @param ip IP in host byte order. + * @param port TCP/UDP port. + */ + Address(IP_t ip, int port); + + /** + * Get the IP address. + * @return IP address in standard string format. + */ + std::string getIPStr(); + + /** + * Get the IP address. + * @return IP address in host byte order. + */ + IP_t getIP(); + + /** + * Set the IP address. + * @param ip IP address in host byte order. + */ + void setIP(IP_t ip); + + /** + * Get the TCP/UDP port. + * @return TCP/UDP port number. + */ + int getPort(); + + /** + * Set the TCP/UDP port. + * @param port TCP/UDP port number. + */ + void setPort(int port); + + struct sockaddr_in addr; + }; + + enum { + NO_TIMEOUT = -1, + NONBLOCKING = 0 + }; + + enum SocketType { + SOCKET_TYPE_TCP, + SOCKET_TYPE_UDP + }; + + class Socket { + public: + /** + * Do not instantiate this class manually. Use the provided functions. + */ + Socket(SockHandle_t sock, const Address* raddr = NULL); + ~Socket(); + + /** + * Close socket. The socket can no longer be used after this. + */ + void close(); + + /** + * Check if the socket is open. + * @return True if open, false if closed. + */ + bool isOpen(); + + /** + * Get socket type. Either TCP or UDP. + * @return Socket type. + */ + SocketType type(); + + /** + * Send data on socket. + * @param data Data to be sent. + * @param len Number of bytes to be sent. + * @param dest Destination address. NULL to use the default remote address. + * @return Number of bytes sent. + */ + int send(const uint8_t* data, size_t len, const Address* dest = NULL); + + /** + * Send string on socket. Terminating NULL byte is not sent, include one in the string if you need it. + * @param str String to be sent. + * @param dest Destination address. NULL to use the default remote address. + * @return Number of bytes sent. + */ + int sendstr(const std::string& str, const Address* dest = NULL); + + /** + * Receive data from socket. + * @param data Buffer to read the data into. + * @param maxLen Maximum number of bytes to read. + * @param forceLen Read the maximum number of bytes even if it requires multiple receive operations. + * @param timeout Timeout in milliseconds. Use NO_TIMEOUT or NONBLOCKING here if needed. + * @param dest Destination address. If multiple packets, this will contain the address of the last one. NULL if not used. + * @return Number of bytes read. 0 means timed out or closed. -1 means would block or error. + */ + int recv(uint8_t* data, size_t maxLen, bool forceLen = false, int timeout = NO_TIMEOUT, Address* dest = NULL); + + /** + * Receive line from socket. + * @param str String to read the data into. + * @param maxLen Maximum line length allowed, 0 for no limit. + * @param timeout Timeout in milliseconds. Use NO_TIMEOUT or NONBLOCKING here if needed. + * @param dest Destination address. If multiple packets, this will contain the address of the last one. NULL if not used. + * @return Length of the returned string. 0 means timed out or closed. -1 means would block or error. + */ + int recvline(std::string& str, int maxLen = 0, int timeout = NO_TIMEOUT, Address* dest = NULL); + + private: + Address* raddr = NULL; + SockHandle_t sock; + bool open = true; + + }; + + class Listener { + public: + /** + * Do not instantiate this class manually. Use the provided functions. + */ + Listener(SockHandle_t sock); + ~Listener(); + + /** + * Stop listening. The listener can no longer be used after this. + */ + void stop(); + + /** + * CHeck if the listener is still listening. + * @return True if listening, false if not. + */ + bool listening(); + + /** + * Accept connection. + * @param timeout Timeout in milliseconds. Use NO_TIMEOUT or NONBLOCKING here if needed. + * @return Socket of the connection. NULL means timed out, would block or closed. + */ + std::shared_ptr accept(Address* dest = NULL, int timeout = NO_TIMEOUT); + + private: + SockHandle_t sock; + bool open = true; + + }; + + /** + * Get a list of the network interface. + * @return List of network interfaces and their addresses. + */ + std::map listInterfaces(); + + /** + * Create TCP listener. + * @param addr Address to listen on. + * @return Listener instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr listen(const Address& addr); + + /** + * Create TCP listener. + * @param host Hostname or IP to listen on ("0.0.0.0" for Any). + * @param port Port to listen on. + * @return Listener instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr listen(std::string host, int port); + + /** + * Create TCP connection. + * @param addr Remote address. + * @return Socket instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr connect(const Address& addr); + + /** + * Create TCP connection. + * @param host Remote hostname or IP address. + * @param port Remote port. + * @return Socket instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr connect(std::string host, int port); + + /** + * Create UDP socket. + * @param raddr Remote address. + * @param laddr Local address to bind the socket to. + * @return Socket instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr openudp(const Address& raddr, const Address& laddr); + + /** + * Create UDP socket. + * @param rhost Remote hostname or IP address. + * @param rport Remote port. + * @param laddr Local address to bind the socket to. + * @return Socket instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr openudp(std::string rhost, int rport, const Address& laddr); + + /** + * Create UDP socket. + * @param raddr Remote address. + * @param lhost Local hostname or IP used to bind the socket (optional, "0.0.0.0" for Any). + * @param lpost Local port used to bind the socket to (optional, 0 to allocate automatically). + * @return Socket instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr openudp(const Address& raddr, std::string lhost = "0.0.0.0", int lport = 0); + + /** + * Create UDP socket. + * @param rhost Remote hostname or IP address. + * @param rport Remote port. + * @param lhost Local hostname or IP used to bind the socket (optional, "0.0.0.0" for Any). + * @param lpost Local port used to bind the socket to (optional, 0 to allocate automatically). + * @return Socket instance on success, Throws runtime_error otherwise. + */ + std::shared_ptr openudp(std::string rhost, int rport, std::string lhost = "0.0.0.0", int lport = 0); +} \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/proto/http.cpp b/source_modules/spectran_http_source/src/proto/http.cpp new file mode 100644 index 00000000..8821f976 --- /dev/null +++ b/source_modules/spectran_http_source/src/proto/http.cpp @@ -0,0 +1,304 @@ +#include "http.h" +#include + +namespace net::http { + std::string MessageHeader::serialize() { + std::string data; + + // Add start line + data += serializeStartLine() + "\r\n"; + + // Add fields + for (const auto& [key, value] : fields) { + data += key + ": " + value + "\r\n"; + } + + // Add terminator + data += "\r\n"; + + return data; + } + + void MessageHeader::deserialize(const std::string& data) { + // Clear existing fields + fields.clear(); + + // Parse first line + std::string line; + int offset = readLine(data, line); + deserializeStartLine(line); + + // Parse fields + while (offset < data.size()) { + // Read line + offset = readLine(data, line, offset); + + // If empty line, the header is done + if (line.empty()) { break; } + + // Read until first ':' for the key + int klen = 0; + for (; klen < line.size(); klen++) { + if (line[klen] == ':') { break; } + } + + // Find offset of value + int voff = klen + 1; + for (; voff < line.size(); voff++) { + if (line[voff] != ' ' && line[voff] != '\t') { break; } + } + + // Save field + fields[line.substr(0, klen)] = line.substr(voff); + } + } + + std::map& MessageHeader::getFields() { + return fields; + } + + bool MessageHeader::hasField(const std::string& name) { + return fields.find(name) != fields.end(); + } + + std::string MessageHeader::getField(const std::string name) { + // TODO: Check if exists + return fields[name]; + } + + void MessageHeader::setField(const std::string& name, const std::string& value) { + fields[name] = value; + } + + void MessageHeader::clearField(const std::string& name) { + // TODO: Check if exists (but maybe no error?) + fields.erase(name); + } + + int MessageHeader::readLine(const std::string& str, std::string& line, int start) { + // Get line length + int len = 0; + bool cr = false; + for (int i = start; i < str.size(); i++) { + if (str[i] == '\n') { + if (len && str[i-1] == '\r') { cr = true; } + break; + } + len++; + } + + // Copy line + line = str.substr(start, len - (cr ? 1:0)); + return start + len + 1; + } + + RequestHeader::RequestHeader(Method method, std::string uri, std::string host) { + this->method = method; + this->uri = uri; + setField("Host", host); + } + + RequestHeader::RequestHeader(const std::string& data) { + deserialize(data); + } + + Method RequestHeader::getMethod() { + return method; + } + + void RequestHeader::setMethod(Method method) { + this->method = method; + } + + std::string RequestHeader::getURI() { + return uri; + } + + void RequestHeader::setURI(const std::string& uri) { + this->uri = uri; + } + + void RequestHeader::deserializeStartLine(const std::string& data) { + // TODO + } + + std::string RequestHeader::serializeStartLine() { + // TODO: Allow to specify version + return MethodStrings[method] + " " + uri + " HTTP/1.1"; + } + + ResponseHeader::ResponseHeader(StatusCode statusCode) { + this->statusCode = statusCode; + if (StatusCodeStrings.find(statusCode) != StatusCodeStrings.end()) { + this->statusString = StatusCodeStrings[statusCode]; + } + else { + this->statusString = "UNKNOWN"; + } + } + + ResponseHeader::ResponseHeader(StatusCode statusCode, const std::string& statusString) { + this->statusCode = statusCode; + this->statusString = statusString; + } + + ResponseHeader::ResponseHeader(const std::string& data) { + deserialize(data); + } + + StatusCode ResponseHeader::getStatusCode() { + return statusCode; + } + + void ResponseHeader::setStatusCode(StatusCode statusCode) { + this->statusCode = statusCode; + } + + std::string ResponseHeader::getStatusString() { + return statusString; + } + + void ResponseHeader::setStatusString(const std::string& statusString) { + this->statusString = statusString; + } + + void ResponseHeader::deserializeStartLine(const std::string& data) { + // Parse version + int offset = 0; + for (; offset < data.size(); offset++) { + if (data[offset] == ' ') { break; } + } + // TODO: Error if null length + // TODO: Parse version + + // Skip spaces + for (; offset < data.size(); offset++) { + if (data[offset] != ' ' && data[offset] != '\t') { break; } + } + + // Parse status code + int codeOffset = offset; + for (; offset < data.size(); offset++) { + if (data[offset] == ' ') { break; } + } + // TODO: Error if null length + statusCode = (StatusCode)std::stoi(data.substr(codeOffset, codeOffset - offset)); + + // Skip spaces + for (; offset < data.size(); offset++) { + if (data[offset] != ' ' && data[offset] != '\t') { break; } + } + + // Parse status string + int stringOffset = offset; + for (; offset < data.size(); offset++) { + if (data[offset] == ' ') { break; } + } + // TODO: Error if null length (maybe?) + statusString = data.substr(stringOffset, stringOffset - offset); + } + + std::string ResponseHeader::serializeStartLine() { + char buf[1024]; + sprintf(buf, "%d %s", (int)statusCode, statusString.c_str()); + return buf; + } + + ChunkHeader::ChunkHeader(size_t length) { + this->length = length; + } + + ChunkHeader::ChunkHeader(const std::string& data) { + deserialize(data); + } + + std::string ChunkHeader::serialize() { + char buf[64]; + sprintf(buf, "%" PRIX64 "\r\n", length); + return buf; + } + + void ChunkHeader::deserialize(const std::string& data) { + // Parse length + int offset = 0; + for (; offset < data.size(); offset++) { + if (data[offset] == ' ') { break; } + } + // TODO: Error if null length + length = (StatusCode)std::stoull(data.substr(0, offset), nullptr, 16); + + // TODO: Parse rest + } + + size_t ChunkHeader::getLength() { + return length; + } + + void ChunkHeader::setLength(size_t length) { + this->length = length; + } + + Client::Client(std::shared_ptr sock) { + this->sock = sock; + } + + int Client::sendRequestHeader(RequestHeader& req) { + return sock->sendstr(req.serialize()); + } + + int Client::recvRequestHeader(RequestHeader& req, int timeout) { + // Non-blocking mode not alloowed + if (!timeout) { return -1; } + + // Read response + std::string respData; + int err = recvHeader(respData, timeout); + if (err) { return err; } + + // Deserialize + req.deserialize(respData); + } + + int Client::sendResponseHeader(ResponseHeader& resp) { + return sock->sendstr(resp.serialize()); + } + + int Client::recvResponseHeader(ResponseHeader& resp, int timeout) { + // Non-blocking mode not alloowed + if (!timeout) { return -1; } + + // Read response + std::string respData; + int err = recvHeader(respData, timeout); + if (err) { return err; } + + // Deserialize + resp.deserialize(respData); + } + + int Client::sendChunkHeader(ChunkHeader& chdr) { + return sock->sendstr(chdr.serialize()); + } + + int Client::recvChunkHeader(ChunkHeader& chdr, int timeout) { + std::string respData; + int err = sock->recvline(respData, 0, timeout); + if (err <= 0) { return err; } + if (respData[respData.size()-1] == '\r') { + respData.pop_back(); + } + chdr.deserialize(respData); + return 0; + } + + int Client::recvHeader(std::string& data, int timeout) { + while (sock->isOpen()) { + std::string line; + int ret = sock->recvline(line); + if (line == "\r") { break; } + if (ret <= 0) { return ret; } + data += line + "\n"; + } + return 0; + } +} \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/proto/http.h b/source_modules/spectran_http_source/src/proto/http.h new file mode 100644 index 00000000..77901ec7 --- /dev/null +++ b/source_modules/spectran_http_source/src/proto/http.h @@ -0,0 +1,276 @@ +#pragma once +#include +#include +#include "../net.h" + +namespace net::http { + enum Method { + METHOD_OPTIONS, + METHOD_GET, + METHOD_HEAD, + METHOD_POST, + METHOD_PUT, + METHOD_DELETE, + METHOD_TRACE, + METHOD_CONNECT + }; + + inline std::map MethodStrings { + { METHOD_OPTIONS, "OPTIONS" }, + { METHOD_GET, "GET" }, + { METHOD_HEAD, "HEAD" }, + { METHOD_POST, "POST" }, + { METHOD_PUT, "PUT" }, + { METHOD_DELETE, "DELETE" }, + { METHOD_TRACE, "TRACE" }, + { METHOD_CONNECT, "CONNECT" } + }; + + enum StatusCode { + STATUS_CODE_CONTINUE = 100, + STATUS_CODE_SWITCH_PROTO = 101, + + STATUS_CODE_OK = 200, + STATUS_CODE_CREATED = 201, + STATUS_CODE_ACCEPTED = 202, + STATUS_CODE_NON_AUTH_INFO = 203, + STATUS_CODE_NO_CONTENT = 204, + STATUS_CODE_RESET_CONTENT = 205, + STATUS_CODE_PARTIAL_CONTENT = 206, + + STATUS_CODE_MULTIPLE_CHOICES = 300, + STATUS_CODE_MOVED_PERMANENTLY = 301, + STATUS_CODE_FOUND = 302, + STATUS_CODE_SEE_OTHER = 303, + STATUS_CODE_NOT_MODIFIED = 304, + STATUS_CODE_USE_PROXY = 305, + STATUS_CODE_TEMP_REDIRECT = 307, + + STATUS_CODE_BAD_REQUEST = 400, + STATUS_CODE_UNAUTHORIZED = 401, + STATUS_CODE_PAYMENT_REQUIRED = 402, + STATUS_CODE_FORBIDDEN = 403, + STATUS_CODE_NOT_FOUND = 404, + STATUS_CODE_METHOD_NOT_ALLOWED = 405, + STATUS_CODE_NOT_ACCEPTABLE = 406, + STATUS_CODE_PROXY_AUTH_REQ = 407, + STATUS_CODE_REQUEST_TIEMOUT = 408, + STATUS_CODE_CONFLICT = 409, + STATUS_CODE_GONE = 410, + STATUS_CODE_LENGTH_REQUIRED = 411, + STATUS_CODE_PRECONDITION_FAILED = 412, + STATUS_CODE_REQ_ENTITY_TOO_LARGE = 413, + STATUS_CODE_REQ_URI_TOO_LONG = 414, + STATUS_CODE_UNSUPPORTED_MEDIA_TYPE = 415, + STATUS_CODE_REQ_RANGE_NOT_SATISFIABLE = 416, + STATUS_CODE_EXPECTATION_FAILED = 417, + STATUS_CODE_IM_A_TEAPOT = 418, + STATUS_CODE_ENHANCE_YOUR_CALM = 420, + + STATUS_CODE_INTERNAL_SERVER_ERROR = 500, + STATUS_CODE_NOT_IMPLEMENTED = 501, + STATUS_CODE_BAD_GATEWAY = 502, + STATUS_CODE_SERVICE_UNAVAILABLE = 503, + STATUS_CODE_GATEWAY_TIMEOUT = 504, + STATUS_CODE_HTTP_VERSION_UNSUPPORTED = 505 + }; + + inline std::map StatusCodeStrings { + { STATUS_CODE_CONTINUE , "CONTINUE" }, + { STATUS_CODE_SWITCH_PROTO , "SWITCH_PROTO" }, + + { STATUS_CODE_OK , "OK" }, + { STATUS_CODE_CREATED , "CREATED" }, + { STATUS_CODE_ACCEPTED , "ACCEPTED" }, + { STATUS_CODE_NON_AUTH_INFO , "NON_AUTH_INFO" }, + { STATUS_CODE_NO_CONTENT , "NO_CONTENT" }, + { STATUS_CODE_RESET_CONTENT , "RESET_CONTENT" }, + { STATUS_CODE_PARTIAL_CONTENT , "PARTIAL_CONTENT" }, + + { STATUS_CODE_MULTIPLE_CHOICES , "MULTIPLE_CHOICES" }, + { STATUS_CODE_MOVED_PERMANENTLY , "MOVED_PERMANENTLY" }, + { STATUS_CODE_FOUND , "FOUND" }, + { STATUS_CODE_SEE_OTHER , "SEE_OTHER" }, + { STATUS_CODE_NOT_MODIFIED , "NOT_MODIFIED" }, + { STATUS_CODE_USE_PROXY , "USE_PROXY" }, + { STATUS_CODE_TEMP_REDIRECT , "TEMP_REDIRECT" }, + + { STATUS_CODE_BAD_REQUEST , "BAD_REQUEST" }, + { STATUS_CODE_UNAUTHORIZED , "UNAUTHORIZED" }, + { STATUS_CODE_PAYMENT_REQUIRED , "PAYMENT_REQUIRED" }, + { STATUS_CODE_FORBIDDEN , "FORBIDDEN" }, + { STATUS_CODE_NOT_FOUND , "NOT_FOUND" }, + { STATUS_CODE_METHOD_NOT_ALLOWED , "METHOD_NOT_ALLOWED" }, + { STATUS_CODE_NOT_ACCEPTABLE , "NOT_ACCEPTABLE" }, + { STATUS_CODE_PROXY_AUTH_REQ , "PROXY_AUTH_REQ" }, + { STATUS_CODE_REQUEST_TIEMOUT , "REQUEST_TIEMOUT" }, + { STATUS_CODE_CONFLICT , "CONFLICT" }, + { STATUS_CODE_GONE , "GONE" }, + { STATUS_CODE_LENGTH_REQUIRED , "LENGTH_REQUIRED" }, + { STATUS_CODE_PRECONDITION_FAILED , "PRECONDITION_FAILED" }, + { STATUS_CODE_REQ_ENTITY_TOO_LARGE , "REQ_ENTITY_TOO_LARGE" }, + { STATUS_CODE_REQ_URI_TOO_LONG , "REQ_URI_TOO_LONG" }, + { STATUS_CODE_UNSUPPORTED_MEDIA_TYPE , "UNSUPPORTED_MEDIA_TYPE" }, + { STATUS_CODE_REQ_RANGE_NOT_SATISFIABLE, "REQ_RANGE_NOT_SATISFIABLE"}, + { STATUS_CODE_EXPECTATION_FAILED , "EXPECTATION_FAILED" }, + { STATUS_CODE_IM_A_TEAPOT , "IM_A_TEAPOT" }, + { STATUS_CODE_ENHANCE_YOUR_CALM , "ENHANCE_YOUR_CALM" }, + + { STATUS_CODE_INTERNAL_SERVER_ERROR , "INTERNAL_SERVER_ERROR" }, + { STATUS_CODE_NOT_IMPLEMENTED , "NOT_IMPLEMENTED" }, + { STATUS_CODE_BAD_GATEWAY , "BAD_GATEWAY" }, + { STATUS_CODE_SERVICE_UNAVAILABLE , "SERVICE_UNAVAILABLE" }, + { STATUS_CODE_GATEWAY_TIMEOUT , "GATEWAY_TIMEOUT" }, + { STATUS_CODE_HTTP_VERSION_UNSUPPORTED , "HTTP_VERSION_UNSUPPORTED" } + }; + + /** + * HTTP Message Header + */ + class MessageHeader { + public: + /** + * Serialize header to string. + * @return Header in string form. + */ + std::string serialize(); + + /** + * Deserialize header from string. + * @param data Header in string form. + */ + void deserialize(const std::string& data); + + /** + * Get field list. + * @return Map from field name to field. + */ + std::map& getFields(); + + /** + * Check if a field exists in the header. + * @return True if the field exists, false otherwise. + */ + bool hasField(const std::string& name); + + /** + * Get field value. + * @param name Name of the field. + * @return Field value. + */ + std::string getField(const std::string name); + + /** + * Set field. + * @param name Field name. + * @param value Field value. + */ + void setField(const std::string& name, const std::string& value); + + /** + * Delete field. + * @param name Field name. + */ + void clearField(const std::string& name); + + private: + int readLine(const std::string& str, std::string& line, int start = 0); + virtual std::string serializeStartLine() = 0; + virtual void deserializeStartLine(const std::string& data) = 0; + std::map fields; + }; + + /** + * HTTP Request Header + */ + class RequestHeader : public MessageHeader { + public: + RequestHeader() {} + + /** + * Create request header from the mandatory parameters. + * @param method HTTP Method. + * @param uri URI of request. + * @param host Server host passed in the 'Host' field. + */ + RequestHeader(Method method, std::string uri, std::string host); + + /** + * Create request header from its serialized string form. + * @param data Request header in string form. + */ + RequestHeader(const std::string& data); + + /** + * Get HTTP Method. + * @return HTTP Method. + */ + Method getMethod(); + void setMethod(Method method); + std::string getURI(); + void setURI(const std::string& uri); + + private: + void deserializeStartLine(const std::string& data); + std::string serializeStartLine(); + + Method method; + std::string uri; + }; + + class ResponseHeader : public MessageHeader { + public: + ResponseHeader() {} + ResponseHeader(StatusCode statusCode); + ResponseHeader(StatusCode statusCode, const std::string& statusString); + ResponseHeader(const std::string& data); + + StatusCode getStatusCode(); + void setStatusCode(StatusCode statusCode); + std::string getStatusString(); + void setStatusString(const std::string& statusString); + + private: + void deserializeStartLine(const std::string& data); + std::string serializeStartLine(); + + StatusCode statusCode; + std::string statusString; + }; + + class ChunkHeader { + public: + ChunkHeader() {} + ChunkHeader(size_t length); + ChunkHeader(const std::string& data); + + std::string serialize(); + void deserialize(const std::string& data); + + size_t getLength(); + void setLength(size_t length); + + private: + size_t length; + }; + + class Client { + public: + Client() {} + Client(std::shared_ptr sock); + + int sendRequestHeader(RequestHeader& req); + int recvRequestHeader(RequestHeader& req, int timeout = -1); + int sendResponseHeader(ResponseHeader& resp); + int recvResponseHeader(ResponseHeader& resp, int timeout = -1); + int sendChunkHeader(ChunkHeader& chdr); + int recvChunkHeader(ChunkHeader& chdr, int timeout = -1); + + private: + int recvHeader(std::string& data, int timeout = -1); + std::shared_ptr sock; + + }; + + +} \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/spectran_http_client.cpp b/source_modules/spectran_http_source/src/spectran_http_client.cpp new file mode 100644 index 00000000..bd438ab9 --- /dev/null +++ b/source_modules/spectran_http_source/src/spectran_http_client.cpp @@ -0,0 +1,89 @@ +#include "spectran_http_client.h" +#include + +SpectranHTTPClient::SpectranHTTPClient(std::string host, int port, dsp::stream* stream) { + this->stream = stream; + + // Connect to server + sock = net::connect(host, port); + http = net::http::Client(sock); + + // Make request + net::http::RequestHeader rqhdr(net::http::METHOD_GET, "/stream?format=float32", host); + http.sendRequestHeader(rqhdr); + net::http::ResponseHeader rshdr; + http.recvResponseHeader(rshdr, 5000); + + // Start chunk worker + workerThread = std::thread(&SpectranHTTPClient::worker, this); +} + +void SpectranHTTPClient::streaming(bool enabled) { + streamingEnabled = enabled; +} + +bool SpectranHTTPClient::isOpen() { + return sock->isOpen(); +} + +void SpectranHTTPClient::close() { + sock->close(); + stream->stopWriter(); + if (workerThread.joinable()) { workerThread.join(); } + stream->clearWriteStop(); +} + +void SpectranHTTPClient::worker() { + while (sock->isOpen()) { + // Get chunk header + net::http::ChunkHeader chdr; + int err = http.recvChunkHeader(chdr, 5000); + if (err < 0) { return; } + + // If null length, finish + size_t clen = chdr.getLength(); + if (!clen) { return; } + + // Read JSON + std::string jsonData; + int jlen = sock->recvline(jsonData, clen, 5000); + if (jlen <= 0) { + spdlog::error("Couldn't read JSON metadata"); + return; + } + + // Read (and check for) record separator + uint8_t rs; + int rslen = sock->recv(&rs, 1, true, 5000); + if (rslen != 1 || rs != 0x1E) { + spdlog::error("Missing record separator"); + return; + } + + // Read data + uint8_t* buf = (uint8_t*)stream->writeBuf; + int sampLen = 0; + for (int i = jlen + 1; i < clen;) { + int read = sock->recv(&buf[sampLen], clen - i, true); + if (read <= 0) { + spdlog::error("Recv failed while reading data"); + return; + } + i += read; + sampLen += read; + } + + // Swap to stream + if (streamingEnabled) { + if (!stream->swap(sampLen / 8)) { return; } + } + + // Read trailing CRLF + std::string dummy; + sock->recvline(dummy, 2); + if (dummy != "\r") { + spdlog::error("Missing trailing CRLF"); + return; + } + } +} \ No newline at end of file diff --git a/source_modules/spectran_http_source/src/spectran_http_client.h b/source_modules/spectran_http_source/src/spectran_http_client.h new file mode 100644 index 00000000..eef15579 --- /dev/null +++ b/source_modules/spectran_http_source/src/spectran_http_client.h @@ -0,0 +1,25 @@ +#pragma once +#include +#include +#include +#include +#include "proto/http.h" + +class SpectranHTTPClient { +public: + SpectranHTTPClient(std::string host, int port, dsp::stream* stream); + + void streaming(bool enabled); + bool isOpen(); + void close(); + +private: + void worker(); + + std::shared_ptr sock; + net::http::Client http; + dsp::stream* stream; + std::thread workerThread; + + bool streamingEnabled = false; +}; \ No newline at end of file