mirror of
https://github.com/mihonapp/mihon.git
synced 2025-11-20 07:51:14 +01:00
More coroutine tweaks
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
package eu.kanade.tachiyomi.util.lang
|
||||
|
||||
import com.pushtorefresh.storio.operations.PreparedOperation
|
||||
import com.pushtorefresh.storio.sqlite.operations.get.PreparedGetObject
|
||||
import kotlinx.coroutines.CancellableContinuation
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineStart
|
||||
@@ -10,11 +8,8 @@ import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.suspendCancellableCoroutine
|
||||
import rx.Completable
|
||||
import rx.CompletableSubscriber
|
||||
import rx.Emitter
|
||||
import rx.Observable
|
||||
import rx.Observer
|
||||
@@ -53,49 +48,46 @@ suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun <T> PreparedOperation<T>.await(): T = asRxSingle().await()
|
||||
suspend fun <T> PreparedGetObject<T>.await(): T? = asRxSingle().await()
|
||||
|
||||
suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
|
||||
return suspendCancellableCoroutine { continuation ->
|
||||
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
||||
lateinit var sub: Subscription
|
||||
sub = self.subscribe(
|
||||
{
|
||||
continuation.resume(Unit) {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
},
|
||||
{
|
||||
if (!continuation.isCancelled) {
|
||||
continuation.resumeWithException(it)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
continuation.invokeOnCancellation {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
|
||||
subscribe(
|
||||
object : CompletableSubscriber {
|
||||
override fun onSubscribe(s: Subscription) {
|
||||
cont.unsubscribeOnCancellation(s)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
cont.resume(Unit)
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable) {
|
||||
cont.resumeWithException(e)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
// suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
|
||||
// return suspendCancellableCoroutine { continuation ->
|
||||
// val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
||||
// lateinit var sub: Subscription
|
||||
// sub = self.subscribe(
|
||||
// {
|
||||
// continuation.resume(Unit) {
|
||||
// sub.unsubscribe()
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// if (!continuation.isCancelled) {
|
||||
// continuation.resumeWithException(it)
|
||||
// }
|
||||
// }
|
||||
// )
|
||||
//
|
||||
// continuation.invokeOnCancellation {
|
||||
// sub.unsubscribe()
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
|
||||
// subscribe(
|
||||
// object : CompletableSubscriber {
|
||||
// override fun onSubscribe(s: Subscription) {
|
||||
// cont.unsubscribeOnCancellation(s)
|
||||
// }
|
||||
//
|
||||
// override fun onCompleted() {
|
||||
// cont.resume(Unit)
|
||||
// }
|
||||
//
|
||||
// override fun onError(e: Throwable) {
|
||||
// cont.resumeWithException(e)
|
||||
// }
|
||||
// }
|
||||
// )
|
||||
// }
|
||||
|
||||
suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
|
||||
cont.unsubscribeOnCancellation(
|
||||
@@ -113,27 +105,27 @@ suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T =
|
||||
firstOrDefault(default).awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(
|
||||
Observable.fromCallable(
|
||||
defaultValue
|
||||
)
|
||||
).first().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
|
||||
// suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T =
|
||||
// firstOrDefault(default).awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(
|
||||
// Observable.fromCallable(
|
||||
// defaultValue
|
||||
// )
|
||||
// ).first().awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T =
|
||||
singleOrDefault(default).awaitOne()
|
||||
|
||||
suspend fun <T> Observable<T>.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne()
|
||||
// suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T =
|
||||
// singleOrDefault(default).awaitOne()
|
||||
//
|
||||
// suspend fun <T> Observable<T>.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne()
|
||||
|
||||
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
|
||||
cont.unsubscribeOnCancellation(
|
||||
@@ -192,31 +184,31 @@ fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
|
||||
awaitClose { subscription.unsubscribe() }
|
||||
}
|
||||
|
||||
fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
|
||||
return Observable.create(
|
||||
{ emitter ->
|
||||
/*
|
||||
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
|
||||
* asObservable is already invoked from unconfined
|
||||
*/
|
||||
val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
|
||||
try {
|
||||
collect { emitter.onNext(it) }
|
||||
emitter.onCompleted()
|
||||
} catch (e: Throwable) {
|
||||
// Ignore `CancellationException` as error, since it indicates "normal cancellation"
|
||||
if (e !is CancellationException) {
|
||||
emitter.onError(e)
|
||||
} else {
|
||||
emitter.onCompleted()
|
||||
}
|
||||
}
|
||||
}
|
||||
emitter.setCancellation { job.cancel() }
|
||||
},
|
||||
backpressureMode
|
||||
)
|
||||
}
|
||||
// fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
|
||||
// return Observable.create(
|
||||
// { emitter ->
|
||||
// /*
|
||||
// * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
|
||||
// * asObservable is already invoked from unconfined
|
||||
// */
|
||||
// val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
|
||||
// try {
|
||||
// collect { emitter.onNext(it) }
|
||||
// emitter.onCompleted()
|
||||
// } catch (e: Throwable) {
|
||||
// // Ignore `CancellationException` as error, since it indicates "normal cancellation"
|
||||
// if (e !is CancellationException) {
|
||||
// emitter.onError(e)
|
||||
// } else {
|
||||
// emitter.onCompleted()
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// emitter.setCancellation { job.cancel() }
|
||||
// },
|
||||
// backpressureMode
|
||||
// )
|
||||
// }
|
||||
|
||||
fun <T> runAsObservable(
|
||||
block: suspend () -> T,
|
||||
|
||||
Reference in New Issue
Block a user