mirror of
https://github.com/mihonapp/mihon.git
synced 2025-11-15 13:37:29 +01:00
Use kapt, remove retrolambda, migrate database and source to Kotlin
This commit is contained in:
@@ -10,6 +10,8 @@ import rx.Observable;
|
||||
import rx.Observable.Operator;
|
||||
import rx.Subscriber;
|
||||
import rx.Subscription;
|
||||
import rx.functions.Action0;
|
||||
import rx.functions.Action1;
|
||||
import rx.functions.Func1;
|
||||
import rx.subscriptions.CompositeSubscription;
|
||||
import rx.subscriptions.Subscriptions;
|
||||
@@ -58,29 +60,35 @@ public class DynamicConcurrentMergeOperator<T, R> implements Operator<R, T> {
|
||||
}
|
||||
|
||||
public void init(Observable<Integer> workerCount) {
|
||||
Subscription wc = workerCount.subscribe(n -> {
|
||||
int n0 = workers.size();
|
||||
if (n0 < n) {
|
||||
for (int i = n0; i < n; i++) {
|
||||
DynamicWorker<T, R> dw = new DynamicWorker<>(++id, this);
|
||||
workers.add(dw);
|
||||
request(1);
|
||||
dw.tryNext();
|
||||
}
|
||||
} else if (n0 > n) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
workers.get(i).start();
|
||||
Subscription wc = workerCount.subscribe(new Action1<Integer>() {
|
||||
@Override
|
||||
public void call(Integer n) {
|
||||
int n0 = workers.size();
|
||||
if (n0 < n) {
|
||||
for (int i = n0; i < n; i++) {
|
||||
DynamicWorker<T, R> dw = new DynamicWorker<>(++id, DynamicConcurrentMerge.this);
|
||||
workers.add(dw);
|
||||
DynamicConcurrentMerge.this.request(1);
|
||||
dw.tryNext();
|
||||
}
|
||||
} else if (n0 > n) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
workers.get(i).start();
|
||||
}
|
||||
|
||||
for (int i = n0 - 1; i >= n; i--) {
|
||||
workers.get(i).stop();
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = n0 - 1; i >= n; i--) {
|
||||
workers.get(i).stop();
|
||||
if (!once.get() && once.compareAndSet(false, true)) {
|
||||
DynamicConcurrentMerge.this.request(n);
|
||||
}
|
||||
}
|
||||
|
||||
if (!once.get() && once.compareAndSet(false, true)) {
|
||||
request(n);
|
||||
}
|
||||
}, this::onError);
|
||||
}, new Action1<Throwable>() {
|
||||
@Override
|
||||
public void call(Throwable e) {DynamicConcurrentMerge.this.onError(e);}
|
||||
});
|
||||
|
||||
composite.add(wc);
|
||||
}
|
||||
@@ -138,9 +146,9 @@ public class DynamicConcurrentMergeOperator<T, R> implements Operator<R, T> {
|
||||
return;
|
||||
}
|
||||
|
||||
Observable out = parent.mapper.call(t);
|
||||
Observable<? extends R> out = parent.mapper.call(t);
|
||||
|
||||
Subscriber<R> s = new Subscriber<R>() {
|
||||
final Subscriber<R> s = new Subscriber<R>() {
|
||||
@Override
|
||||
public void onNext(R t) {
|
||||
parent.actual.onNext(t);
|
||||
@@ -163,9 +171,11 @@ public class DynamicConcurrentMergeOperator<T, R> implements Operator<R, T> {
|
||||
};
|
||||
|
||||
parent.composite.add(s);
|
||||
s.add(Subscriptions.create(() -> parent.composite.remove(s)));
|
||||
s.add(Subscriptions.create(new Action0() {
|
||||
@Override
|
||||
public void call() {parent.composite.remove(s);}
|
||||
}));
|
||||
|
||||
// Unchecked assignment to avoid weird Android Studio errors
|
||||
out.subscribe(s);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
package eu.kanade.tachiyomi.util;
|
||||
|
||||
import android.util.Pair;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import rx.Observable;
|
||||
import rx.functions.Func1;
|
||||
import rx.subjects.PublishSubject;
|
||||
|
||||
public class RxPager<T> {
|
||||
|
||||
private final PublishSubject<List<T>> results = PublishSubject.create();
|
||||
private int requestedCount;
|
||||
|
||||
public Observable<Pair<Integer, List<T>>> results() {
|
||||
requestedCount = 0;
|
||||
return results.map(list -> Pair.create(requestedCount++, list));
|
||||
}
|
||||
|
||||
public Observable<List<T>> request(Func1<Integer, Observable<List<T>>> networkObservable) {
|
||||
return networkObservable.call(requestedCount).doOnNext(results::onNext);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
21
app/src/main/java/eu/kanade/tachiyomi/util/RxPager.kt
Normal file
21
app/src/main/java/eu/kanade/tachiyomi/util/RxPager.kt
Normal file
@@ -0,0 +1,21 @@
|
||||
package eu.kanade.tachiyomi.util
|
||||
|
||||
import android.util.Pair
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
|
||||
class RxPager<T> {
|
||||
|
||||
private val results = PublishSubject.create<List<T>>()
|
||||
private var requestedCount: Int = 0
|
||||
|
||||
fun results(): Observable<Pair<Int, List<T>>> {
|
||||
requestedCount = 0
|
||||
return results.map { Pair(requestedCount++, it) }
|
||||
}
|
||||
|
||||
fun request(networkObservable: (Int) -> Observable<List<T>>) =
|
||||
networkObservable(requestedCount).doOnNext { results.onNext(it) }
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user