mirror of
https://github.com/AlexandreRouma/SDRPlusPlus.git
synced 2024-11-10 12:47:40 +01:00
Merge pull request #48 from AlexandreRouma/double_bufferd_streams
switched all streams to double buffering
This commit is contained in:
commit
bfe15aff19
@ -262,11 +262,8 @@ private:
|
||||
|
||||
static int callback(airspyhf_transfer_t* transfer) {
|
||||
AirspyHFSourceModule* _this = (AirspyHFSourceModule*)transfer->ctx;
|
||||
if (_this->stream.aquire() < 0) {
|
||||
return -1;
|
||||
}
|
||||
memcpy(_this->stream.data, transfer->samples, transfer->sample_count * sizeof(dsp::complex_t));
|
||||
_this->stream.write(transfer->sample_count);
|
||||
memcpy(_this->stream.writeBuf, transfer->samples, transfer->sample_count * sizeof(dsp::complex_t));
|
||||
if (!_this->stream.swap(transfer->sample_count)) { return -1; }
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -29,14 +29,13 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
for (int i = 0; i < count; i++) {
|
||||
out.data[i].l = _in->data[i];
|
||||
out.data[i].r = _in->data[i];
|
||||
out.writeBuf[i].l = _in->readBuf[i];
|
||||
out.writeBuf[i].r = _in->readBuf[i];
|
||||
}
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -75,13 +74,12 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
for (int i = 0; i < count; i++) {
|
||||
out.data[i] = (_in->data[i].l + _in->data[i].r) / 2.0f;
|
||||
out.writeBuf[i] = (_in->readBuf[i].l + _in->readBuf[i].r) / 2.0f;
|
||||
}
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
|
||||
#define FL_M_PI 3.1415926535f
|
||||
|
||||
namespace dsp {
|
||||
@ -77,10 +79,10 @@ namespace dsp {
|
||||
}
|
||||
|
||||
virtual void doStop() {
|
||||
for (auto const& in : inputs) {
|
||||
for (auto& in : inputs) {
|
||||
in->stopReader();
|
||||
}
|
||||
for (auto const& out : outputs) {
|
||||
for (auto& out : outputs) {
|
||||
out->stopWriter();
|
||||
}
|
||||
|
||||
@ -89,10 +91,10 @@ namespace dsp {
|
||||
workerThread.join();
|
||||
}
|
||||
|
||||
for (auto const& in : inputs) {
|
||||
for (auto& in : inputs) {
|
||||
in->clearReadStop();
|
||||
}
|
||||
for (auto const& out : outputs) {
|
||||
for (auto& out : outputs) {
|
||||
out->clearWriteStop();
|
||||
}
|
||||
}
|
||||
|
@ -31,11 +31,10 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
memcpy(out.data, _in->data, count * sizeof(complex_t));
|
||||
memcpy(out.writeBuf, _in->readBuf, count * sizeof(complex_t));
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -75,11 +74,10 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
volk_32fc_deinterleave_real_32f(out.data, (lv_32fc_t*)_in->data, count);
|
||||
volk_32fc_deinterleave_real_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count);
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -119,11 +117,10 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
volk_32fc_deinterleave_imag_32f(out.data, (lv_32fc_t*)_in->data, count);
|
||||
volk_32fc_deinterleave_imag_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count);
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if(!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -83,19 +83,17 @@ namespace dsp {
|
||||
// This is somehow faster than volk...
|
||||
|
||||
float diff, currentPhase;
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
for (int i = 0; i < count; i++) {
|
||||
currentPhase = fast_arctan2(_in->data[i].i, _in->data[i].q);
|
||||
currentPhase = fast_arctan2(_in->readBuf[i].i, _in->readBuf[i].q);
|
||||
diff = currentPhase - phase;
|
||||
if (diff > 3.1415926535f) { diff -= 2 * 3.1415926535f; }
|
||||
else if (diff <= -3.1415926535f) { diff += 2 * 3.1415926535f; }
|
||||
out.data[i] = diff / phasorSpeed;
|
||||
out.writeBuf[i] = diff / phasorSpeed;
|
||||
phase = currentPhase;
|
||||
}
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -135,19 +133,18 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
volk_32fc_magnitude_32f(out.data, (lv_32fc_t*)_in->data, count);
|
||||
volk_32fc_magnitude_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count);
|
||||
|
||||
_in->flush();
|
||||
|
||||
volk_32f_accumulator_s32f(&avg, out.data, count);
|
||||
volk_32f_accumulator_s32f(&avg, out.writeBuf, count);
|
||||
avg /= (float)count;
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
out.data[i] -= avg;
|
||||
out.writeBuf[i] -= avg;
|
||||
}
|
||||
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -259,12 +256,11 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
volk_32fc_s32fc_x2_rotator_32fc(buffer, (lv_32fc_t*)_in->data, phaseDelta, &phase, count);
|
||||
volk_32fc_deinterleave_real_32f(out.data, buffer, count);
|
||||
volk_32fc_s32fc_x2_rotator_32fc(buffer, (lv_32fc_t*)_in->readBuf, phaseDelta, &phase, count);
|
||||
volk_32fc_deinterleave_real_32f(out.writeBuf, buffer, count);
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -52,24 +52,21 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
memcpy(bufStart, _in->data, count * sizeof(T));
|
||||
memcpy(bufStart, _in->readBuf, count * sizeof(T));
|
||||
_in->flush();
|
||||
|
||||
// Write to output
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
|
||||
if constexpr (std::is_same_v<T, float>) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
volk_32f_x2_dot_prod_32f((float*)&out.data[i], (float*)&buffer[i+1], taps, tapCount);
|
||||
volk_32f_x2_dot_prod_32f((float*)&out.writeBuf[i], (float*)&buffer[i+1], taps, tapCount);
|
||||
}
|
||||
}
|
||||
if constexpr (std::is_same_v<T, complex_t>) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
volk_32fc_32f_dot_prod_32fc((lv_32fc_t*)&out.data[i], (lv_32fc_t*)&buffer[i+1], taps, tapCount);
|
||||
volk_32fc_32f_dot_prod_32fc((lv_32fc_t*)&out.writeBuf[i], (lv_32fc_t*)&buffer[i+1], taps, tapCount);
|
||||
}
|
||||
}
|
||||
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
|
||||
memmove(buffer, &buffer[count], tapCount * sizeof(T));
|
||||
|
||||
@ -135,25 +132,23 @@ namespace dsp {
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (bypass) {
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
memcpy(out.data, _in->data, count * sizeof(float));
|
||||
memcpy(out.writeBuf, _in->readBuf, count * sizeof(float));
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
if (isnan(lastOut)) {
|
||||
lastOut = 0.0f;
|
||||
}
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
out.data[0] = (alpha * _in->data[0]) + ((1-alpha) * lastOut);
|
||||
out.writeBuf[0] = (alpha * _in->readBuf[0]) + ((1-alpha) * lastOut);
|
||||
for (int i = 1; i < count; i++) {
|
||||
out.data[i] = (alpha * _in->data[i]) + ((1 - alpha) * out.data[i - 1]);
|
||||
out.writeBuf[i] = (alpha * _in->readBuf[i]) + ((1 - alpha) * out.writeBuf[i - 1]);
|
||||
}
|
||||
lastOut = out.data[count - 1];
|
||||
lastOut = out.writeBuf[count - 1];
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -31,17 +31,16 @@ namespace dsp {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
if constexpr (std::is_same_v<T, complex_t> || std::is_same_v<T, stereo_t>) {
|
||||
volk_32fc_x2_add_32fc(out.data, _a->data, _b->data, a_count);
|
||||
volk_32fc_x2_add_32fc(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
|
||||
}
|
||||
else {
|
||||
volk_32f_x2_add_32f(out.data, _a->data, _b->data, a_count);
|
||||
volk_32f_x2_add_32f(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
|
||||
}
|
||||
|
||||
_a->flush();
|
||||
_b->flush();
|
||||
out.write(a_count);
|
||||
if (!out.swap(a_count)) { return -1; }
|
||||
return a_count;
|
||||
}
|
||||
|
||||
@ -82,17 +81,16 @@ namespace dsp {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
if constexpr (std::is_same_v<T, complex_t>) {
|
||||
volk_32fc_x2_multiply_32fc(out.data, _a->data, _b->data, a_count);
|
||||
volk_32fc_x2_multiply_32fc(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
|
||||
}
|
||||
else {
|
||||
volk_32f_x2_multiply_32f(out.data, _a->data, _b->data, a_count);
|
||||
volk_32f_x2_multiply_32f(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
|
||||
}
|
||||
|
||||
_a->flush();
|
||||
_b->flush();
|
||||
out.write(a_count);
|
||||
if (!out.swap(a_count)) { return -1; }
|
||||
return a_count;
|
||||
}
|
||||
|
||||
|
@ -60,18 +60,16 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
|
||||
// TODO: Do float xlation
|
||||
if constexpr (std::is_same_v<T, float>) {
|
||||
spdlog::error("XLATOR NOT IMPLEMENTED FOR FLOAT");
|
||||
}
|
||||
if constexpr (std::is_same_v<T, complex_t>) {
|
||||
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.data, (lv_32fc_t*)_in->data, phaseDelta, &phase, count);
|
||||
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.writeBuf, (lv_32fc_t*)_in->readBuf, phaseDelta, &phase, count);
|
||||
}
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -115,17 +113,13 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
level = (fabsf(_in->data[i]) * _ratio) + (level * (1.0f - _ratio));
|
||||
out.data[i] = _in->data[i] / level;
|
||||
level = (fabsf(_in->readBuf[i]) * _ratio) + (level * (1.0f - _ratio));
|
||||
out.writeBuf[i] = _in->readBuf[i] / level;
|
||||
}
|
||||
|
||||
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -185,27 +179,25 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
|
||||
if (_muted) {
|
||||
if constexpr (std::is_same_v<T, stereo_t>) {
|
||||
memset(out.data, 0, sizeof(stereo_t) * count);
|
||||
memset(out.writeBuf, 0, sizeof(stereo_t) * count);
|
||||
}
|
||||
else {
|
||||
memset(out.data, 0, sizeof(float) * count);
|
||||
memset(out.writeBuf, 0, sizeof(float) * count);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if constexpr (std::is_same_v<T, stereo_t>) {
|
||||
volk_32f_s32f_multiply_32f((float*)out.data, (float*)_in->data, level, count * 2);
|
||||
volk_32f_s32f_multiply_32f((float*)out.writeBuf, (float*)_in->readBuf, level, count * 2);
|
||||
}
|
||||
else {
|
||||
volk_32f_s32f_multiply_32f((float*)out.data, (float*)_in->data, level, count);
|
||||
volk_32f_s32f_multiply_32f((float*)out.writeBuf, (float*)_in->readBuf, level, count);
|
||||
}
|
||||
}
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -260,21 +252,20 @@ namespace dsp {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
float sum = 0.0f;
|
||||
volk_32fc_magnitude_32f(normBuffer, (lv_32fc_t*)_in->data, count);
|
||||
volk_32fc_magnitude_32f(normBuffer, (lv_32fc_t*)_in->readBuf, count);
|
||||
volk_32f_accumulator_s32f(&sum, normBuffer, count);
|
||||
sum /= (float)count;
|
||||
|
||||
if (10.0f * log10f(sum) >= _level) {
|
||||
memcpy(out.data, _in->data, count * sizeof(complex_t));
|
||||
memcpy(out.writeBuf, _in->readBuf, count * sizeof(complex_t));
|
||||
}
|
||||
else {
|
||||
memset(out.data, 0, count * sizeof(complex_t));
|
||||
memset(out.writeBuf, 0, count * sizeof(complex_t));
|
||||
}
|
||||
|
||||
_in->flush();
|
||||
out.write(count);
|
||||
if (!out.swap(count)) { return -1; }
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -100,35 +100,32 @@ namespace dsp {
|
||||
|
||||
int outCount = calcOutSize(count);
|
||||
|
||||
memcpy(&buffer[tapCount], _in->data, count * sizeof(T));
|
||||
memcpy(&buffer[tapCount], _in->readBuf, count * sizeof(T));
|
||||
_in->flush();
|
||||
|
||||
// Write to output
|
||||
if (out.aquire() < 0) {
|
||||
return -1;
|
||||
}
|
||||
int outIndex = 0;
|
||||
if constexpr (std::is_same_v<T, float>) {
|
||||
for (int i = 0; outIndex < outCount; i += _decim) {
|
||||
out.data[outIndex] = 0;
|
||||
out.writeBuf[outIndex] = 0;
|
||||
for (int j = i % _interp; j < tapCount; j += _interp) {
|
||||
out.data[outIndex] += buffer[((i - j) / _interp) + tapCount] * taps[j];
|
||||
out.writeBuf[outIndex] += buffer[((i - j) / _interp) + tapCount] * taps[j];
|
||||
}
|
||||
outIndex++;
|
||||
}
|
||||
}
|
||||
if constexpr (std::is_same_v<T, complex_t>) {
|
||||
for (int i = 0; outIndex < outCount; i += _decim) {
|
||||
out.data[outIndex].i = 0;
|
||||
out.data[outIndex].q = 0;
|
||||
out.writeBuf[outIndex].i = 0;
|
||||
out.writeBuf[outIndex].q = 0;
|
||||
for (int j = i % _interp; j < tapCount; j += _interp) {
|
||||
out.data[outIndex].i += buffer[((i - j) / _interp) + tapCount].i * taps[j];
|
||||
out.data[outIndex].q += buffer[((i - j) / _interp) + tapCount].q * taps[j];
|
||||
out.writeBuf[outIndex].i += buffer[((i - j) / _interp) + tapCount].i * taps[j];
|
||||
out.writeBuf[outIndex].q += buffer[((i - j) / _interp) + tapCount].q * taps[j];
|
||||
}
|
||||
outIndex++;
|
||||
}
|
||||
}
|
||||
out.write(outCount);
|
||||
if (!out.swap(outCount)) { return -1; }
|
||||
|
||||
memmove(buffer, &buffer[count], tapCount * sizeof(T));
|
||||
|
||||
|
@ -51,9 +51,8 @@ namespace dsp {
|
||||
int count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
for (const auto& stream : out) {
|
||||
if (stream->aquire() < 0) { return -1; }
|
||||
memcpy(stream->data, _in->data, count * sizeof(T));
|
||||
stream->write(count);
|
||||
memcpy(stream->writeBuf, _in->readBuf, count * sizeof(T));
|
||||
if (!stream->swap(count)) { return -1; }
|
||||
}
|
||||
_in->flush();
|
||||
return count;
|
||||
@ -115,7 +114,7 @@ namespace dsp {
|
||||
int run() {
|
||||
int count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
ringBuf.write(_in->data, count);
|
||||
ringBuf.write(_in->readBuf, count);
|
||||
_in->flush();
|
||||
return count;
|
||||
}
|
||||
@ -172,9 +171,8 @@ namespace dsp {
|
||||
}
|
||||
}
|
||||
if (ringBuf.readAndSkip(start, readCount, skip) < 0) { break; };
|
||||
if (out.aquire() < 0) { break; }
|
||||
memcpy(out.data, buf, _keep * sizeof(complex_t));
|
||||
out.write(_keep);
|
||||
memcpy(out.writeBuf, buf, _keep * sizeof(complex_t));
|
||||
if (!out.swap(_keep)) { break; }
|
||||
}
|
||||
delete[] buf;
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ namespace dsp {
|
||||
int run() {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
_handler(_in->data, count, _ctx);
|
||||
_handler(_in->readBuf, count, _ctx);
|
||||
_in->flush();
|
||||
return count;
|
||||
}
|
||||
@ -79,7 +79,7 @@ namespace dsp {
|
||||
int run() {
|
||||
count = _in->read();
|
||||
if (count < 0) { return -1; }
|
||||
if (data.write(_in->data, count) < 0) { return -1; }
|
||||
if (data.write(_in->readBuf, count) < 0) { return -1; }
|
||||
_in->flush();
|
||||
return count;
|
||||
}
|
||||
|
@ -55,9 +55,8 @@ namespace dsp {
|
||||
}
|
||||
|
||||
int run() {
|
||||
if (out.aquire() < 0) { return -1; }
|
||||
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.data, zeroPhase, phaseDelta, &phase, _blockSize);
|
||||
out.write(_blockSize);
|
||||
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.writeBuf, zeroPhase, phaseDelta, &phase, _blockSize);
|
||||
if(!out.swap(_blockSize)) { return -1; }
|
||||
return _blockSize;
|
||||
}
|
||||
|
||||
|
@ -9,100 +9,122 @@
|
||||
namespace dsp {
|
||||
class untyped_steam {
|
||||
public:
|
||||
virtual int aquire() { return -1; }
|
||||
virtual void write(int size) {}
|
||||
virtual bool swap(int size) { return false; }
|
||||
virtual int read() { return -1; }
|
||||
virtual void flush() {}
|
||||
virtual void stopReader() {}
|
||||
virtual void clearReadStop() {}
|
||||
virtual void stopWriter() {}
|
||||
virtual void clearWriteStop() {}
|
||||
virtual void stopReader() {}
|
||||
virtual void clearReadStop() {}
|
||||
};
|
||||
|
||||
template <class T>
|
||||
class stream : public untyped_steam {
|
||||
public:
|
||||
stream() {
|
||||
data = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
|
||||
writeBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
|
||||
readBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
|
||||
}
|
||||
|
||||
int aquire() {
|
||||
waitReady();
|
||||
if (writerStop) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
~stream() {
|
||||
volk_free(writeBuf);
|
||||
volk_free(readBuf);
|
||||
}
|
||||
|
||||
void write(int size) {
|
||||
bool swap(int size) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(sigMtx);
|
||||
contentSize = size;
|
||||
// Wait to either swap or stop
|
||||
std::unique_lock<std::mutex> lck(swapMtx);
|
||||
swapCV.wait(lck, [this]{ return (canSwap || writerStop); });
|
||||
|
||||
// If writer was stopped, abandon operation
|
||||
if (writerStop) {
|
||||
writerStop = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Swap buffers
|
||||
dataSize = size;
|
||||
T* temp = writeBuf;
|
||||
writeBuf = readBuf;
|
||||
readBuf = temp;
|
||||
canSwap = false;
|
||||
}
|
||||
|
||||
// Notify reader that some data is ready
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(rdyMtx);
|
||||
dataReady = true;
|
||||
}
|
||||
cv.notify_one();
|
||||
rdyCV.notify_all();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int read() {
|
||||
waitData();
|
||||
// Wait for data to be ready or to be stopped
|
||||
std::unique_lock<std::mutex> lck(rdyMtx);
|
||||
rdyCV.wait(lck, [this]{ return (dataReady || readerStop); });
|
||||
|
||||
// If stopped, abort
|
||||
if (readerStop) {
|
||||
readerStop = false;
|
||||
return -1;
|
||||
}
|
||||
return contentSize;
|
||||
|
||||
dataReady = false;
|
||||
|
||||
return dataSize;
|
||||
}
|
||||
|
||||
void flush() {
|
||||
// Notify writer that buffers can be swapped
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(sigMtx);
|
||||
dataReady = false;
|
||||
std::lock_guard<std::mutex> lck(swapMtx);
|
||||
canSwap = true;
|
||||
}
|
||||
cv.notify_one();
|
||||
}
|
||||
|
||||
void stopReader() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(sigMtx);
|
||||
readerStop = true;
|
||||
}
|
||||
cv.notify_one();
|
||||
}
|
||||
|
||||
void clearReadStop() {
|
||||
readerStop = false;
|
||||
swapCV.notify_all();
|
||||
}
|
||||
|
||||
void stopWriter() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(sigMtx);
|
||||
std::lock_guard<std::mutex> lck(swapMtx);
|
||||
writerStop = true;
|
||||
}
|
||||
cv.notify_one();
|
||||
swapCV.notify_all();
|
||||
}
|
||||
|
||||
void clearWriteStop() {
|
||||
writerStop = false;
|
||||
}
|
||||
|
||||
T* data;
|
||||
void stopReader() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lck(rdyMtx);
|
||||
readerStop = true;
|
||||
}
|
||||
rdyCV.notify_all();
|
||||
}
|
||||
|
||||
void clearReadStop() {
|
||||
readerStop = false;
|
||||
}
|
||||
|
||||
T* writeBuf;
|
||||
T* readBuf;
|
||||
|
||||
private:
|
||||
void waitReady() {
|
||||
std::unique_lock<std::mutex> lck(sigMtx);
|
||||
cv.wait(lck, [this]{ return (!dataReady || writerStop); });
|
||||
}
|
||||
std::mutex swapMtx;
|
||||
std::condition_variable swapCV;
|
||||
bool canSwap = true;
|
||||
|
||||
void waitData() {
|
||||
std::unique_lock<std::mutex> lck(sigMtx);
|
||||
cv.wait(lck, [this]{ return (dataReady || readerStop); });
|
||||
}
|
||||
|
||||
std::mutex sigMtx;
|
||||
std::condition_variable cv;
|
||||
std::mutex rdyMtx;
|
||||
std::condition_variable rdyCV;
|
||||
bool dataReady = false;
|
||||
|
||||
bool readerStop = false;
|
||||
bool writerStop = false;
|
||||
|
||||
int contentSize = 0;
|
||||
int dataSize = 0;
|
||||
};
|
||||
}
|
@ -229,12 +229,11 @@ private:
|
||||
|
||||
int16_t* buf = (int16_t*)iio_buffer_first(rxbuf, rx0_i);
|
||||
|
||||
if (_this->stream.aquire() < 0) { break; }
|
||||
for (int i = 0; i < blockSize; i++) {
|
||||
_this->stream.data[i].q = (float)buf[i * 2] / 32768.0f;
|
||||
_this->stream.data[i].i = (float)buf[(i * 2) + 1] / 32768.0f;
|
||||
_this->stream.writeBuf[i].q = (float)buf[i * 2] / 32768.0f;
|
||||
_this->stream.writeBuf[i].i = (float)buf[(i * 2) + 1] / 32768.0f;
|
||||
}
|
||||
_this->stream.write(blockSize);
|
||||
if (!_this->stream.swap(blockSize)) { break; };
|
||||
}
|
||||
|
||||
iio_buffer_destroy(rxbuf);
|
||||
|
@ -254,8 +254,8 @@ private:
|
||||
int count = _this->audioStream->read();
|
||||
if (count < 0) { break; }
|
||||
for (int i = 0; i < count; i++) {
|
||||
sampleBuf[(i * 2) + 0] = _this->audioStream->data[i].l * 0x7FFF;
|
||||
sampleBuf[(i * 2) + 1] = _this->audioStream->data[i].r * 0x7FFF;
|
||||
sampleBuf[(i * 2) + 0] = _this->audioStream->readBuf[i].l * 0x7FFF;
|
||||
sampleBuf[(i * 2) + 1] = _this->audioStream->readBuf[i].r * 0x7FFF;
|
||||
}
|
||||
_this->audioStream->flush();
|
||||
_this->samplesWritten += count;
|
||||
@ -270,8 +270,8 @@ private:
|
||||
int count = _this->iqStream->read();
|
||||
if (count < 0) { break; }
|
||||
for (int i = 0; i < count; i++) {
|
||||
sampleBuf[(i * 2) + 0] = _this->iqStream->data[i].q * 0x7FFF;
|
||||
sampleBuf[(i * 2) + 1] = _this->iqStream->data[i].i * 0x7FFF;
|
||||
sampleBuf[(i * 2) + 0] = _this->iqStream->readBuf[i].q * 0x7FFF;
|
||||
sampleBuf[(i * 2) + 1] = _this->iqStream->readBuf[i].i * 0x7FFF;
|
||||
}
|
||||
_this->iqStream->flush();
|
||||
_this->samplesWritten += count;
|
||||
|
@ -218,12 +218,11 @@ private:
|
||||
while (true) {
|
||||
// Read samples here
|
||||
_this->client.receiveData(inBuf, blockSize * 2);
|
||||
if (_this->stream.aquire() < 0) { break; }
|
||||
for (int i = 0; i < blockSize; i++) {
|
||||
_this->stream.data[i].q = ((double)inBuf[i * 2] - 128.0) / 128.0;
|
||||
_this->stream.data[i].i = ((double)inBuf[(i * 2) + 1] - 128.0) / 128.0;
|
||||
_this->stream.writeBuf[i].q = ((double)inBuf[i * 2] - 128.0) / 128.0;
|
||||
_this->stream.writeBuf[i].i = ((double)inBuf[(i * 2) + 1] - 128.0) / 128.0;
|
||||
}
|
||||
_this->stream.write(blockSize);
|
||||
if (!_this->stream.swap(blockSize)) { break; };
|
||||
}
|
||||
|
||||
delete[] inBuf;
|
||||
|
@ -351,12 +351,11 @@ private:
|
||||
long long timeMs = 0;
|
||||
|
||||
while (_this->running) {
|
||||
if (_this->stream.aquire() < 0) { break; }
|
||||
int res = _this->dev->readStream(_this->devStream, (void**)&_this->stream.data, blockSize, flags, timeMs);
|
||||
int res = _this->dev->readStream(_this->devStream, (void**)&_this->stream.writeBuf, blockSize, flags, timeMs);
|
||||
if (res < 1) {
|
||||
continue;
|
||||
}
|
||||
_this->stream.write(res);
|
||||
if (!_this->stream.swap(res)) { return; }
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user