mirror of
				https://github.com/rtlsdrblog/rtl-sdr-blog.git
				synced 2025-10-31 08:58:09 +01:00 
			
		
		
		
	Perf: Replace rtl_tcp linked list with ring buffer
Removed the linked list structure, locking, and malloc/free on each USB callback. Replaced it with a non blocking ring buffer for performance. This resulted in a large performance improvement when using an Raspberry Pi or Raspberry Pi Zero W as a rtl_tcp server. The sample rate could be doubled and instead of wired ethernet wifi could be used. Frequnecy change lag was reduced greatly also. Signed-off-by: Stephen Blinick <stephen@stesoft.com>
This commit is contained in:
		
							
								
								
									
										202
									
								
								src/rtl_tcp.c
									
									
									
									
									
								
							
							
						
						
									
										202
									
								
								src/rtl_tcp.c
									
									
									
									
									
								
							| @@ -61,15 +61,8 @@ static pthread_t command_thread; | ||||
| static pthread_cond_t exit_cond; | ||||
| static pthread_mutex_t exit_cond_lock; | ||||
|  | ||||
| static pthread_mutex_t ll_mutex; | ||||
| static pthread_cond_t cond; | ||||
|  | ||||
| struct llist { | ||||
| 	char *data; | ||||
| 	size_t len; | ||||
| 	struct llist *next; | ||||
| }; | ||||
|  | ||||
| typedef struct { /* structure size must be multiple of 2 bytes */ | ||||
| 	char magic[4]; | ||||
| 	uint32_t tuner_type; | ||||
| @@ -79,9 +72,17 @@ typedef struct { /* structure size must be multiple of 2 bytes */ | ||||
| static rtlsdr_dev_t *dev = NULL; | ||||
|  | ||||
| static int enable_biastee = 0; | ||||
| static int global_numq = 0; | ||||
| static struct llist *ll_buffers = 0; | ||||
| static int llbuf_num = 500; | ||||
|  | ||||
| // Ring Buffer declarations | ||||
| // 8MB appears to cover several seconds at high bitrates -- about as much lag as you'd want | ||||
| #define RINGBUFSZ_INIT (8*1024*1024) | ||||
| static int ringbuf_sz = RINGBUFSZ_INIT; | ||||
| static int ringbuf_trimsz = 512*1024; | ||||
| static unsigned char *ringbuf = NULL; | ||||
| static volatile unsigned int ringbuf_head = 0; | ||||
| static volatile unsigned int ringbuf_tail = 0; | ||||
| static unsigned int total_radio_bytes = 0; | ||||
| static unsigned int max_bytes_in_flight = 0; | ||||
|  | ||||
| static volatile int do_exit = 0; | ||||
|  | ||||
| @@ -145,53 +146,60 @@ static void sighandler(int signum) | ||||
|  | ||||
| void rtlsdr_callback(unsigned char *buf, uint32_t len, void *ctx) | ||||
| { | ||||
|     static time_t lasttime = 0; | ||||
| 	static int lastbytes = 0; | ||||
| 	time_t curtime; | ||||
|  | ||||
| 	if(!do_exit) { | ||||
| 		struct llist *rpt = (struct llist*)malloc(sizeof(struct llist)); | ||||
| 		rpt->data = (char*)malloc(len); | ||||
| 		memcpy(rpt->data, buf, len); | ||||
| 		rpt->len = len; | ||||
| 		rpt->next = NULL; | ||||
| 		unsigned int bufferleft; | ||||
|  | ||||
| 		pthread_mutex_lock(&ll_mutex); | ||||
|  | ||||
| 		if (ll_buffers == NULL) { | ||||
| 			ll_buffers = rpt; | ||||
| 		} else { | ||||
| 			struct llist *cur = ll_buffers; | ||||
| 			int num_queued = 0; | ||||
|  | ||||
| 			while (cur->next != NULL) { | ||||
| 				cur = cur->next; | ||||
| 				num_queued++; | ||||
| 			} | ||||
|  | ||||
| 			if(llbuf_num && llbuf_num == num_queued-2){ | ||||
| 				struct llist *curelem; | ||||
|  | ||||
| 				free(ll_buffers->data); | ||||
| 				curelem = ll_buffers->next; | ||||
| 				free(ll_buffers); | ||||
| 				ll_buffers = curelem; | ||||
| 			} | ||||
|  | ||||
| 			cur->next = rpt; | ||||
|  | ||||
| 			if (num_queued > global_numq) | ||||
| 				printf("ll+, now %d\n", num_queued); | ||||
| 			else if (num_queued < global_numq) | ||||
| 				printf("ll-, now %d\n", num_queued); | ||||
|  | ||||
| 			global_numq = num_queued; | ||||
| 		if (ringbuf == NULL) | ||||
| 		{ | ||||
| 			printf("Allocate %d bytes for ringbuf.\n", ringbuf_sz); | ||||
| 			ringbuf = (unsigned char*)malloc(ringbuf_sz); | ||||
| 		} | ||||
|  | ||||
| 		bufferleft = ringbuf_sz - ((ringbuf_head < ringbuf_tail) ? (ringbuf_head - ringbuf_tail + ringbuf_sz) : (ringbuf_head - ringbuf_tail)); | ||||
| 		if (len < bufferleft) | ||||
| 		{ | ||||
| 			if ((ringbuf_head+len) < (unsigned int)ringbuf_sz) | ||||
| 			{ | ||||
| 				memcpy(((unsigned char*)(ringbuf+ringbuf_head)), buf, len); | ||||
| 			} | ||||
| 			else | ||||
| 			{ | ||||
| 				memcpy(((unsigned char*)ringbuf+ringbuf_head), buf, ringbuf_sz-ringbuf_head); | ||||
| 				memcpy((unsigned char*)ringbuf, buf+(ringbuf_sz-ringbuf_head), len-(ringbuf_sz-ringbuf_head)); | ||||
| 			} | ||||
| 			ringbuf_head = (ringbuf_head + len) % ringbuf_sz; | ||||
| 		} | ||||
| 		else | ||||
| 		{ | ||||
| 			printf("overrun: head=%d tail=%d, Trimming %d bytes from tail of buffer\n", ringbuf_head, ringbuf_tail, ringbuf_trimsz); | ||||
| 			ringbuf_tail = (ringbuf_tail + ringbuf_trimsz) % ringbuf_sz; | ||||
| 		} | ||||
|  | ||||
| 		total_radio_bytes += len; | ||||
| 		curtime = time (NULL); | ||||
| 		if ((curtime - lasttime) > 30) | ||||
| 		{ | ||||
| 		   int nsecs = curtime - lasttime; | ||||
| 		   int nbytes = total_radio_bytes - lastbytes; | ||||
| 		   int bytes_in_flight = (ringbuf_head - ringbuf_tail); | ||||
| 		   if (bytes_in_flight < 0) | ||||
| 			  bytes_in_flight = ringbuf_sz + bytes_in_flight; | ||||
| 		   lasttime=curtime; | ||||
| 		   lastbytes=total_radio_bytes; | ||||
| 		   printf(">> [ %3.2fMB/s ]  [ bytes_in_flight(cur/max) = %4dK / %4dK ]\n", | ||||
| 			  (float)nbytes/(float)nsecs/1000.0/1000.0, bytes_in_flight/1024, max_bytes_in_flight/1024); | ||||
| 		   max_bytes_in_flight=0; | ||||
| 		} | ||||
| 		pthread_cond_signal(&cond); | ||||
| 		pthread_mutex_unlock(&ll_mutex); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| static void *tcp_worker(void *arg) | ||||
| { | ||||
| 	struct llist *curelem,*prev; | ||||
| 	int bytesleft,bytessent, index; | ||||
| 	int bytesleft, bytessent; | ||||
| 	struct timeval tv= {1,0}; | ||||
| 	struct timespec ts; | ||||
| 	struct timeval tp; | ||||
| @@ -202,47 +210,33 @@ static void *tcp_worker(void *arg) | ||||
| 		if(do_exit) | ||||
| 			pthread_exit(0); | ||||
|  | ||||
| 		pthread_mutex_lock(&ll_mutex); | ||||
| 		gettimeofday(&tp, NULL); | ||||
| 		ts.tv_sec  = tp.tv_sec+5; | ||||
| 		ts.tv_nsec = tp.tv_usec * 1000; | ||||
| 		r = pthread_cond_timedwait(&cond, &ll_mutex, &ts); | ||||
| 		if(r == ETIMEDOUT) { | ||||
| 			pthread_mutex_unlock(&ll_mutex); | ||||
| 			printf("worker cond timeout\n"); | ||||
| 			sighandler(0); | ||||
| 			pthread_exit(NULL); | ||||
| 		} | ||||
|  | ||||
| 		curelem = ll_buffers; | ||||
| 		ll_buffers = 0; | ||||
| 		pthread_mutex_unlock(&ll_mutex); | ||||
|  | ||||
| 		while(curelem != 0) { | ||||
| 			bytesleft = curelem->len; | ||||
| 			index = 0; | ||||
| 			bytessent = 0; | ||||
| 			while(bytesleft > 0) { | ||||
| 				FD_ZERO(&writefds); | ||||
| 				FD_SET(s, &writefds); | ||||
| 				tv.tv_sec = 1; | ||||
| 				tv.tv_usec = 0; | ||||
| 				r = select(s+1, NULL, &writefds, NULL, &tv); | ||||
| 				if(r) { | ||||
| 					bytessent = send(s,  &curelem->data[index], bytesleft, 0); | ||||
| 					bytesleft -= bytessent; | ||||
| 					index += bytessent; | ||||
| 				} | ||||
| 				if(bytessent == SOCKET_ERROR || do_exit) { | ||||
| 						printf("worker socket bye\n"); | ||||
| 						sighandler(0); | ||||
| 						pthread_exit(NULL); | ||||
| 				} | ||||
| 			} | ||||
| 			prev = curelem; | ||||
| 			curelem = curelem->next; | ||||
| 			free(prev->data); | ||||
| 			free(prev); | ||||
| 		bytesleft = (ringbuf_head < ringbuf_tail) ? | ||||
| 		            (ringbuf_head - ringbuf_tail + ringbuf_sz) : | ||||
| 					(ringbuf_head - ringbuf_tail); | ||||
| 		while (bytesleft > 0) | ||||
| 		{ | ||||
| 		   FD_ZERO(&writefds); | ||||
| 		   FD_SET(s, &writefds); | ||||
| 		   tv.tv_sec = 1; | ||||
| 		   tv.tv_usec = 0; | ||||
| 		   r = select(s+1, NULL, &writefds, NULL, &tv); | ||||
| 		   if(r) { | ||||
| 			  unsigned int sendchunk; | ||||
| 			  if (ringbuf_tail < ringbuf_head) | ||||
| 				 sendchunk = ringbuf_head - ringbuf_tail; | ||||
| 			  else | ||||
| 				 sendchunk = ringbuf_sz - ringbuf_tail; | ||||
| 			  if (sendchunk > max_bytes_in_flight) | ||||
| 				 max_bytes_in_flight = sendchunk; | ||||
| 			  bytessent = send(s,  (unsigned char*)(ringbuf+ringbuf_tail), sendchunk, 0); | ||||
| 			  bytesleft -= bytessent; | ||||
| 			  ringbuf_tail = (ringbuf_tail + bytessent) % ringbuf_sz; | ||||
| 		   } | ||||
| 		   if(bytessent == SOCKET_ERROR || do_exit) { | ||||
| 			  printf("worker socket bye\n"); | ||||
| 			  sighandler(0); | ||||
| 			  pthread_exit(NULL); | ||||
| 		   } | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -397,7 +391,7 @@ int main(int argc, char **argv) | ||||
| 	struct sigaction sigact, sigign; | ||||
| #endif | ||||
|  | ||||
| 	while ((opt = getopt(argc, argv, "a:p:f:g:s:b:n:d:P:T")) != -1) { | ||||
| 	while ((opt = getopt(argc, argv, "a:p:f:g:s:b:d:P:T")) != -1) { | ||||
| 		switch (opt) { | ||||
| 		case 'd': | ||||
| 			dev_index = verbose_device_search(optarg); | ||||
| @@ -421,9 +415,6 @@ int main(int argc, char **argv) | ||||
| 		case 'b': | ||||
| 			buf_num = atoi(optarg); | ||||
| 			break; | ||||
| 		case 'n': | ||||
| 			llbuf_num = atoi(optarg); | ||||
| 			break; | ||||
| 		case 'P': | ||||
| 			ppm_error = atoi(optarg); | ||||
| 			break; | ||||
| @@ -444,7 +435,7 @@ int main(int argc, char **argv) | ||||
| 	} | ||||
|  | ||||
| 	if (dev_index < 0) { | ||||
| 	    exit(1); | ||||
| 		exit(1); | ||||
| 	} | ||||
|  | ||||
| 	rtlsdr_open(&dev, (uint32_t)dev_index); | ||||
| @@ -510,7 +501,6 @@ int main(int argc, char **argv) | ||||
| 		fprintf(stderr, "WARNING: Failed to reset buffers.\n"); | ||||
|  | ||||
| 	pthread_mutex_init(&exit_cond_lock, NULL); | ||||
| 	pthread_mutex_init(&ll_mutex, NULL); | ||||
| 	pthread_mutex_init(&exit_cond_lock, NULL); | ||||
| 	pthread_cond_init(&cond, NULL); | ||||
| 	pthread_cond_init(&exit_cond, NULL); | ||||
| @@ -536,10 +526,10 @@ int main(int argc, char **argv) | ||||
| 	while(1) { | ||||
| 		printf("listening...\n"); | ||||
| 		printf("Use the device argument 'rtl_tcp=%s:%d' in OsmoSDR " | ||||
| 		       "(gr-osmosdr) source\n" | ||||
| 		       "to receive samples in GRC and control " | ||||
| 		       "rtl_tcp parameters (frequency, gain, ...).\n", | ||||
| 		       addr, port); | ||||
| 			   "(gr-osmosdr) source\n" | ||||
| 			   "to receive samples in GRC and control " | ||||
| 			   "rtl_tcp parameters (frequency, gain, ...).\n", | ||||
| 			   addr, port); | ||||
| 		listen(listensocket,1); | ||||
|  | ||||
| 		while(1) { | ||||
| @@ -590,24 +580,16 @@ int main(int argc, char **argv) | ||||
| 		closesocket(s); | ||||
|  | ||||
| 		printf("all threads dead..\n"); | ||||
| 		curelem = ll_buffers; | ||||
| 		ll_buffers = 0; | ||||
|  | ||||
| 		while(curelem != 0) { | ||||
| 			prev = curelem; | ||||
| 			curelem = curelem->next; | ||||
| 			free(prev->data); | ||||
| 			free(prev); | ||||
| 		} | ||||
|  | ||||
| 		do_exit = 0; | ||||
| 		global_numq = 0; | ||||
| 	} | ||||
|  | ||||
| out: | ||||
| 	rtlsdr_close(dev); | ||||
| 	closesocket(listensocket); | ||||
| 	closesocket(s); | ||||
| 	if (ringbuf) | ||||
| 	   free(ringbuf); | ||||
| #ifdef _WIN32 | ||||
| 	WSACleanup(); | ||||
| #endif | ||||
|   | ||||
		Reference in New Issue
	
	Block a user