SDRPlusPlus/core/src/dsp/stream.h
2020-12-25 18:17:43 +01:00

126 lines
3.1 KiB
C++

#pragma once
#include <mutex>
#include <condition_variable>
#include <volk/volk.h>
// 1MB buffer
#define STREAM_BUFFER_SIZE 1000000
namespace dsp {
class untyped_steam {
public:
virtual bool swap(int size) { return false; }
virtual int read() { return -1; }
virtual void flush() {}
virtual void stopWriter() {}
virtual void clearWriteStop() {}
virtual void stopReader() {}
virtual void clearReadStop() {}
};
template <class T>
class stream : public untyped_steam {
public:
stream() {
writeBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
readBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
}
~stream() {
volk_free(writeBuf);
volk_free(readBuf);
}
bool swap(int size) {
{
// Wait to either swap or stop
std::unique_lock<std::mutex> lck(swapMtx);
swapCV.wait(lck, [this]{ return (canSwap || writerStop); });
// If writer was stopped, abandon operation
if (writerStop) { return false; }
// Swap buffers
dataSize = size;
T* temp = writeBuf;
writeBuf = readBuf;
readBuf = temp;
canSwap = false;
}
// Notify reader that some data is ready
{
std::lock_guard<std::mutex> lck(rdyMtx);
dataReady = true;
}
rdyCV.notify_all();
return true;
}
int read() {
// Wait for data to be ready or to be stopped
std::unique_lock<std::mutex> lck(rdyMtx);
rdyCV.wait(lck, [this]{ return (dataReady || readerStop); });
return (readerStop ? -1 : dataSize);
}
void flush() {
// Clear data ready
{
std::lock_guard<std::mutex> lck(rdyMtx);
dataReady = false;
}
// Notify writer that buffers can be swapped
{
std::lock_guard<std::mutex> lck(swapMtx);
canSwap = true;
}
swapCV.notify_all();
}
void stopWriter() {
{
std::lock_guard<std::mutex> lck(swapMtx);
writerStop = true;
}
swapCV.notify_all();
}
void clearWriteStop() {
writerStop = false;
}
void stopReader() {
{
std::lock_guard<std::mutex> lck(rdyMtx);
readerStop = true;
}
rdyCV.notify_all();
}
void clearReadStop() {
readerStop = false;
}
T* writeBuf;
T* readBuf;
private:
std::mutex swapMtx;
std::condition_variable swapCV;
bool canSwap = true;
std::mutex rdyMtx;
std::condition_variable rdyCV;
bool dataReady = false;
bool readerStop = false;
bool writerStop = false;
int dataSize = 0;
};
}