switched all streams to double buffering

This commit is contained in:
Ryzerth 2020-12-25 16:58:07 +01:00
parent 450896b122
commit 42bc2d01f7
17 changed files with 158 additions and 171 deletions

View File

@ -262,11 +262,8 @@ private:
static int callback(airspyhf_transfer_t* transfer) {
AirspyHFSourceModule* _this = (AirspyHFSourceModule*)transfer->ctx;
if (_this->stream.aquire() < 0) {
return -1;
}
memcpy(_this->stream.data, transfer->samples, transfer->sample_count * sizeof(dsp::complex_t));
_this->stream.write(transfer->sample_count);
memcpy(_this->stream.writeBuf, transfer->samples, transfer->sample_count * sizeof(dsp::complex_t));
if (!_this->stream.swap(transfer->sample_count)) { return -1; }
return 0;
}

View File

@ -29,14 +29,13 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
for (int i = 0; i < count; i++) {
out.data[i].l = _in->data[i];
out.data[i].r = _in->data[i];
out.writeBuf[i].l = _in->readBuf[i];
out.writeBuf[i].r = _in->readBuf[i];
}
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -75,13 +74,12 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
for (int i = 0; i < count; i++) {
out.data[i] = (_in->data[i].l + _in->data[i].r) / 2.0f;
out.writeBuf[i] = (_in->readBuf[i].l + _in->readBuf[i].r) / 2.0f;
}
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}

View File

@ -6,6 +6,8 @@
#include <vector>
#include <algorithm>
#include <spdlog/spdlog.h>
#define FL_M_PI 3.1415926535f
namespace dsp {
@ -77,10 +79,10 @@ namespace dsp {
}
virtual void doStop() {
for (auto const& in : inputs) {
for (auto& in : inputs) {
in->stopReader();
}
for (auto const& out : outputs) {
for (auto& out : outputs) {
out->stopWriter();
}
@ -88,11 +90,11 @@ namespace dsp {
if (workerThread.joinable()) {
workerThread.join();
}
for (auto const& in : inputs) {
for (auto& in : inputs) {
in->clearReadStop();
}
for (auto const& out : outputs) {
for (auto& out : outputs) {
out->clearWriteStop();
}
}

View File

@ -31,11 +31,10 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
memcpy(out.data, _in->data, count * sizeof(complex_t));
memcpy(out.writeBuf, _in->readBuf, count * sizeof(complex_t));
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -75,11 +74,10 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
volk_32fc_deinterleave_real_32f(out.data, (lv_32fc_t*)_in->data, count);
volk_32fc_deinterleave_real_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count);
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -119,11 +117,10 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
volk_32fc_deinterleave_imag_32f(out.data, (lv_32fc_t*)_in->data, count);
volk_32fc_deinterleave_imag_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count);
_in->flush();
out.write(count);
if(!out.swap(count)) { return -1; }
return count;
}

View File

@ -83,19 +83,17 @@ namespace dsp {
// This is somehow faster than volk...
float diff, currentPhase;
if (out.aquire() < 0) { return -1; }
for (int i = 0; i < count; i++) {
currentPhase = fast_arctan2(_in->data[i].i, _in->data[i].q);
currentPhase = fast_arctan2(_in->readBuf[i].i, _in->readBuf[i].q);
diff = currentPhase - phase;
if (diff > 3.1415926535f) { diff -= 2 * 3.1415926535f; }
else if (diff <= -3.1415926535f) { diff += 2 * 3.1415926535f; }
out.data[i] = diff / phasorSpeed;
out.writeBuf[i] = diff / phasorSpeed;
phase = currentPhase;
}
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -135,19 +133,18 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
volk_32fc_magnitude_32f(out.data, (lv_32fc_t*)_in->data, count);
volk_32fc_magnitude_32f(out.writeBuf, (lv_32fc_t*)_in->readBuf, count);
_in->flush();
volk_32f_accumulator_s32f(&avg, out.data, count);
volk_32f_accumulator_s32f(&avg, out.writeBuf, count);
avg /= (float)count;
for (int i = 0; i < count; i++) {
out.data[i] -= avg;
out.writeBuf[i] -= avg;
}
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -259,12 +256,11 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
volk_32fc_s32fc_x2_rotator_32fc(buffer, (lv_32fc_t*)_in->data, phaseDelta, &phase, count);
volk_32fc_deinterleave_real_32f(out.data, buffer, count);
volk_32fc_s32fc_x2_rotator_32fc(buffer, (lv_32fc_t*)_in->readBuf, phaseDelta, &phase, count);
volk_32fc_deinterleave_real_32f(out.writeBuf, buffer, count);
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}

View File

@ -52,24 +52,21 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
memcpy(bufStart, _in->data, count * sizeof(T));
memcpy(bufStart, _in->readBuf, count * sizeof(T));
_in->flush();
// Write to output
if (out.aquire() < 0) { return -1; }
if constexpr (std::is_same_v<T, float>) {
for (int i = 0; i < count; i++) {
volk_32f_x2_dot_prod_32f((float*)&out.data[i], (float*)&buffer[i+1], taps, tapCount);
volk_32f_x2_dot_prod_32f((float*)&out.writeBuf[i], (float*)&buffer[i+1], taps, tapCount);
}
}
if constexpr (std::is_same_v<T, complex_t>) {
for (int i = 0; i < count; i++) {
volk_32fc_32f_dot_prod_32fc((lv_32fc_t*)&out.data[i], (lv_32fc_t*)&buffer[i+1], taps, tapCount);
volk_32fc_32f_dot_prod_32fc((lv_32fc_t*)&out.writeBuf[i], (lv_32fc_t*)&buffer[i+1], taps, tapCount);
}
}
out.write(count);
if (!out.swap(count)) { return -1; }
memmove(buffer, &buffer[count], tapCount * sizeof(T));
@ -135,25 +132,23 @@ namespace dsp {
if (count < 0) { return -1; }
if (bypass) {
if (out.aquire() < 0) { return -1; }
memcpy(out.data, _in->data, count * sizeof(float));
memcpy(out.writeBuf, _in->readBuf, count * sizeof(float));
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
if (isnan(lastOut)) {
lastOut = 0.0f;
}
if (out.aquire() < 0) { return -1; }
out.data[0] = (alpha * _in->data[0]) + ((1-alpha) * lastOut);
out.writeBuf[0] = (alpha * _in->readBuf[0]) + ((1-alpha) * lastOut);
for (int i = 1; i < count; i++) {
out.data[i] = (alpha * _in->data[i]) + ((1 - alpha) * out.data[i - 1]);
out.writeBuf[i] = (alpha * _in->readBuf[i]) + ((1 - alpha) * out.writeBuf[i - 1]);
}
lastOut = out.data[count - 1];
lastOut = out.writeBuf[count - 1];
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}

View File

@ -31,17 +31,16 @@ namespace dsp {
return 0;
}
if (out.aquire() < 0) { return -1; }
if constexpr (std::is_same_v<T, complex_t> || std::is_same_v<T, stereo_t>) {
volk_32fc_x2_add_32fc(out.data, _a->data, _b->data, a_count);
volk_32fc_x2_add_32fc(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
}
else {
volk_32f_x2_add_32f(out.data, _a->data, _b->data, a_count);
volk_32f_x2_add_32f(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
}
_a->flush();
_b->flush();
out.write(a_count);
if (!out.swap(a_count)) { return -1; }
return a_count;
}
@ -82,17 +81,16 @@ namespace dsp {
return 0;
}
if (out.aquire() < 0) { return -1; }
if constexpr (std::is_same_v<T, complex_t>) {
volk_32fc_x2_multiply_32fc(out.data, _a->data, _b->data, a_count);
volk_32fc_x2_multiply_32fc(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
}
else {
volk_32f_x2_multiply_32f(out.data, _a->data, _b->data, a_count);
volk_32f_x2_multiply_32f(out.writeBuf, _a->readBuf, _b->readBuf, a_count);
}
_a->flush();
_b->flush();
out.write(a_count);
if (!out.swap(a_count)) { return -1; }
return a_count;
}

View File

@ -60,18 +60,16 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
// TODO: Do float xlation
if constexpr (std::is_same_v<T, float>) {
spdlog::error("XLATOR NOT IMPLEMENTED FOR FLOAT");
}
if constexpr (std::is_same_v<T, complex_t>) {
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.data, (lv_32fc_t*)_in->data, phaseDelta, &phase, count);
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.writeBuf, (lv_32fc_t*)_in->readBuf, phaseDelta, &phase, count);
}
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -115,17 +113,13 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
for (int i = 0; i < count; i++) {
level = (fabsf(_in->data[i]) * _ratio) + (level * (1.0f - _ratio));
out.data[i] = _in->data[i] / level;
level = (fabsf(_in->readBuf[i]) * _ratio) + (level * (1.0f - _ratio));
out.writeBuf[i] = _in->readBuf[i] / level;
}
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -185,27 +179,25 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
if (_muted) {
if constexpr (std::is_same_v<T, stereo_t>) {
memset(out.data, 0, sizeof(stereo_t) * count);
memset(out.writeBuf, 0, sizeof(stereo_t) * count);
}
else {
memset(out.data, 0, sizeof(float) * count);
memset(out.writeBuf, 0, sizeof(float) * count);
}
}
else {
if constexpr (std::is_same_v<T, stereo_t>) {
volk_32f_s32f_multiply_32f((float*)out.data, (float*)_in->data, level, count * 2);
volk_32f_s32f_multiply_32f((float*)out.writeBuf, (float*)_in->readBuf, level, count * 2);
}
else {
volk_32f_s32f_multiply_32f((float*)out.data, (float*)_in->data, level, count);
volk_32f_s32f_multiply_32f((float*)out.writeBuf, (float*)_in->readBuf, level, count);
}
}
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}
@ -260,21 +252,20 @@ namespace dsp {
count = _in->read();
if (count < 0) { return -1; }
if (out.aquire() < 0) { return -1; }
float sum = 0.0f;
volk_32fc_magnitude_32f(normBuffer, (lv_32fc_t*)_in->data, count);
volk_32fc_magnitude_32f(normBuffer, (lv_32fc_t*)_in->readBuf, count);
volk_32f_accumulator_s32f(&sum, normBuffer, count);
sum /= (float)count;
if (10.0f * log10f(sum) >= _level) {
memcpy(out.data, _in->data, count * sizeof(complex_t));
memcpy(out.writeBuf, _in->readBuf, count * sizeof(complex_t));
}
else {
memset(out.data, 0, count * sizeof(complex_t));
memset(out.writeBuf, 0, count * sizeof(complex_t));
}
_in->flush();
out.write(count);
if (!out.swap(count)) { return -1; }
return count;
}

View File

@ -100,35 +100,32 @@ namespace dsp {
int outCount = calcOutSize(count);
memcpy(&buffer[tapCount], _in->data, count * sizeof(T));
memcpy(&buffer[tapCount], _in->readBuf, count * sizeof(T));
_in->flush();
// Write to output
if (out.aquire() < 0) {
return -1;
}
int outIndex = 0;
if constexpr (std::is_same_v<T, float>) {
for (int i = 0; outIndex < outCount; i += _decim) {
out.data[outIndex] = 0;
out.writeBuf[outIndex] = 0;
for (int j = i % _interp; j < tapCount; j += _interp) {
out.data[outIndex] += buffer[((i - j) / _interp) + tapCount] * taps[j];
out.writeBuf[outIndex] += buffer[((i - j) / _interp) + tapCount] * taps[j];
}
outIndex++;
}
}
if constexpr (std::is_same_v<T, complex_t>) {
for (int i = 0; outIndex < outCount; i += _decim) {
out.data[outIndex].i = 0;
out.data[outIndex].q = 0;
out.writeBuf[outIndex].i = 0;
out.writeBuf[outIndex].q = 0;
for (int j = i % _interp; j < tapCount; j += _interp) {
out.data[outIndex].i += buffer[((i - j) / _interp) + tapCount].i * taps[j];
out.data[outIndex].q += buffer[((i - j) / _interp) + tapCount].q * taps[j];
out.writeBuf[outIndex].i += buffer[((i - j) / _interp) + tapCount].i * taps[j];
out.writeBuf[outIndex].q += buffer[((i - j) / _interp) + tapCount].q * taps[j];
}
outIndex++;
}
}
out.write(outCount);
if (!out.swap(outCount)) { return -1; }
memmove(buffer, &buffer[count], tapCount * sizeof(T));

View File

@ -51,9 +51,8 @@ namespace dsp {
int count = _in->read();
if (count < 0) { return -1; }
for (const auto& stream : out) {
if (stream->aquire() < 0) { return -1; }
memcpy(stream->data, _in->data, count * sizeof(T));
stream->write(count);
memcpy(stream->writeBuf, _in->readBuf, count * sizeof(T));
if (!stream->swap(count)) { return -1; }
}
_in->flush();
return count;
@ -115,7 +114,7 @@ namespace dsp {
int run() {
int count = _in->read();
if (count < 0) { return -1; }
ringBuf.write(_in->data, count);
ringBuf.write(_in->readBuf, count);
_in->flush();
return count;
}
@ -172,9 +171,8 @@ namespace dsp {
}
}
if (ringBuf.readAndSkip(start, readCount, skip) < 0) { break; };
if (out.aquire() < 0) { break; }
memcpy(out.data, buf, _keep * sizeof(complex_t));
out.write(_keep);
memcpy(out.writeBuf, buf, _keep * sizeof(complex_t));
if (!out.swap(_keep)) { break; }
}
delete[] buf;
}

View File

@ -39,7 +39,7 @@ namespace dsp {
int run() {
count = _in->read();
if (count < 0) { return -1; }
_handler(_in->data, count, _ctx);
_handler(_in->readBuf, count, _ctx);
_in->flush();
return count;
}
@ -79,7 +79,7 @@ namespace dsp {
int run() {
count = _in->read();
if (count < 0) { return -1; }
if (data.write(_in->data, count) < 0) { return -1; }
if (data.write(_in->readBuf, count) < 0) { return -1; }
_in->flush();
return count;
}

View File

@ -55,9 +55,8 @@ namespace dsp {
}
int run() {
if (out.aquire() < 0) { return -1; }
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.data, zeroPhase, phaseDelta, &phase, _blockSize);
out.write(_blockSize);
volk_32fc_s32fc_x2_rotator_32fc((lv_32fc_t*)out.writeBuf, zeroPhase, phaseDelta, &phase, _blockSize);
if(!out.swap(_blockSize)) { return -1; }
return _blockSize;
}

View File

@ -9,100 +9,122 @@
namespace dsp {
class untyped_steam {
public:
virtual int aquire() { return -1; }
virtual void write(int size) {}
virtual bool swap(int size) { return false; }
virtual int read() { return -1; }
virtual void flush() {}
virtual void stopReader() {}
virtual void clearReadStop() {}
virtual void stopWriter() {}
virtual void clearWriteStop() {}
virtual void stopReader() {}
virtual void clearReadStop() {}
};
template <class T>
class stream : public untyped_steam {
public:
stream() {
data = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
writeBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
readBuf = (T*)volk_malloc(STREAM_BUFFER_SIZE * sizeof(T), volk_get_alignment());
}
int aquire() {
waitReady();
if (writerStop) {
return -1;
}
return 0;
~stream() {
volk_free(writeBuf);
volk_free(readBuf);
}
void write(int size) {
bool swap(int size) {
{
std::lock_guard<std::mutex> lck(sigMtx);
contentSize = size;
// Wait to either swap or stop
std::unique_lock<std::mutex> lck(swapMtx);
swapCV.wait(lck, [this]{ return (canSwap || writerStop); });
// If writer was stopped, abandon operation
if (writerStop) {
writerStop = false;
return false;
}
// Swap buffers
dataSize = size;
T* temp = writeBuf;
writeBuf = readBuf;
readBuf = temp;
canSwap = false;
}
// Notify reader that some data is ready
{
std::lock_guard<std::mutex> lck(rdyMtx);
dataReady = true;
}
cv.notify_one();
rdyCV.notify_all();
return true;
}
int read() {
waitData();
// Wait for data to be ready or to be stopped
std::unique_lock<std::mutex> lck(rdyMtx);
rdyCV.wait(lck, [this]{ return (dataReady || readerStop); });
// If stopped, abort
if (readerStop) {
readerStop = false;
return -1;
}
return contentSize;
dataReady = false;
return dataSize;
}
void flush() {
// Notify writer that buffers can be swapped
{
std::lock_guard<std::mutex> lck(sigMtx);
dataReady = false;
std::lock_guard<std::mutex> lck(swapMtx);
canSwap = true;
}
cv.notify_one();
}
void stopReader() {
{
std::lock_guard<std::mutex> lck(sigMtx);
readerStop = true;
}
cv.notify_one();
}
void clearReadStop() {
readerStop = false;
swapCV.notify_all();
}
void stopWriter() {
{
std::lock_guard<std::mutex> lck(sigMtx);
std::lock_guard<std::mutex> lck(swapMtx);
writerStop = true;
}
cv.notify_one();
swapCV.notify_all();
}
void clearWriteStop() {
writerStop = false;
}
T* data;
void stopReader() {
{
std::lock_guard<std::mutex> lck(rdyMtx);
readerStop = true;
}
rdyCV.notify_all();
}
void clearReadStop() {
readerStop = false;
}
T* writeBuf;
T* readBuf;
private:
void waitReady() {
std::unique_lock<std::mutex> lck(sigMtx);
cv.wait(lck, [this]{ return (!dataReady || writerStop); });
}
std::mutex swapMtx;
std::condition_variable swapCV;
bool canSwap = true;
void waitData() {
std::unique_lock<std::mutex> lck(sigMtx);
cv.wait(lck, [this]{ return (dataReady || readerStop); });
}
std::mutex sigMtx;
std::condition_variable cv;
std::mutex rdyMtx;
std::condition_variable rdyCV;
bool dataReady = false;
bool readerStop = false;
bool writerStop = false;
int contentSize = 0;
int dataSize = 0;
};
}

View File

@ -229,12 +229,11 @@ private:
int16_t* buf = (int16_t*)iio_buffer_first(rxbuf, rx0_i);
if (_this->stream.aquire() < 0) { break; }
for (int i = 0; i < blockSize; i++) {
_this->stream.data[i].q = (float)buf[i * 2] / 32768.0f;
_this->stream.data[i].i = (float)buf[(i * 2) + 1] / 32768.0f;
_this->stream.writeBuf[i].q = (float)buf[i * 2] / 32768.0f;
_this->stream.writeBuf[i].i = (float)buf[(i * 2) + 1] / 32768.0f;
}
_this->stream.write(blockSize);
if (!_this->stream.swap(blockSize)) { break; };
}
iio_buffer_destroy(rxbuf);

View File

@ -254,8 +254,8 @@ private:
int count = _this->audioStream->read();
if (count < 0) { break; }
for (int i = 0; i < count; i++) {
sampleBuf[(i * 2) + 0] = _this->audioStream->data[i].l * 0x7FFF;
sampleBuf[(i * 2) + 1] = _this->audioStream->data[i].r * 0x7FFF;
sampleBuf[(i * 2) + 0] = _this->audioStream->readBuf[i].l * 0x7FFF;
sampleBuf[(i * 2) + 1] = _this->audioStream->readBuf[i].r * 0x7FFF;
}
_this->audioStream->flush();
_this->samplesWritten += count;
@ -270,8 +270,8 @@ private:
int count = _this->iqStream->read();
if (count < 0) { break; }
for (int i = 0; i < count; i++) {
sampleBuf[(i * 2) + 0] = _this->iqStream->data[i].q * 0x7FFF;
sampleBuf[(i * 2) + 1] = _this->iqStream->data[i].i * 0x7FFF;
sampleBuf[(i * 2) + 0] = _this->iqStream->readBuf[i].q * 0x7FFF;
sampleBuf[(i * 2) + 1] = _this->iqStream->readBuf[i].i * 0x7FFF;
}
_this->iqStream->flush();
_this->samplesWritten += count;

View File

@ -218,12 +218,11 @@ private:
while (true) {
// Read samples here
_this->client.receiveData(inBuf, blockSize * 2);
if (_this->stream.aquire() < 0) { break; }
for (int i = 0; i < blockSize; i++) {
_this->stream.data[i].q = ((double)inBuf[i * 2] - 128.0) / 128.0;
_this->stream.data[i].i = ((double)inBuf[(i * 2) + 1] - 128.0) / 128.0;
_this->stream.writeBuf[i].q = ((double)inBuf[i * 2] - 128.0) / 128.0;
_this->stream.writeBuf[i].i = ((double)inBuf[(i * 2) + 1] - 128.0) / 128.0;
}
_this->stream.write(blockSize);
if (!_this->stream.swap(blockSize)) { break; };
}
delete[] inBuf;

View File

@ -351,12 +351,11 @@ private:
long long timeMs = 0;
while (_this->running) {
if (_this->stream.aquire() < 0) { break; }
int res = _this->dev->readStream(_this->devStream, (void**)&_this->stream.data, blockSize, flags, timeMs);
int res = _this->dev->readStream(_this->devStream, (void**)&_this->stream.writeBuf, blockSize, flags, timeMs);
if (res < 1) {
continue;
}
_this->stream.write(res);
if (!_this->stream.swap(res)) { return; }
}
}