diff --git a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt index b486a3e275..9a6da89b18 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt @@ -48,47 +48,6 @@ suspend fun Single.await(subscribeOn: Scheduler? = null): T { } } -// suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) { -// return suspendCancellableCoroutine { continuation -> -// val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this -// lateinit var sub: Subscription -// sub = self.subscribe( -// { -// continuation.resume(Unit) { -// sub.unsubscribe() -// } -// }, -// { -// if (!continuation.isCancelled) { -// continuation.resumeWithException(it) -// } -// } -// ) -// -// continuation.invokeOnCancellation { -// sub.unsubscribe() -// } -// } -// } -// -// suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont -> -// subscribe( -// object : CompletableSubscriber { -// override fun onSubscribe(s: Subscription) { -// cont.unsubscribeOnCancellation(s) -// } -// -// override fun onCompleted() { -// cont.resume(Unit) -// } -// -// override fun onError(e: Throwable) { -// cont.resumeWithException(e) -// } -// } -// ) -// } - suspend fun Single.await(): T = suspendCancellableCoroutine { cont -> cont.unsubscribeOnCancellation( subscribe( @@ -105,28 +64,8 @@ suspend fun Single.await(): T = suspendCancellableCoroutine { cont -> ) } -// suspend fun Observable.awaitFirst(): T = first().awaitOne() -// -// suspend fun Observable.awaitFirstOrDefault(default: T): T = -// firstOrDefault(default).awaitOne() -// -// suspend fun Observable.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne() -// -// suspend fun Observable.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty( -// Observable.fromCallable( -// defaultValue -// ) -// ).first().awaitOne() -// -// suspend fun Observable.awaitLast(): T = last().awaitOne() - suspend fun Observable.awaitSingle(): T = single().awaitOne() -// suspend fun Observable.awaitSingleOrDefault(default: T): T = -// singleOrDefault(default).awaitOne() -// -// suspend fun Observable.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne() - private suspend fun Observable.awaitOne(): T = suspendCancellableCoroutine { cont -> cont.unsubscribeOnCancellation( subscribe( @@ -184,32 +123,6 @@ fun Observable.asFlow(): Flow = callbackFlow { awaitClose { subscription.unsubscribe() } } -// fun Flow.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable { -// return Observable.create( -// { emitter -> -// /* -// * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if -// * asObservable is already invoked from unconfined -// */ -// val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { -// try { -// collect { emitter.onNext(it) } -// emitter.onCompleted() -// } catch (e: Throwable) { -// // Ignore `CancellationException` as error, since it indicates "normal cancellation" -// if (e !is CancellationException) { -// emitter.onError(e) -// } else { -// emitter.onCompleted() -// } -// } -// } -// emitter.setCancellation { job.cancel() } -// }, -// backpressureMode -// ) -// } - fun runAsObservable( block: suspend () -> T, backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE