mirror of
				https://github.com/mihonapp/mihon.git
				synced 2025-11-04 08:08:55 +01:00 
			
		
		
		
	Replace RxJava in HttpPageLoader downloader (#8955)
* Convert downloader Observable to flow Uses `runInterruptible` to turn the blocking call to `queue.take()` into a cancellable call. Flow collection is ended by cancelling the scope in `recycle`. This means the `HttpPageLoader` can't be reused after calling `recycle`, but this was true with the `Observable` as well.) * Convert load Observables to suspending function Inlining the Observables allows for some simplification of the error handling. Behavior should be otherwise identical. * Convert cleanup Completable to coroutine Uses global `launchIO`, not ideal but similar to previous behavior. Can't be scheduled on the local `scope` as this runs after `scope` is cancelled.
This commit is contained in:
		@@ -6,16 +6,20 @@ import eu.kanade.tachiyomi.source.model.Page
 | 
			
		||||
import eu.kanade.tachiyomi.source.online.HttpSource
 | 
			
		||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
 | 
			
		||||
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
 | 
			
		||||
import eu.kanade.tachiyomi.util.lang.plusAssign
 | 
			
		||||
import eu.kanade.tachiyomi.util.system.logcat
 | 
			
		||||
import eu.kanade.tachiyomi.util.lang.awaitSingle
 | 
			
		||||
import eu.kanade.tachiyomi.util.lang.launchIO
 | 
			
		||||
import kotlinx.coroutines.CancellationException
 | 
			
		||||
import logcat.LogPriority
 | 
			
		||||
import rx.Completable
 | 
			
		||||
import kotlinx.coroutines.CoroutineScope
 | 
			
		||||
import kotlinx.coroutines.Dispatchers
 | 
			
		||||
import kotlinx.coroutines.SupervisorJob
 | 
			
		||||
import kotlinx.coroutines.cancel
 | 
			
		||||
import kotlinx.coroutines.flow.filter
 | 
			
		||||
import kotlinx.coroutines.flow.flow
 | 
			
		||||
import kotlinx.coroutines.runInterruptible
 | 
			
		||||
import rx.Observable
 | 
			
		||||
import rx.schedulers.Schedulers
 | 
			
		||||
import rx.subjects.PublishSubject
 | 
			
		||||
import rx.subjects.SerializedSubject
 | 
			
		||||
import rx.subscriptions.CompositeSubscription
 | 
			
		||||
import uy.kohesive.injekt.Injekt
 | 
			
		||||
import uy.kohesive.injekt.api.get
 | 
			
		||||
import java.util.concurrent.PriorityBlockingQueue
 | 
			
		||||
@@ -31,33 +35,27 @@ class HttpPageLoader(
 | 
			
		||||
    private val chapterCache: ChapterCache = Injekt.get(),
 | 
			
		||||
) : PageLoader() {
 | 
			
		||||
 | 
			
		||||
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * A queue used to manage requests one by one while allowing priorities.
 | 
			
		||||
     */
 | 
			
		||||
    private val queue = PriorityBlockingQueue<PriorityPage>()
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Current active subscriptions.
 | 
			
		||||
     */
 | 
			
		||||
    private val subscriptions = CompositeSubscription()
 | 
			
		||||
 | 
			
		||||
    private val preloadSize = 4
 | 
			
		||||
 | 
			
		||||
    init {
 | 
			
		||||
        subscriptions += Observable.defer { Observable.just(queue.take().page) }
 | 
			
		||||
            .filter { it.status == Page.State.QUEUE }
 | 
			
		||||
            .concatMap { source.fetchImageFromCacheThenNet(it) }
 | 
			
		||||
            .repeat()
 | 
			
		||||
            .subscribeOn(Schedulers.io())
 | 
			
		||||
            .subscribe(
 | 
			
		||||
                {
 | 
			
		||||
                },
 | 
			
		||||
                { error ->
 | 
			
		||||
                    if (error !is InterruptedException) {
 | 
			
		||||
                        logcat(LogPriority.ERROR, error)
 | 
			
		||||
                    }
 | 
			
		||||
                },
 | 
			
		||||
            )
 | 
			
		||||
        scope.launchIO {
 | 
			
		||||
            flow {
 | 
			
		||||
                while (true) {
 | 
			
		||||
                    emit(runInterruptible { queue.take() }.page)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
                .filter { it.status == Page.State.QUEUE }
 | 
			
		||||
                .collect {
 | 
			
		||||
                    loadPage(it)
 | 
			
		||||
                }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@@ -65,21 +63,23 @@ class HttpPageLoader(
 | 
			
		||||
     */
 | 
			
		||||
    override fun recycle() {
 | 
			
		||||
        super.recycle()
 | 
			
		||||
        subscriptions.unsubscribe()
 | 
			
		||||
        scope.cancel()
 | 
			
		||||
        queue.clear()
 | 
			
		||||
 | 
			
		||||
        // Cache current page list progress for online chapters to allow a faster reopen
 | 
			
		||||
        val pages = chapter.pages
 | 
			
		||||
        if (pages != null) {
 | 
			
		||||
            Completable
 | 
			
		||||
                .fromAction {
 | 
			
		||||
            launchIO {
 | 
			
		||||
                try {
 | 
			
		||||
                    // Convert to pages without reader information
 | 
			
		||||
                    val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) }
 | 
			
		||||
                    chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
 | 
			
		||||
                } catch (e: Throwable) {
 | 
			
		||||
                    if (e is CancellationException) {
 | 
			
		||||
                        throw e
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                .onErrorComplete()
 | 
			
		||||
                .subscribeOn(Schedulers.io())
 | 
			
		||||
                .subscribe()
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -192,61 +192,32 @@ class HttpPageLoader(
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns an observable of the page with the downloaded image.
 | 
			
		||||
     * Loads the page, retrieving the image URL and downloading the image if necessary.
 | 
			
		||||
     * Downloaded images are stored in the chapter cache.
 | 
			
		||||
     *
 | 
			
		||||
     * @param page the page whose source image has to be downloaded.
 | 
			
		||||
     */
 | 
			
		||||
    private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable<ReaderPage> {
 | 
			
		||||
        return if (page.imageUrl.isNullOrEmpty()) {
 | 
			
		||||
            getImageUrl(page).flatMap { getCachedImage(it) }
 | 
			
		||||
        } else {
 | 
			
		||||
            getCachedImage(page)
 | 
			
		||||
    private suspend fun loadPage(page: ReaderPage) {
 | 
			
		||||
        try {
 | 
			
		||||
            if (page.imageUrl.isNullOrEmpty()) {
 | 
			
		||||
                page.status = Page.State.LOAD_PAGE
 | 
			
		||||
                page.imageUrl = source.fetchImageUrl(page).awaitSingle()
 | 
			
		||||
            }
 | 
			
		||||
            val imageUrl = page.imageUrl!!
 | 
			
		||||
 | 
			
		||||
            if (!chapterCache.isImageInCache(imageUrl)) {
 | 
			
		||||
                page.status = Page.State.DOWNLOAD_IMAGE
 | 
			
		||||
                val imageResponse = source.fetchImage(page).awaitSingle()
 | 
			
		||||
                chapterCache.putImageToCache(imageUrl, imageResponse)
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
 | 
			
		||||
            page.status = Page.State.READY
 | 
			
		||||
        } catch (e: Throwable) {
 | 
			
		||||
            page.status = Page.State.ERROR
 | 
			
		||||
            if (e is CancellationException) {
 | 
			
		||||
                throw e
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private fun HttpSource.getImageUrl(page: ReaderPage): Observable<ReaderPage> {
 | 
			
		||||
        page.status = Page.State.LOAD_PAGE
 | 
			
		||||
        return fetchImageUrl(page)
 | 
			
		||||
            .doOnError { page.status = Page.State.ERROR }
 | 
			
		||||
            .onErrorReturn { null }
 | 
			
		||||
            .doOnNext { page.imageUrl = it }
 | 
			
		||||
            .map { page }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns an observable of the page that gets the image from the chapter or fallbacks to
 | 
			
		||||
     * network and copies it to the cache calling [cacheImage].
 | 
			
		||||
     *
 | 
			
		||||
     * @param page the page.
 | 
			
		||||
     */
 | 
			
		||||
    private fun HttpSource.getCachedImage(page: ReaderPage): Observable<ReaderPage> {
 | 
			
		||||
        val imageUrl = page.imageUrl ?: return Observable.just(page)
 | 
			
		||||
 | 
			
		||||
        return Observable.just(page)
 | 
			
		||||
            .flatMap {
 | 
			
		||||
                if (!chapterCache.isImageInCache(imageUrl)) {
 | 
			
		||||
                    cacheImage(page)
 | 
			
		||||
                } else {
 | 
			
		||||
                    Observable.just(page)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            .doOnNext {
 | 
			
		||||
                page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
 | 
			
		||||
                page.status = Page.State.READY
 | 
			
		||||
            }
 | 
			
		||||
            .doOnError { page.status = Page.State.ERROR }
 | 
			
		||||
            .onErrorReturn { page }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns an observable of the page that downloads the image to [ChapterCache].
 | 
			
		||||
     *
 | 
			
		||||
     * @param page the page.
 | 
			
		||||
     */
 | 
			
		||||
    private fun HttpSource.cacheImage(page: ReaderPage): Observable<ReaderPage> {
 | 
			
		||||
        page.status = Page.State.DOWNLOAD_IMAGE
 | 
			
		||||
        return fetchImage(page)
 | 
			
		||||
            .doOnNext { chapterCache.putImageToCache(page.imageUrl!!, it) }
 | 
			
		||||
            .map { page }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user