#pragma once #include #include #include #include #include namespace dsp { template class Splitter : public generic_block> { public: Splitter() {} Splitter(stream* in) { init(in); } ~Splitter() { generic_block::stop(); } void init(stream* in) { _in = in; generic_block::registerInput(_in); } void setInput(stream* in) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); generic_block::unregisterInput(_in); _in = in; generic_block::registerInput(_in); generic_block::tempStart(); } void bindStream(stream* stream) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); out.push_back(stream); generic_block::registerOutput(stream); generic_block::tempStart(); } void unbindStream(stream* stream) { std::lock_guard lck(generic_block::ctrlMtx); generic_block::tempStop(); generic_block::unregisterOutput(stream); out.erase(std::remove(out.begin(), out.end(), stream), out.end()); generic_block::tempStart(); } private: int run() { // TODO: If too slow, buffering might be necessary int count = _in->read(); if (count < 0) { return -1; } for (const auto& stream : out) { if (stream->aquire() < 0) { return -1; } memcpy(stream->data, _in->data, count * sizeof(T)); stream->write(count); } _in->flush(); return count; } stream* _in; std::vector*> out; }; // NOTE: I'm not proud of this, it's BAD and just taken from the previous DSP, but it works... template class Reshaper : public generic_block> { public: Reshaper() {} Reshaper(stream* in, int keep, int skip) { init(in, keep, skip); } ~Reshaper() { generic_block>::stop(); } void init(stream* in, int keep, int skip) { _in = in; _keep = keep; _skip = skip; ringBuf.init(keep * 2); generic_block>::registerInput(_in); generic_block>::registerOutput(&out); } void setInput(stream* in) { std::lock_guard lck(generic_block>::ctrlMtx); generic_block>::tempStop(); generic_block>::unregisterInput(_in); _in = in; generic_block>::registerInput(_in); generic_block>::tempStart(); } void setKeep(int keep) { std::lock_guard lck(generic_block>::ctrlMtx); generic_block>::tempStop(); generic_block>::unregisterInput(_in); _keep = keep; ringBuf.setMaxLatency(keep * 2); generic_block>::registerInput(_in); generic_block>::tempStart(); } void setSkip(int skip) { std::lock_guard lck(generic_block>::ctrlMtx); generic_block>::tempStop(); generic_block>::unregisterInput(_in); _skip = skip; generic_block>::registerInput(_in); generic_block>::tempStart(); } int run() { int count = _in->read(); if (count < 0) { return -1; } ringBuf.write(_in->data, count); _in->flush(); return count; } stream out; private: void doStart() { workThread = std::thread(&Reshaper::loop, this); bufferWorkerThread = std::thread(&Reshaper::bufferWorker, this); } void loop() { while (run() >= 0); } void doStop() { _in->stopReader(); ringBuf.stopReader(); out.stopWriter(); ringBuf.stopWriter(); if (workThread.joinable()) { workThread.join(); } if (bufferWorkerThread.joinable()) { bufferWorkerThread.join(); } _in->clearReadStop(); ringBuf.clearReadStop(); out.clearWriteStop(); ringBuf.clearWriteStop(); } void bufferWorker() { complex_t* buf = new complex_t[_keep]; bool delay = _skip < 0; int readCount = std::min(_keep + _skip, _keep); int skip = std::max(_skip, 0); int delaySize = (-_skip) * sizeof(complex_t); complex_t* start = &buf[std::max(-_skip, 0)]; complex_t* delayStart = &buf[_keep + _skip]; while (true) { if (delay) { memmove(buf, delayStart, delaySize); } if (ringBuf.readAndSkip(start, readCount, skip) < 0) { break; }; if (out.aquire() < 0) { break; } memcpy(out.data, buf, _keep * sizeof(complex_t)); out.write(_keep); } delete[] buf; } stream* _in; int _outBlockSize; RingBuffer ringBuf; std::thread bufferWorkerThread; std::thread workThread; int _keep, _skip; }; }