Merge pull request #13 from BatchDrake/feature/xfer-completion

lib: Fix race condition after cancellation of USB transfers
This commit is contained in:
rtlsdrblog 2023-05-10 18:12:00 +12:00 committed by GitHub
commit acb0b5cd2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -92,18 +92,56 @@ static const int fir_default[FIR_LEN] = {
101, 156, 215, 273, 327, 372, 404, 421 /* 12 bit signed */ 101, 156, 215, 273, 327, 372, 404, 421 /* 12 bit signed */
}; };
struct rtlsdr_dev;
/*
* RTL-SDR transfer context.
*
* Proper transfer cancellation is a stateful operation that involves
* delivering an asynchronous cancellation request and waiting for its
* completion. This is necessary to prevent a race condition during
* cleanup when the device is closed (especially in Win32 systems).
*
* The rtlsdr_xfer_ctx structure encapsulates a transfer state, including
* its buffer, its completion status and a pointer to the device it
* belongs to.
*/
struct rtlsdr_xfer_ctx {
struct rtlsdr_dev *dev;
struct libusb_transfer *xfer;
unsigned char *buffer;
int done;
int zero_copy;
};
typedef struct rtlsdr_xfer_ctx rtlsdr_xfer_ctx_t;
static rtlsdr_xfer_ctx_t *rtlsdr_xfer_ctx_new(struct rtlsdr_dev *dev);
static int rtlsdr_xfer_ctx_alloc_buffer(rtlsdr_xfer_ctx_t *self);
static void rtlsdr_xfer_ctx_free_buffer(rtlsdr_xfer_ctx_t *self);
static int rtlsdr_xfer_ctx_issue_transfer(rtlsdr_xfer_ctx_t *self);
static void LIBUSB_CALL _rtlsdr_xfer_libusb_callback(
struct libusb_transfer *xfer);
static void rtlsdr_xfer_ctx_destroy(rtlsdr_xfer_ctx_t *self);
#define rtlsdr_xfer_ctx_status(self) \
((self)->xfer->status)
/* Friend of rtlsdr_xfer_ctx */
struct rtlsdr_dev { struct rtlsdr_dev {
libusb_context *ctx; libusb_context *ctx;
struct libusb_device_handle *devh; struct libusb_device_handle *devh;
uint32_t xfer_buf_num;
rtlsdr_xfer_ctx_t **xfer_ctx_list;
uint32_t xfer_ctx_num;
uint32_t xfer_buf_len; uint32_t xfer_buf_len;
struct libusb_transfer **xfer;
unsigned char **xfer_buf;
rtlsdr_read_async_cb_t cb; rtlsdr_read_async_cb_t cb;
void *cb_ctx; void *cb_ctx;
enum rtlsdr_async_status async_status; enum rtlsdr_async_status async_status;
int async_cancel; int async_cancel;
int use_zerocopy;
/* rtl demod context */ /* rtl demod context */
uint32_t rate; /* Hz */ uint32_t rate; /* Hz */
uint32_t rtl_xtal; /* Hz */ uint32_t rtl_xtal; /* Hz */
@ -1718,157 +1756,259 @@ int rtlsdr_read_sync(rtlsdr_dev_t *dev, void *buf, int len, int *n_read)
return libusb_bulk_transfer(dev->devh, 0x81, buf, len, n_read, BULK_TIMEOUT); return libusb_bulk_transfer(dev->devh, 0x81, buf, len, n_read, BULK_TIMEOUT);
} }
static void LIBUSB_CALL _libusb_callback(struct libusb_transfer *xfer)
{
rtlsdr_dev_t *dev = (rtlsdr_dev_t *)xfer->user_data;
if (LIBUSB_TRANSFER_COMPLETED == xfer->status) {
if (dev->cb)
dev->cb(xfer->buffer, xfer->actual_length, dev->cb_ctx);
libusb_submit_transfer(xfer); /* resubmit transfer */
dev->xfer_errors = 0;
} else if (LIBUSB_TRANSFER_CANCELLED != xfer->status) {
#ifndef _WIN32
if (LIBUSB_TRANSFER_ERROR == xfer->status)
dev->xfer_errors++;
if (dev->xfer_errors >= dev->xfer_buf_num ||
LIBUSB_TRANSFER_NO_DEVICE == xfer->status) {
#endif
dev->dev_lost = 1;
rtlsdr_cancel_async(dev);
fprintf(stderr, "cb transfer status: %d, "
"canceling...\n", xfer->status);
#ifndef _WIN32
}
#endif
}
}
int rtlsdr_wait_async(rtlsdr_dev_t *dev, rtlsdr_read_async_cb_t cb, void *ctx) int rtlsdr_wait_async(rtlsdr_dev_t *dev, rtlsdr_read_async_cb_t cb, void *ctx)
{ {
return rtlsdr_read_async(dev, cb, ctx, 0, 0); return rtlsdr_read_async(dev, cb, ctx, 0, 0);
} }
static int _rtlsdr_alloc_async_buffers(rtlsdr_dev_t *dev) static void
rtlsdr_xfer_ctx_free_buffer(rtlsdr_xfer_ctx_t *self)
{ {
unsigned int i; /*
* This path is excersed only if we managed to
* allocate a buffer, which in turn only happens if
* self->dev->xfer_buf_len is set to something meaningful
*/
if (!dev) if (self->buffer != NULL) {
return -1; if (self->zero_copy) {
#if defined (__linux__) && LIBUSB_API_VERSION >= 0x01000105
libusb_dev_mem_free(
self->dev->devh,
self->buffer,
self->dev->xfer_buf_len);
#endif /* __linux__ */
} else {
free(self->buffer);
}
}
}
if (!dev->xfer) { static int
dev->xfer = malloc(dev->xfer_buf_num * rtlsdr_xfer_ctx_cancel(rtlsdr_xfer_ctx_t *self)
sizeof(struct libusb_transfer *)); {
int r = 0;
for(i = 0; i < dev->xfer_buf_num; ++i) struct timeval zero_tv = {0, 0};
dev->xfer[i] = libusb_alloc_transfer(0);
self->done = 0;
r = libusb_cancel_transfer(self->xfer);
while (!self->done) {
libusb_handle_events_timeout_completed(
self->dev->ctx,
&zero_tv,
&self->done);
} }
if (dev->xfer_buf) return r;
return -2; }
dev->xfer_buf = malloc(dev->xfer_buf_num * sizeof(unsigned char *)); static int
memset(dev->xfer_buf, 0, dev->xfer_buf_num * sizeof(unsigned char *)); rtlsdr_xfer_ctx_issue_transfer(rtlsdr_xfer_ctx_t *self)
{
self->done = 0;
libusb_fill_bulk_transfer(
self->xfer,
self->dev->devh,
0x81,
self->buffer,
self->dev->xfer_buf_len,
_rtlsdr_xfer_libusb_callback,
self,
BULK_TIMEOUT);
return libusb_submit_transfer(self->xfer);
}
static int
rtlsdr_xfer_ctx_alloc_buffer(rtlsdr_xfer_ctx_t *self)
{
int ok = 0;
uint32_t buf_len = self->dev->xfer_buf_len;
self->buffer = NULL;
self->zero_copy = 0;
#if defined(ENABLE_ZEROCOPY) && defined (__linux__) && LIBUSB_API_VERSION >= 0x01000105 #if defined(ENABLE_ZEROCOPY) && defined (__linux__) && LIBUSB_API_VERSION >= 0x01000105
fprintf(stderr, "Allocating %d zero-copy buffers\n", dev->xfer_buf_num); self->buffer = libusb_dev_mem_alloc(self->dev->devh, buf_len);
dev->use_zerocopy = 1; if (self->buffer != NULL) {
for (i = 0; i < dev->xfer_buf_num; ++i) { /* Check if Kernel usbfs mmap() bug is present: if the
dev->xfer_buf[i] = libusb_dev_mem_alloc(dev->devh, dev->xfer_buf_len); * mapping is correct, the buffers point to memory that
* was memset to 0 by the Kernel, otherwise, they point
if (dev->xfer_buf[i]) { * to random memory. We check if the buffers are zeroed
/* Check if Kernel usbfs mmap() bug is present: if the * and otherwise fall back to buffers in userspace.
* mapping is correct, the buffers point to memory that */
* was memset to 0 by the Kernel, otherwise, they point if (self->buffer[0] || memcmp(self->buffer,
* to random memory. We check if the buffers are zeroed self->buffer + 1,
* and otherwise fall back to buffers in userspace. buf_len - 1)) {
*/ fprintf(stderr, "Detected Kernel usbfs mmap() "
if (dev->xfer_buf[i][0] || memcmp(dev->xfer_buf[i], "bug, falling back to buffers "
dev->xfer_buf[i] + 1, "in userspace\n");
dev->xfer_buf_len - 1)) {
fprintf(stderr, "Detected Kernel usbfs mmap() "
"bug, falling back to buffers "
"in userspace\n");
dev->use_zerocopy = 0;
break;
}
} else { } else {
fprintf(stderr, "Failed to allocate zero-copy " /* All looks good! Green light to zerocopy */
"buffer for transfer %d\nFalling " self->zero_copy = 1;
"back to buffers in userspace\n", i);
dev->use_zerocopy = 0;
break;
} }
} }
/* zero-copy buffer allocation failed (partially or completely) /*
* we need to free the buffers again if already allocated */ * Zero copy buffer allocation failed. We just need to free the
if (!dev->use_zerocopy) { * failed ones, as the current implementation allows mixed
for (i = 0; i < dev->xfer_buf_num; ++i) { * userspace / zero-copy buffers.
if (dev->xfer_buf[i]) */
libusb_dev_mem_free(dev->devh,
dev->xfer_buf[i], if (!self->zero_copy && self->buffer != NULL) {
dev->xfer_buf_len); libusb_dev_mem_free(
} self->dev->devh,
self->buffer,
buf_len);
self->buffer = NULL;
} }
#endif #endif
/* no zero-copy available, allocate buffers in userspace */ /* No zero copy, allocate in userspace */
if (!dev->use_zerocopy) { if (!self->zero_copy)
for (i = 0; i < dev->xfer_buf_num; ++i) { if ((self->buffer = malloc(buf_len)) == NULL)
dev->xfer_buf[i] = malloc(dev->xfer_buf_len); goto done;
if (!dev->xfer_buf[i]) ok = 1;
return -ENOMEM;
} done:
return ok;
}
rtlsdr_xfer_ctx_t *rtlsdr_xfer_ctx_new(struct rtlsdr_dev *dev)
{
rtlsdr_xfer_ctx_t *new = NULL;
if (dev == NULL)
return NULL;
if ((new = calloc(1, sizeof(rtlsdr_xfer_ctx_t))) == NULL)
goto fail;
new->dev = dev;
if ((new->xfer = libusb_alloc_transfer(0)) == NULL)
goto fail;
if (!rtlsdr_xfer_ctx_alloc_buffer(new))
goto fail;
return new;
fail:
if (new != NULL)
rtlsdr_xfer_ctx_destroy(new);
return NULL;
}
static void LIBUSB_CALL _rtlsdr_xfer_libusb_callback(
struct libusb_transfer *xfer)
{
rtlsdr_xfer_ctx_t *ctx = (rtlsdr_xfer_ctx_t *) xfer->user_data;
rtlsdr_dev_t *dev = ctx->dev;
switch (xfer->status) {
case LIBUSB_TRANSFER_COMPLETED:
if (dev->cb)
dev->cb(xfer->buffer, xfer->actual_length, dev->cb_ctx);
libusb_submit_transfer(xfer); /* resubmit transfer */
dev->xfer_errors = 0;
ctx->done = 1;
break;
case LIBUSB_TRANSFER_CANCELLED:
ctx->done = 1;
break;
default:
/* Maybe an error */
#ifndef _WIN32
if (LIBUSB_TRANSFER_ERROR == xfer->status)
dev->xfer_errors++;
if (dev->xfer_errors >= dev->xfer_ctx_num ||
LIBUSB_TRANSFER_NO_DEVICE == xfer->status) {
#endif
dev->dev_lost = 1;
rtlsdr_cancel_async(dev);
fprintf(stderr, "cb transfer status: %d, "
"canceling...\n", xfer->status);
#ifndef _WIN32
}
#endif
} }
}
return 0; void
rtlsdr_xfer_ctx_destroy(rtlsdr_xfer_ctx_t *self)
{
if (self->xfer != NULL)
libusb_free_transfer(self->xfer);
rtlsdr_xfer_ctx_free_buffer(self);
free(self);
} }
static int _rtlsdr_free_async_buffers(rtlsdr_dev_t *dev) static int _rtlsdr_free_async_buffers(rtlsdr_dev_t *dev)
{ {
unsigned int i; unsigned int i;
if (!dev) if (dev == NULL)
return -1; return -1;
if (dev->xfer) { if (dev->xfer_ctx_list != NULL) {
for(i = 0; i < dev->xfer_buf_num; ++i) { for (i = 0; i < dev->xfer_ctx_num; ++i)
if (dev->xfer[i]) { if (dev->xfer_ctx_list[i] != NULL)
libusb_free_transfer(dev->xfer[i]); rtlsdr_xfer_ctx_destroy(dev->xfer_ctx_list[i]);
}
}
free(dev->xfer); free(dev->xfer_ctx_list);
dev->xfer = NULL; dev->xfer_ctx_list = NULL;
}
if (dev->xfer_buf) {
for (i = 0; i < dev->xfer_buf_num; ++i) {
if (dev->xfer_buf[i]) {
if (dev->use_zerocopy) {
#if defined (__linux__) && LIBUSB_API_VERSION >= 0x01000105
libusb_dev_mem_free(dev->devh,
dev->xfer_buf[i],
dev->xfer_buf_len);
#endif
} else {
free(dev->xfer_buf[i]);
}
}
}
free(dev->xfer_buf);
dev->xfer_buf = NULL;
} }
return 0; return 0;
} }
static int _rtlsdr_alloc_async_buffers(rtlsdr_dev_t *dev)
{
unsigned int i;
int ret = -1;
if (dev == NULL)
goto done;
if (dev->xfer_ctx_list == NULL) {
/* Lazy allocation of async buffers */
if ((dev->xfer_ctx_list = calloc(
dev->xfer_ctx_num,
sizeof(rtlsdr_xfer_ctx_t *))) == NULL)
goto done;
for (i = 0; i < dev->xfer_ctx_num; ++i) {
dev->xfer_ctx_list[i] = rtlsdr_xfer_ctx_new(dev);
if (dev->xfer_ctx_list[i] == NULL)
goto done;
}
}
ret = 0;
done:
if (ret != 0)
_rtlsdr_free_async_buffers(dev);
return ret;
}
int rtlsdr_read_async(rtlsdr_dev_t *dev, rtlsdr_read_async_cb_t cb, void *ctx, int rtlsdr_read_async(rtlsdr_dev_t *dev, rtlsdr_read_async_cb_t cb, void *ctx,
uint32_t buf_num, uint32_t buf_len) uint32_t buf_num, uint32_t buf_len)
{ {
@ -1891,9 +2031,9 @@ int rtlsdr_read_async(rtlsdr_dev_t *dev, rtlsdr_read_async_cb_t cb, void *ctx,
dev->cb_ctx = ctx; dev->cb_ctx = ctx;
if (buf_num > 0) if (buf_num > 0)
dev->xfer_buf_num = buf_num; dev->xfer_ctx_num = buf_num;
else else
dev->xfer_buf_num = DEFAULT_BUF_NUMBER; dev->xfer_ctx_num = DEFAULT_BUF_NUMBER;
if (buf_len > 0 && buf_len % 512 == 0) /* len must be multiple of 512 */ if (buf_len > 0 && buf_len % 512 == 0) /* len must be multiple of 512 */
dev->xfer_buf_len = buf_len; dev->xfer_buf_len = buf_len;
@ -1902,17 +2042,9 @@ int rtlsdr_read_async(rtlsdr_dev_t *dev, rtlsdr_read_async_cb_t cb, void *ctx,
_rtlsdr_alloc_async_buffers(dev); _rtlsdr_alloc_async_buffers(dev);
for(i = 0; i < dev->xfer_buf_num; ++i) { for(i = 0; i < dev->xfer_ctx_num; ++i) {
libusb_fill_bulk_transfer(dev->xfer[i], r = rtlsdr_xfer_ctx_issue_transfer(dev->xfer_ctx_list[i]);
dev->devh,
0x81,
dev->xfer_buf[i],
dev->xfer_buf_len,
_libusb_callback,
(void *)dev,
BULK_TIMEOUT);
r = libusb_submit_transfer(dev->xfer[i]);
if (r < 0) { if (r < 0) {
fprintf(stderr, "Failed to submit transfer %i\n" fprintf(stderr, "Failed to submit transfer %i\n"
"Please increase your allowed " "Please increase your allowed "
@ -1938,21 +2070,16 @@ int rtlsdr_read_async(rtlsdr_dev_t *dev, rtlsdr_read_async_cb_t cb, void *ctx,
if (RTLSDR_CANCELING == dev->async_status) { if (RTLSDR_CANCELING == dev->async_status) {
next_status = RTLSDR_INACTIVE; next_status = RTLSDR_INACTIVE;
if (!dev->xfer) if (dev->xfer_ctx_list == NULL)
break; break;
for(i = 0; i < dev->xfer_buf_num; ++i) { for(i = 0; i < dev->xfer_ctx_num; ++i) {
if (!dev->xfer[i]) if (dev->xfer_ctx_list[i] == NULL)
continue; continue;
if (LIBUSB_TRANSFER_CANCELLED != if (LIBUSB_TRANSFER_CANCELLED !=
dev->xfer[i]->status) { rtlsdr_xfer_ctx_status(dev->xfer_ctx_list[i])) {
r = libusb_cancel_transfer(dev->xfer[i]); r = rtlsdr_xfer_ctx_cancel(dev->xfer_ctx_list[i]);
/* handle events after canceling
* to allow transfer status to
* propagate */
libusb_handle_events_timeout_completed(dev->ctx,
&zerotv, NULL);
if (r < 0) if (r < 0)
continue; continue;