#pragma once #include #include #include // 1MB buffer #define STREAM_BUFFER_SIZE 1000000 namespace dsp { class untyped_stream { public: virtual bool swap(int size) { return false; } virtual int read() { return -1; } virtual void flush() {} virtual void stopWriter() {} virtual void clearWriteStop() {} virtual void stopReader() {} virtual void clearReadStop() {} }; template class stream : public untyped_stream { public: stream() { writeBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment()); readBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment()); } ~stream() { volk_free(writeBuf); volk_free(readBuf); } bool swap(int size) { { // Wait to either swap or stop std::unique_lock lck(swapMtx); swapCV.wait(lck, [this]{ return (canSwap || writerStop); }); // If writer was stopped, abandon operation if (writerStop) { return false; } // Swap buffers dataSize = size; T* temp = writeBuf; writeBuf = readBuf; readBuf = temp; canSwap = false; } // Notify reader that some data is ready { std::lock_guard lck(rdyMtx); dataReady = true; } rdyCV.notify_all(); return true; } int read() { // Wait for data to be ready or to be stopped std::unique_lock lck(rdyMtx); rdyCV.wait(lck, [this]{ return (dataReady || readerStop); }); return (readerStop ? -1 : dataSize); } void flush() { // Clear data ready { std::lock_guard lck(rdyMtx); dataReady = false; } // Notify writer that buffers can be swapped { std::lock_guard lck(swapMtx); canSwap = true; } swapCV.notify_all(); } void stopWriter() { { std::lock_guard lck(swapMtx); writerStop = true; } swapCV.notify_all(); } void clearWriteStop() { writerStop = false; } void stopReader() { { std::lock_guard lck(rdyMtx); readerStop = true; } rdyCV.notify_all(); } void clearReadStop() { readerStop = false; } T* writeBuf; T* readBuf; private: std::mutex swapMtx; std::condition_variable swapCV; bool canSwap = true; std::mutex rdyMtx; std::condition_variable rdyCV; bool dataReady = false; bool readerStop = false; bool writerStop = false; int dataSize = 0; }; }