#pragma once #include #include #include #include #include namespace dsp { template class Splitter : public generic_block> { public: Splitter() {} Splitter(stream* in) { init(in); } ~Splitter() { generic_block::stop(); } void init(stream* in) { _in = in; generic_block::registerInput(_in); } void setInput(stream* in) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); generic_block::unregisterInput(_in); _in = in; generic_block::registerInput(_in); generic_block::tempStart(); } void bindStream(stream* stream) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); out.push_back(stream); generic_block::registerOutput(stream); generic_block::tempStart(); } void unbindStream(stream* stream) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); generic_block::unregisterOutput(stream); out.erase(std::remove(out.begin(), out.end(), stream), out.end()); generic_block::tempStart(); } private: int run() { // TODO: If too slow, buffering might be necessary int count = _in->read(); if (count < 0) { return -1; } for (const auto& stream : out) { memcpy(stream->writeBuf, _in->readBuf, count * sizeof(T)); if (!stream->swap(count)) { return -1; } } _in->flush(); return count; } stream* _in; std::vector*> out; }; // NOTE: I'm not proud of this, it's BAD and just taken from the previous DSP, but it works... template class Reshaper : public generic_block> { public: Reshaper() {} Reshaper(stream* in, int keep, int skip) { init(in, keep, skip); } ~Reshaper() { generic_block>::stop(); } void init(stream* in, int keep, int skip) { _in = in; _keep = keep; _skip = skip; ringBuf.init(keep * 2); generic_block>::registerInput(_in); generic_block>::registerOutput(&out); } void setInput(stream* in) { std::lock_guard lck(generic_block>::ctrlMtx); generic_block>::tempStop(); generic_block>::unregisterInput(_in); _in = in; generic_block>::registerInput(_in); generic_block>::tempStart(); } void setKeep(int keep) { std::lock_guard lck(generic_block>::ctrlMtx); generic_block>::tempStop(); generic_block>::unregisterInput(_in); _keep = keep; ringBuf.setMaxLatency(keep * 2); generic_block>::registerInput(_in); generic_block>::tempStart(); } void setSkip(int skip) { std::lock_guard lck(generic_block>::ctrlMtx); generic_block>::tempStop(); generic_block>::unregisterInput(_in); _skip = skip; generic_block>::registerInput(_in); generic_block>::tempStart(); } int run() { int count = _in->read(); if (count < 0) { return -1; } ringBuf.write(_in->readBuf, count); _in->flush(); return count; } stream out; private: void doStart() { workThread = std::thread(&Reshaper::loop, this); bufferWorkerThread = std::thread(&Reshaper::bufferWorker, this); } void loop() { while (run() >= 0); } void doStop() { _in->stopReader(); ringBuf.stopReader(); out.stopWriter(); ringBuf.stopWriter(); if (workThread.joinable()) { workThread.join(); } if (bufferWorkerThread.joinable()) { bufferWorkerThread.join(); } _in->clearReadStop(); ringBuf.clearReadStop(); out.clearWriteStop(); ringBuf.clearWriteStop(); } void bufferWorker() { T* buf = new T[_keep]; bool delay = _skip < 0; int readCount = std::min(_keep + _skip, _keep); int skip = std::max(_skip, 0); int delaySize = (-_skip) * sizeof(T); int delayCount = (-_skip); T* start = &buf[std::max(-_skip, 0)]; T* delayStart = &buf[_keep + _skip]; while (true) { if (delay) { memmove(buf, delayStart, delaySize); if constexpr (std::is_same_v || std::is_same_v) { for (int i = 0; i < delayCount; i++) { buf[i].i /= 10.0f; buf[i].q /= 10.0f; } } } if (ringBuf.readAndSkip(start, readCount, skip) < 0) { break; }; memcpy(out.writeBuf, buf, _keep * sizeof(T)); if (!out.swap(_keep)) { break; } } delete[] buf; } stream* _in; int _outBlockSize; RingBuffer ringBuf; std::thread bufferWorkerThread; std::thread workThread; int _keep, _skip; }; }