diff --git a/airspyhf_source/src/main.cpp b/airspyhf_source/src/main.cpp index bd57b800..ddd4a8bd 100644 --- a/airspyhf_source/src/main.cpp +++ b/airspyhf_source/src/main.cpp @@ -262,11 +262,8 @@ private: static int callback(airspyhf_transfer_t* transfer) { AirspyHFSourceModule* _this = (AirspyHFSourceModule*)transfer->ctx; - if (_this->stream.aquire() < 0) { - return -1; - } - memcpy(_this->stream.data, transfer->samples, transfer->sample_count * sizeof(dsp::complex_t)); - _this->stream.write(transfer->sample_count); + memcpy(_this->stream.writeBuf, transfer->samples, transfer->sample_count * sizeof(dsp::complex_t)); + if (!_this->stream.swap(transfer->sample_count)) { return -1; } return 0; } diff --git a/core/src/dsp/audio.h b/core/src/dsp/audio.h index 624abdbe..4178ad37 100644 --- a/core/src/dsp/audio.h +++ b/core/src/dsp/audio.h @@ -29,14 +29,13 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } for (int i = 0; i < count; i++) { - out.data[i].l = _in->data[i]; - out.data[i].r = _in->data[i]; + out.writeBuf[i].l = _in->readBuf[i]; + out.writeBuf[i].r = _in->readBuf[i]; } _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -75,13 +74,12 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } for (int i = 0; i < count; i++) { - out.data[i] = (_in->data[i].l + _in->data[i].r) / 2.0f; + out.writeBuf[i] = (_in->readBuf[i].l + _in->readBuf[i].r) / 2.0f; } _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } diff --git a/core/src/dsp/block.h b/core/src/dsp/block.h index 1de249cb..4bfcaa4c 100644 --- a/core/src/dsp/block.h +++ b/core/src/dsp/block.h @@ -6,6 +6,8 @@ #include #include +#include + #define FL_M_PI 3.1415926535f namespace dsp { @@ -77,10 +79,10 @@ namespace dsp { } virtual void doStop() { - for (auto const& in : inputs) { + for (auto& in : inputs) { in->stopReader(); } - for (auto const& out : outputs) { + for (auto& out : outputs) { out->stopWriter(); } @@ -88,11 +90,11 @@ namespace dsp { if (workerThread.joinable()) { workerThread.join(); } - - for (auto const& in : inputs) { + + for (auto& in : inputs) { in->clearReadStop(); } - for (auto const& out : outputs) { + for (auto& out : outputs) { out->clearWriteStop(); } } diff --git a/core/src/dsp/convertion.h b/core/src/dsp/convertion.h index 705f9d26..0ece083e 100644 --- a/core/src/dsp/convertion.h +++ b/core/src/dsp/convertion.h @@ -31,11 +31,10 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - memcpy(out.data, _in->data, count * sizeof(complex_t)); + memcpy(out.writeBuf, _in->readBuf, count * sizeof(complex_t)); _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -75,11 +74,10 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - volk_32fc_deinterleave_real_32f(out.data, (lv_32fc_t*)_in->data, count); + volk_32fc_deinterleave_real_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count); _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -119,11 +117,10 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - volk_32fc_deinterleave_imag_32f(out.data, (lv_32fc_t*)_in->data, count); + volk_32fc_deinterleave_imag_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count); _in->flush(); - out.write(count); + if(!out.swap(count)) { return -1; } return count; } diff --git a/core/src/dsp/demodulator.h b/core/src/dsp/demodulator.h index feb2aba1..1bf0da91 100644 --- a/core/src/dsp/demodulator.h +++ b/core/src/dsp/demodulator.h @@ -83,19 +83,17 @@ namespace dsp { // This is somehow faster than volk... float diff, currentPhase; - - if (out.aquire() < 0) { return -1; } for (int i = 0; i < count; i++) { - currentPhase = fast_arctan2(_in->data[i].i, _in->data[i].q); + currentPhase = fast_arctan2(_in->readBuf[i].i, _in->readBuf[i].q); diff = currentPhase - phase; if (diff > 3.1415926535f) { diff -= 2 * 3.1415926535f; } else if (diff <= -3.1415926535f) { diff += 2 * 3.1415926535f; } - out.data[i] = diff / phasorSpeed; + out.writeBuf[i] = diff / phasorSpeed; phase = currentPhase; } _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -135,19 +133,18 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - volk_32fc_magnitude_32f(out.data, (lv_32fc_t*)_in->data, count); + volk_32fc_magnitude_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count); _in->flush(); - volk_32f_accumulator_s32f(&avg, out.data, count); + volk_32f_accumulator_s32f(&avg, out.writeBuf, count); avg /= (float)count; for (int i = 0; i < count; i++) { - out.data[i] -= avg; + out.writeBuf[i] -= avg; } - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -259,12 +256,11 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - volk_32fc_s32fc_x2_rotator_32fc(buffer, (lv_32fc_t*)_in->data, phaseDelta, &phase, count); - volk_32fc_deinterleave_real_32f(out.data, buffer, count); + volk_32fc_s32fc_x2_rotator_32fc(buffer, (lv_32fc_t*)_in->readBuf, phaseDelta, &phase, count); + volk_32fc_deinterleave_real_32f(out.writeBuf, buffer, count); _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } diff --git a/core/src/dsp/filter.h b/core/src/dsp/filter.h index 36279766..d32b0e3d 100644 --- a/core/src/dsp/filter.h +++ b/core/src/dsp/filter.h @@ -52,24 +52,21 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - memcpy(bufStart, _in->data, count * sizeof(T)); + memcpy(bufStart, _in->readBuf, count * sizeof(T)); _in->flush(); - // Write to output - if (out.aquire() < 0) { return -1; } - if constexpr (std::is_same_v) { for (int i = 0; i < count; i++) { - volk_32f_x2_dot_prod_32f((float*)&out.data[i], (float*)&buffer[i+1], taps, tapCount); + volk_32f_x2_dot_prod_32f((float*)&out.writeBuf[i], (float*)&buffer[i+1], taps, tapCount); } } if constexpr (std::is_same_v) { for (int i = 0; i < count; i++) { - volk_32fc_32f_dot_prod_32fc((lv_32fc_t*)&out.data[i], (lv_32fc_t*)&buffer[i+1], taps, tapCount); + volk_32fc_32f_dot_prod_32fc((lv_32fc_t*)&out.writeBuf[i], (lv_32fc_t*)&buffer[i+1], taps, tapCount); } } - out.write(count); + if (!out.swap(count)) { return -1; } memmove(buffer, &buffer[count], tapCount * sizeof(T)); @@ -135,25 +132,23 @@ namespace dsp { if (count < 0) { return -1; } if (bypass) { - if (out.aquire() < 0) { return -1; } - memcpy(out.data, _in->data, count * sizeof(float)); + memcpy(out.writeBuf, _in->readBuf, count * sizeof(float)); _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } if (isnan(lastOut)) { lastOut = 0.0f; } - if (out.aquire() < 0) { return -1; } - out.data[0] = (alpha * _in->data[0]) + ((1-alpha) * lastOut); + out.writeBuf[0] = (alpha * _in->readBuf[0]) + ((1-alpha) * lastOut); for (int i = 1; i < count; i++) { - out.data[i] = (alpha * _in->data[i]) + ((1 - alpha) * out.data[i - 1]); + out.writeBuf[i] = (alpha * _in->readBuf[i]) + ((1 - alpha) * out.writeBuf[i - 1]); } - lastOut = out.data[count - 1]; + lastOut = out.writeBuf[count - 1]; _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } diff --git a/core/src/dsp/math.h b/core/src/dsp/math.h index 1d92553e..4893049d 100644 --- a/core/src/dsp/math.h +++ b/core/src/dsp/math.h @@ -31,17 +31,16 @@ namespace dsp { return 0; } - if (out.aquire() < 0) { return -1; } if constexpr (std::is_same_v || std::is_same_v) { - volk_32fc_x2_add_32fc(out.data, _a->data, _b->data, a_count); + volk_32fc_x2_add_32fc(out.writeBuf, _a->readBuf, _b->readBuf, a_count); } else { - volk_32f_x2_add_32f(out.data, _a->data, _b->data, a_count); + volk_32f_x2_add_32f(out.writeBuf, _a->readBuf, _b->readBuf, a_count); } _a->flush(); _b->flush(); - out.write(a_count); + if (!out.swap(a_count)) { return -1; } return a_count; } @@ -82,17 +81,16 @@ namespace dsp { return 0; } - if (out.aquire() < 0) { return -1; } if constexpr (std::is_same_v) { - volk_32fc_x2_multiply_32fc(out.data, _a->data, _b->data, a_count); + volk_32fc_x2_multiply_32fc(out.writeBuf, _a->readBuf, _b->readBuf, a_count); } else { - volk_32f_x2_multiply_32f(out.data, _a->data, _b->data, a_count); + volk_32f_x2_multiply_32f(out.writeBuf, _a->readBuf, _b->readBuf, a_count); } _a->flush(); _b->flush(); - out.write(a_count); + if (!out.swap(a_count)) { return -1; } return a_count; } diff --git a/core/src/dsp/processing.h b/core/src/dsp/processing.h index 1ffcc05e..cef9c6c2 100644 --- a/core/src/dsp/processing.h +++ b/core/src/dsp/processing.h @@ -60,18 +60,16 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - // TODO: Do float xlation if constexpr (std::is_same_v) { spdlog::error("XLATOR NOT IMPLEMENTED FOR FLOAT"); } if constexpr (std::is_same_v) { - volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.data, (lv_32fc_t*)_in->data, phaseDelta, &phase, count); + volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.writeBuf, (lv_32fc_t*)_in->readBuf, phaseDelta, &phase, count); } _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -115,17 +113,13 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - for (int i = 0; i < count; i++) { - level = (fabsf(_in->data[i]) * _ratio) + (level * (1.0f - _ratio)); - out.data[i] = _in->data[i] / level; + level = (fabsf(_in->readBuf[i]) * _ratio) + (level * (1.0f - _ratio)); + out.writeBuf[i] = _in->readBuf[i] / level; } - - _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -185,27 +179,25 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } - if (_muted) { if constexpr (std::is_same_v) { - memset(out.data, 0, sizeof(stereo_t) * count); + memset(out.writeBuf, 0, sizeof(stereo_t) * count); } else { - memset(out.data, 0, sizeof(float) * count); + memset(out.writeBuf, 0, sizeof(float) * count); } } else { if constexpr (std::is_same_v) { - volk_32f_s32f_multiply_32f((float*)out.data, (float*)_in->data, level, count * 2); + volk_32f_s32f_multiply_32f((float*)out.writeBuf, (float*)_in->readBuf, level, count * 2); } else { - volk_32f_s32f_multiply_32f((float*)out.data, (float*)_in->data, level, count); + volk_32f_s32f_multiply_32f((float*)out.writeBuf, (float*)_in->readBuf, level, count); } } _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } @@ -260,21 +252,20 @@ namespace dsp { count = _in->read(); if (count < 0) { return -1; } - if (out.aquire() < 0) { return -1; } float sum = 0.0f; - volk_32fc_magnitude_32f(normBuffer, (lv_32fc_t*)_in->data, count); + volk_32fc_magnitude_32f(normBuffer, (lv_32fc_t*)_in->readBuf, count); volk_32f_accumulator_s32f(&sum, normBuffer, count); sum /= (float)count; if (10.0f * log10f(sum) >= _level) { - memcpy(out.data, _in->data, count * sizeof(complex_t)); + memcpy(out.writeBuf, _in->readBuf, count * sizeof(complex_t)); } else { - memset(out.data, 0, count * sizeof(complex_t)); + memset(out.writeBuf, 0, count * sizeof(complex_t)); } _in->flush(); - out.write(count); + if (!out.swap(count)) { return -1; } return count; } diff --git a/core/src/dsp/resampling.h b/core/src/dsp/resampling.h index 257ab962..254d8921 100644 --- a/core/src/dsp/resampling.h +++ b/core/src/dsp/resampling.h @@ -100,35 +100,32 @@ namespace dsp { int outCount = calcOutSize(count); - memcpy(&buffer[tapCount], _in->data, count * sizeof(T)); + memcpy(&buffer[tapCount], _in->readBuf, count * sizeof(T)); _in->flush(); // Write to output - if (out.aquire() < 0) { - return -1; - } int outIndex = 0; if constexpr (std::is_same_v) { for (int i = 0; outIndex < outCount; i += _decim) { - out.data[outIndex] = 0; + out.writeBuf[outIndex] = 0; for (int j = i % _interp; j < tapCount; j += _interp) { - out.data[outIndex] += buffer[((i - j) / _interp) + tapCount] * taps[j]; + out.writeBuf[outIndex] += buffer[((i - j) / _interp) + tapCount] * taps[j]; } outIndex++; } } if constexpr (std::is_same_v) { for (int i = 0; outIndex < outCount; i += _decim) { - out.data[outIndex].i = 0; - out.data[outIndex].q = 0; + out.writeBuf[outIndex].i = 0; + out.writeBuf[outIndex].q = 0; for (int j = i % _interp; j < tapCount; j += _interp) { - out.data[outIndex].i += buffer[((i - j) / _interp) + tapCount].i * taps[j]; - out.data[outIndex].q += buffer[((i - j) / _interp) + tapCount].q * taps[j]; + out.writeBuf[outIndex].i += buffer[((i - j) / _interp) + tapCount].i * taps[j]; + out.writeBuf[outIndex].q += buffer[((i - j) / _interp) + tapCount].q * taps[j]; } outIndex++; } } - out.write(outCount); + if (!out.swap(outCount)) { return -1; } memmove(buffer, &buffer[count], tapCount * sizeof(T)); diff --git a/core/src/dsp/routing.h b/core/src/dsp/routing.h index 78815335..a1e821b0 100644 --- a/core/src/dsp/routing.h +++ b/core/src/dsp/routing.h @@ -51,9 +51,8 @@ namespace dsp { int count = _in->read(); if (count < 0) { return -1; } for (const auto& stream : out) { - if (stream->aquire() < 0) { return -1; } - memcpy(stream->data, _in->data, count * sizeof(T)); - stream->write(count); + memcpy(stream->writeBuf, _in->readBuf, count * sizeof(T)); + if (!stream->swap(count)) { return -1; } } _in->flush(); return count; @@ -115,7 +114,7 @@ namespace dsp { int run() { int count = _in->read(); if (count < 0) { return -1; } - ringBuf.write(_in->data, count); + ringBuf.write(_in->readBuf, count); _in->flush(); return count; } @@ -172,9 +171,8 @@ namespace dsp { } } if (ringBuf.readAndSkip(start, readCount, skip) < 0) { break; }; - if (out.aquire() < 0) { break; } - memcpy(out.data, buf, _keep * sizeof(complex_t)); - out.write(_keep); + memcpy(out.writeBuf, buf, _keep * sizeof(complex_t)); + if (!out.swap(_keep)) { break; } } delete[] buf; } diff --git a/core/src/dsp/sink.h b/core/src/dsp/sink.h index c7e0ac44..b9890a37 100644 --- a/core/src/dsp/sink.h +++ b/core/src/dsp/sink.h @@ -39,7 +39,7 @@ namespace dsp { int run() { count = _in->read(); if (count < 0) { return -1; } - _handler(_in->data, count, _ctx); + _handler(_in->readBuf, count, _ctx); _in->flush(); return count; } @@ -79,7 +79,7 @@ namespace dsp { int run() { count = _in->read(); if (count < 0) { return -1; } - if (data.write(_in->data, count) < 0) { return -1; } + if (data.write(_in->readBuf, count) < 0) { return -1; } _in->flush(); return count; } diff --git a/core/src/dsp/source.h b/core/src/dsp/source.h index 75dd38a1..b973f7b6 100644 --- a/core/src/dsp/source.h +++ b/core/src/dsp/source.h @@ -55,9 +55,8 @@ namespace dsp { } int run() { - if (out.aquire() < 0) { return -1; } - volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.data, zeroPhase, phaseDelta, &phase, _blockSize); - out.write(_blockSize); + volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.writeBuf, zeroPhase, phaseDelta, &phase, _blockSize); + if(!out.swap(_blockSize)) { return -1; } return _blockSize; } diff --git a/core/src/dsp/stream.h b/core/src/dsp/stream.h index 96fae88f..ca021649 100644 --- a/core/src/dsp/stream.h +++ b/core/src/dsp/stream.h @@ -9,100 +9,122 @@ namespace dsp { class untyped_steam { public: - virtual int aquire() { return -1; } - virtual void write(int size) {} + virtual bool swap(int size) { return false; } virtual int read() { return -1; } virtual void flush() {} - virtual void stopReader() {} - virtual void clearReadStop() {} virtual void stopWriter() {} virtual void clearWriteStop() {} + virtual void stopReader() {} + virtual void clearReadStop() {} }; template class stream : public untyped_steam { public: stream() { - data = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment()); + writeBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment()); + readBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment()); } - int aquire() { - waitReady(); - if (writerStop) { - return -1; - } - return 0; + ~stream() { + volk_free(writeBuf); + volk_free(readBuf); } - void write(int size) { + bool swap(int size) { { - std::lock_guard lck(sigMtx); - contentSize = size; + // Wait to either swap or stop + std::unique_lock lck(swapMtx); + swapCV.wait(lck, [this]{ return (canSwap || writerStop); }); + + // If writer was stopped, abandon operation + if (writerStop) { + writerStop = false; + return false; + } + + // Swap buffers + dataSize = size; + T* temp = writeBuf; + writeBuf = readBuf; + readBuf = temp; + canSwap = false; + } + + // Notify reader that some data is ready + { + std::lock_guard lck(rdyMtx); dataReady = true; } - cv.notify_one(); + rdyCV.notify_all(); + + return true; } int read() { - waitData(); + // Wait for data to be ready or to be stopped + std::unique_lock lck(rdyMtx); + rdyCV.wait(lck, [this]{ return (dataReady || readerStop); }); + + // If stopped, abort if (readerStop) { + readerStop = false; return -1; } - return contentSize; + + dataReady = false; + + return dataSize; } void flush() { + // Notify writer that buffers can be swapped { - std::lock_guard lck(sigMtx); - dataReady = false; + std::lock_guard lck(swapMtx); + canSwap = true; } - cv.notify_one(); - } - - void stopReader() { - { - std::lock_guard lck(sigMtx); - readerStop = true; - } - cv.notify_one(); - } - - void clearReadStop() { - readerStop = false; + swapCV.notify_all(); } void stopWriter() { { - std::lock_guard lck(sigMtx); + std::lock_guard lck(swapMtx); writerStop = true; } - cv.notify_one(); + swapCV.notify_all(); } void clearWriteStop() { writerStop = false; } - T* data; + void stopReader() { + { + std::lock_guard lck(rdyMtx); + readerStop = true; + } + rdyCV.notify_all(); + } + + void clearReadStop() { + readerStop = false; + } + + T* writeBuf; + T* readBuf; private: - void waitReady() { - std::unique_lock lck(sigMtx); - cv.wait(lck, [this]{ return (!dataReady || writerStop); }); - } + std::mutex swapMtx; + std::condition_variable swapCV; + bool canSwap = true; - void waitData() { - std::unique_lock lck(sigMtx); - cv.wait(lck, [this]{ return (dataReady || readerStop); }); - } - - std::mutex sigMtx; - std::condition_variable cv; + std::mutex rdyMtx; + std::condition_variable rdyCV; bool dataReady = false; bool readerStop = false; bool writerStop = false; - int contentSize = 0; + int dataSize = 0; }; } \ No newline at end of file diff --git a/plutosdr_source/src/main.cpp b/plutosdr_source/src/main.cpp index e4e0f5ae..b316ba55 100644 --- a/plutosdr_source/src/main.cpp +++ b/plutosdr_source/src/main.cpp @@ -229,12 +229,11 @@ private: int16_t* buf = (int16_t*)iio_buffer_first(rxbuf, rx0_i); - if (_this->stream.aquire() < 0) { break; } for (int i = 0; i < blockSize; i++) { - _this->stream.data[i].q = (float)buf[i * 2] / 32768.0f; - _this->stream.data[i].i = (float)buf[(i * 2) + 1] / 32768.0f; + _this->stream.writeBuf[i].q = (float)buf[i * 2] / 32768.0f; + _this->stream.writeBuf[i].i = (float)buf[(i * 2) + 1] / 32768.0f; } - _this->stream.write(blockSize); + if (!_this->stream.swap(blockSize)) { break; }; } iio_buffer_destroy(rxbuf); diff --git a/recorder/src/main.cpp b/recorder/src/main.cpp index 9cbd2aad..efff67b7 100644 --- a/recorder/src/main.cpp +++ b/recorder/src/main.cpp @@ -254,8 +254,8 @@ private: int count = _this->audioStream->read(); if (count < 0) { break; } for (int i = 0; i < count; i++) { - sampleBuf[(i * 2) + 0] = _this->audioStream->data[i].l * 0x7FFF; - sampleBuf[(i * 2) + 1] = _this->audioStream->data[i].r * 0x7FFF; + sampleBuf[(i * 2) + 0] = _this->audioStream->readBuf[i].l * 0x7FFF; + sampleBuf[(i * 2) + 1] = _this->audioStream->readBuf[i].r * 0x7FFF; } _this->audioStream->flush(); _this->samplesWritten += count; @@ -270,8 +270,8 @@ private: int count = _this->iqStream->read(); if (count < 0) { break; } for (int i = 0; i < count; i++) { - sampleBuf[(i * 2) + 0] = _this->iqStream->data[i].q * 0x7FFF; - sampleBuf[(i * 2) + 1] = _this->iqStream->data[i].i * 0x7FFF; + sampleBuf[(i * 2) + 0] = _this->iqStream->readBuf[i].q * 0x7FFF; + sampleBuf[(i * 2) + 1] = _this->iqStream->readBuf[i].i * 0x7FFF; } _this->iqStream->flush(); _this->samplesWritten += count; diff --git a/rtl_tcp_source/src/main.cpp b/rtl_tcp_source/src/main.cpp index 3936ed82..1aa71896 100644 --- a/rtl_tcp_source/src/main.cpp +++ b/rtl_tcp_source/src/main.cpp @@ -218,12 +218,11 @@ private: while (true) { // Read samples here _this->client.receiveData(inBuf, blockSize * 2); - if (_this->stream.aquire() < 0) { break; } for (int i = 0; i < blockSize; i++) { - _this->stream.data[i].q = ((double)inBuf[i * 2] - 128.0) / 128.0; - _this->stream.data[i].i = ((double)inBuf[(i * 2) + 1] - 128.0) / 128.0; + _this->stream.writeBuf[i].q = ((double)inBuf[i * 2] - 128.0) / 128.0; + _this->stream.writeBuf[i].i = ((double)inBuf[(i * 2) + 1] - 128.0) / 128.0; } - _this->stream.write(blockSize); + if (!_this->stream.swap(blockSize)) { break; }; } delete[] inBuf; diff --git a/soapy_source/src/main.cpp b/soapy_source/src/main.cpp index f6237e99..d88d0433 100644 --- a/soapy_source/src/main.cpp +++ b/soapy_source/src/main.cpp @@ -351,12 +351,11 @@ private: long long timeMs = 0; while (_this->running) { - if (_this->stream.aquire() < 0) { break; } - int res = _this->dev->readStream(_this->devStream, (void**)&_this->stream.data, blockSize, flags, timeMs); + int res = _this->dev->readStream(_this->devStream, (void**)&_this->stream.writeBuf, blockSize, flags, timeMs); if (res < 1) { continue; } - _this->stream.write(res); + if (!_this->stream.swap(res)) { return; } } }