SDRPlusPlus/core/src/dsp/sink.h

140 lines
4.1 KiB
C
Raw Normal View History

2020-06-22 16:45:57 +02:00
#pragma once
2020-11-02 03:57:44 +01:00
#include <dsp/block.h>
#include <dsp/buffer.h>
2020-06-22 16:45:57 +02:00
namespace dsp {
2020-11-02 03:57:44 +01:00
template <class T>
class HandlerSink : public generic_block<HandlerSink<T>> {
2020-06-22 16:45:57 +02:00
public:
2020-11-02 03:57:44 +01:00
HandlerSink() {}
2020-06-22 16:45:57 +02:00
2020-11-02 03:57:44 +01:00
HandlerSink(stream<T>* in, void (*handler)(T* data, int count, void* ctx), void* ctx) { init(in, handler, ctx); }
2020-06-22 16:45:57 +02:00
2020-11-02 03:57:44 +01:00
~HandlerSink() { generic_block<HandlerSink<T>>::stop(); }
void init(stream<T>* in, void (*handler)(T* data, int count, void* ctx), void* ctx) {
_in = in;
2020-06-22 16:45:57 +02:00
_handler = handler;
2020-11-02 03:57:44 +01:00
_ctx = ctx;
generic_block<HandlerSink<T>>::registerInput(_in);
2020-06-22 16:45:57 +02:00
}
2020-11-02 03:57:44 +01:00
void setInput(stream<T>* in) {
std::lock_guard<std::mutex> lck(generic_block<HandlerSink<T>>::ctrlMtx);
generic_block<HandlerSink<T>>::tempStop();
2020-11-02 17:48:17 +01:00
generic_block<HandlerSink<T>>::unregisterInput(_in);
2020-11-02 03:57:44 +01:00
_in = in;
2020-11-02 17:48:17 +01:00
generic_block<HandlerSink<T>>::registerInput(_in);
2020-11-02 03:57:44 +01:00
generic_block<HandlerSink<T>>::tempStart();
2020-07-19 15:59:44 +02:00
}
2020-11-02 03:57:44 +01:00
void setHandler(void (*handler)(T* data, int count, void* ctx), void* ctx) {
std::lock_guard<std::mutex> lck(generic_block<HandlerSink<T>>::ctrlMtx);
generic_block<HandlerSink<T>>::tempStop();
_handler = handler;
_ctx = ctx;
generic_block<HandlerSink<T>>::tempStart();
2020-06-22 16:45:57 +02:00
}
2020-11-02 03:57:44 +01:00
int run() {
count = _in->read();
if (count < 0) { return -1; }
_handler(_in->readBuf, count, _ctx);
2020-11-02 03:57:44 +01:00
_in->flush();
return count;
}
2020-06-22 16:45:57 +02:00
private:
2020-11-02 03:57:44 +01:00
int count;
stream<T>* _in;
void (*_handler)(T* data, int count, void* ctx);
void* _ctx;
2020-06-22 16:45:57 +02:00
};
2020-10-24 14:51:55 +02:00
template <class T>
2020-11-02 03:57:44 +01:00
class RingBufferSink : public generic_block<RingBufferSink<T>> {
2020-06-22 16:45:57 +02:00
public:
2020-11-02 03:57:44 +01:00
RingBufferSink() {}
2020-06-22 16:45:57 +02:00
2020-11-02 03:57:44 +01:00
RingBufferSink(stream<T>* in) { init(in); }
2020-06-22 16:45:57 +02:00
2020-11-02 03:57:44 +01:00
~RingBufferSink() { generic_block<RingBufferSink<T>>::stop(); }
2020-06-22 16:45:57 +02:00
2020-11-02 03:57:44 +01:00
void init(stream<T>* in) {
_in = in;
2020-11-02 21:13:28 +01:00
data.init(480); // TODO: Use an argument
2020-11-02 03:57:44 +01:00
generic_block<RingBufferSink<T>>::registerInput(_in);
2020-06-22 16:45:57 +02:00
}
2020-11-02 03:57:44 +01:00
void setInput(stream<T>* in) {
std::lock_guard<std::mutex> lck(generic_block<RingBufferSink<T>>::ctrlMtx);
generic_block<RingBufferSink<T>>::tempStop();
2020-11-02 17:48:17 +01:00
generic_block<RingBufferSink<T>>::unregisterInput(_in);
2020-11-02 03:57:44 +01:00
_in = in;
2020-11-02 17:48:17 +01:00
generic_block<RingBufferSink<T>>::registerInput(_in);
2020-11-02 03:57:44 +01:00
generic_block<RingBufferSink<T>>::tempStart();
2020-06-22 16:45:57 +02:00
}
2020-11-02 03:57:44 +01:00
int run() {
count = _in->read();
if (count < 0) { return -1; }
if (data.write(_in->readBuf, count) < 0) { return -1; }
2020-11-02 03:57:44 +01:00
_in->flush();
return count;
2020-06-22 16:45:57 +02:00
}
2020-11-02 03:57:44 +01:00
RingBuffer<T> data;
2020-06-22 16:45:57 +02:00
private:
2020-11-02 03:57:44 +01:00
void doStop() {
_in->stopReader();
data.stopWriter();
if (generic_block<RingBufferSink<T>>::workerThread.joinable()) {
generic_block<RingBufferSink<T>>::workerThread.join();
2020-06-22 16:45:57 +02:00
}
2020-11-02 03:57:44 +01:00
_in->clearReadStop();
data.clearWriteStop();
2020-06-22 16:45:57 +02:00
}
2020-11-02 03:57:44 +01:00
int count;
stream<T>* _in;
2020-06-22 16:45:57 +02:00
};
2020-11-02 16:16:21 +01:00
template <class T>
class NullSink : public generic_block<NullSink<T>> {
public:
NullSink() {}
NullSink(stream<T>* in) { init(in); }
~NullSink() { generic_block<NullSink<T>>::stop(); }
void init(stream<T>* in) {
_in = in;
generic_block<NullSink<T>>::registerInput(_in);
}
void setInput(stream<T>* in) {
std::lock_guard<std::mutex> lck(generic_block<NullSink<T>>::ctrlMtx);
generic_block<NullSink<T>>::tempStop();
2020-11-02 17:48:17 +01:00
generic_block<NullSink<T>>::unregisterInput(_in);
2020-11-02 16:16:21 +01:00
_in = in;
2020-11-02 17:48:17 +01:00
generic_block<NullSink<T>>::registerInput(_in);
2020-11-02 16:16:21 +01:00
generic_block<NullSink<T>>::tempStart();
}
int run() {
count = _in->read();
if (count < 0) { return -1; }
_in->flush();
return count;
}
private:
int count;
stream<T>* _in;
};
2020-11-02 03:57:44 +01:00
}