#pragma once #include #include #include namespace dsp { template class Splitter : public generic_block> { public: Splitter() {} Splitter(stream* in) { init(in); } ~Splitter() { generic_block::stop(); } void init(stream* in) { _in = in; generic_block::registerInput(_in); } void setInput(stream* in) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); generic_block::unregisterInput(_in); _in = in; generic_block::registerInput(_in); generic_block::tempStart(); } void bindStream(stream* stream) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); out.push_back(stream); generic_block::registerOutput(stream); generic_block::tempStart(); } void unbindStream(stream* stream) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); generic_block::unregisterOutput(stream); out.erase(std::remove(out.begin(), out.end(), stream), out.end()); generic_block::tempStart(); } private: int run() { // TODO: If too slow, buffering might be necessary int count = _in->read(); if (count < 0) { return -1; } for (const auto& stream : out) { if (stream->aquire() < 0) { return -1; } memcpy(stream->data, _in->data, count * sizeof(T)); stream->write(count); } _in->flush(); return count; } stream* _in; std::vector*> out; }; template class Reshaper : public generic_block> { public: Reshaper() {} Reshaper(stream* in) { init(in); } ~Reshaper() { generic_block>::stop(); } void init(stream* in) { _in = in; buffer = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment()); generic_block>::registerInput(_in); generic_block>::registerOutput(&out); } void setInput(stream* in) { std::lock_guard lck(generic_block>::ctrlMtx); generic_block>::tempStop(); generic_block>::unregisterInput(_in); _in = in; generic_block>::registerInput(_in); generic_block>::tempStart(); } int run() { int count = _in->read(); _in->flush(); return count; } stream out; private: stream* _in; T* buffer; int _outBlockSize; int readCount; }; }