SDRPlusPlus/core/src/dsp/stream.h

126 lines
3.1 KiB
C
Raw Normal View History

2020-06-10 04:13:56 +02:00
#pragma once
2020-11-02 03:57:44 +01:00
#include <mutex>
2020-06-10 04:13:56 +02:00
#include <condition_variable>
2020-11-02 03:57:44 +01:00
#include <volk/volk.h>
2020-06-10 04:13:56 +02:00
2020-11-02 03:57:44 +01:00
// Capacity of each stream buffer, counted in elements of T (not bytes):
// stream<T> allocates STREAM_BUFFER_SIZE * sizeof(T) bytes per buffer.
#define STREAM_BUFFER_SIZE 1000000
2020-06-22 16:45:57 +02:00
namespace dsp {
/**
 * Type-erased base interface for streams.
 *
 * Lets code hold and control a stream without knowing its sample type.
 * Every operation has a no-op default: swap() reports failure, read()
 * reports "no data" (-1), and the control calls do nothing. Typed streams
 * override these with real behaviour.
 */
class untyped_stream {
public:
    // A polymorphic base needs a virtual destructor: without it, deleting a
    // derived stream through an untyped_stream pointer is undefined behaviour
    // and the derived destructor (which releases its buffers) never runs.
    virtual ~untyped_stream() = default;

    /**
     * Writer-side hand-off of `size` items.
     * @param size number of items being handed off (ignored by this default).
     * @return false in this default implementation.
     */
    virtual bool swap(int size) { return false; }

    /**
     * Reader-side wait for data.
     * @return -1 in this default implementation.
     */
    virtual int read() { return -1; }

    /// Reader-side release of the consumed data. No-op by default.
    virtual void flush() {}

    /// Request that the writer side stop. No-op by default.
    virtual void stopWriter() {}

    /// Clear a previous writer stop request. No-op by default.
    virtual void clearWriteStop() {}

    /// Request that the reader side stop. No-op by default.
    virtual void stopReader() {}

    /// Clear a previous reader stop request. No-op by default.
    virtual void clearReadStop() {}
};
2020-06-10 04:13:56 +02:00
template <class T>
class stream : public untyped_stream {
2020-06-10 04:13:56 +02:00
public:
2020-06-10 18:52:07 +02:00
stream() {
writeBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
readBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
2020-06-10 18:52:07 +02:00
}
~stream() {
volk_free(writeBuf);
volk_free(readBuf);
2020-06-10 04:13:56 +02:00
}
bool swap(int size) {
{
// Wait to either swap or stop
std::unique_lock<std::mutex> lck(swapMtx);
swapCV.wait(lck, [this] { return (canSwap || writerStop); });
// If writer was stopped, abandon operation
2020-12-25 18:17:43 +01:00
if (writerStop) { return false; }
// Swap buffers
dataSize = size;
T* temp = writeBuf;
writeBuf = readBuf;
readBuf = temp;
canSwap = false;
}
// Notify reader that some data is ready
2020-11-02 16:16:21 +01:00
{
std::lock_guard<std::mutex> lck(rdyMtx);
2020-11-02 16:16:21 +01:00
dataReady = true;
}
rdyCV.notify_all();
return true;
2020-06-10 04:13:56 +02:00
}
2020-11-02 03:57:44 +01:00
int read() {
// Wait for data to be ready or to be stopped
std::unique_lock<std::mutex> lck(rdyMtx);
rdyCV.wait(lck, [this] { return (dataReady || readerStop); });
2020-12-25 18:17:43 +01:00
return (readerStop ? -1 : dataSize);
2020-06-10 04:13:56 +02:00
}
2020-11-02 03:57:44 +01:00
void flush() {
2020-12-25 18:17:43 +01:00
// Clear data ready
{
std::lock_guard<std::mutex> lck(rdyMtx);
dataReady = false;
}
// Notify writer that buffers can be swapped
2020-11-02 16:16:21 +01:00
{
std::lock_guard<std::mutex> lck(swapMtx);
canSwap = true;
2020-11-02 16:16:21 +01:00
}
2020-12-25 18:17:43 +01:00
swapCV.notify_all();
2020-06-10 04:13:56 +02:00
}
void stopWriter() {
2020-11-02 16:16:21 +01:00
{
std::lock_guard<std::mutex> lck(swapMtx);
writerStop = true;
2020-11-02 16:16:21 +01:00
}
swapCV.notify_all();
2020-06-10 04:13:56 +02:00
}
void clearWriteStop() {
writerStop = false;
2020-06-15 15:53:45 +02:00
}
void stopReader() {
2020-11-02 16:16:21 +01:00
{
std::lock_guard<std::mutex> lck(rdyMtx);
readerStop = true;
2020-11-02 16:16:21 +01:00
}
rdyCV.notify_all();
2020-06-15 15:53:45 +02:00
}
void clearReadStop() {
readerStop = false;
2020-06-15 15:53:45 +02:00
}
T* writeBuf;
T* readBuf;
2020-06-15 15:53:45 +02:00
2020-11-02 03:57:44 +01:00
private:
std::mutex swapMtx;
std::condition_variable swapCV;
bool canSwap = true;
2020-06-15 15:53:45 +02:00
std::mutex rdyMtx;
std::condition_variable rdyCV;
2020-11-02 03:57:44 +01:00
bool dataReady = false;
2020-06-22 16:45:57 +02:00
2020-11-02 03:57:44 +01:00
bool readerStop = false;
bool writerStop = false;
int dataSize = 0;
2020-06-10 04:13:56 +02:00
};
2020-11-02 03:57:44 +01:00
}