Replace RxJava in DownloadQueueScreenModel (#8872)
This commit is contained in:
parent
46774771ec
commit
2245658363
@ -18,9 +18,8 @@ data class Download(
|
||||
var pages: List<Page>? = null,
|
||||
) {
|
||||
|
||||
@Volatile
|
||||
@Transient
|
||||
var totalProgress: Int = 0
|
||||
val totalProgress: Int
|
||||
get() = pages?.sumOf(Page::progress) ?: 0
|
||||
|
||||
@Volatile
|
||||
@Transient
|
||||
|
@ -11,19 +11,21 @@ import eu.kanade.tachiyomi.data.download.model.Download
|
||||
import eu.kanade.tachiyomi.databinding.DownloadListBinding
|
||||
import eu.kanade.tachiyomi.source.model.Page
|
||||
import eu.kanade.tachiyomi.util.system.logcat
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.update
|
||||
import kotlinx.coroutines.launch
|
||||
import logcat.LogPriority
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class DownloadQueueScreenModel(
|
||||
private val downloadManager: DownloadManager = Injekt.get(),
|
||||
@ -40,9 +42,9 @@ class DownloadQueueScreenModel(
|
||||
var adapter: DownloadAdapter? = null
|
||||
|
||||
/**
|
||||
* Map of subscriptions for active downloads.
|
||||
* Map of jobs for active downloads.
|
||||
*/
|
||||
val progressSubscriptions by lazy { mutableMapOf<Download, Subscription>() }
|
||||
private val progressJobs = mutableMapOf<Download, Job>()
|
||||
|
||||
val listener = object : DownloadAdapter.DownloadItemListener {
|
||||
/**
|
||||
@ -130,10 +132,10 @@ class DownloadQueueScreenModel(
|
||||
}
|
||||
|
||||
override fun onDispose() {
|
||||
for (subscription in progressSubscriptions.values) {
|
||||
subscription.unsubscribe()
|
||||
for (job in progressJobs.values) {
|
||||
job.cancel()
|
||||
}
|
||||
progressSubscriptions.clear()
|
||||
progressJobs.clear()
|
||||
adapter = null
|
||||
}
|
||||
|
||||
@ -180,16 +182,16 @@ class DownloadQueueScreenModel(
|
||||
fun onStatusChange(download: Download) {
|
||||
when (download.status) {
|
||||
Download.State.DOWNLOADING -> {
|
||||
observeProgress(download)
|
||||
launchProgressJob(download)
|
||||
// Initial update of the downloaded pages
|
||||
onUpdateDownloadedPages(download)
|
||||
}
|
||||
Download.State.DOWNLOADED -> {
|
||||
unsubscribeProgress(download)
|
||||
cancelProgressJob(download)
|
||||
onUpdateProgress(download)
|
||||
onUpdateDownloadedPages(download)
|
||||
}
|
||||
Download.State.ERROR -> unsubscribeProgress(download)
|
||||
Download.State.ERROR -> cancelProgressJob(download)
|
||||
else -> {
|
||||
/* unused */
|
||||
}
|
||||
@ -201,29 +203,25 @@ class DownloadQueueScreenModel(
|
||||
*
|
||||
* @param download the download to observe its progress.
|
||||
*/
|
||||
private fun observeProgress(download: Download) {
|
||||
val subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
|
||||
// Get the sum of percentages for all the pages.
|
||||
.flatMap {
|
||||
Observable.from(download.pages)
|
||||
.map(Page::progress)
|
||||
.reduce { x, y -> x + y }
|
||||
private fun launchProgressJob(download: Download) {
|
||||
val job = coroutineScope.launch {
|
||||
while (download.pages == null) {
|
||||
delay(50)
|
||||
}
|
||||
// Keep only the latest emission to avoid backpressure.
|
||||
.onBackpressureLatest()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribe { progress ->
|
||||
// Update the view only if the progress has changed.
|
||||
if (download.totalProgress != progress) {
|
||||
download.totalProgress = progress
|
||||
|
||||
val progressFlows = download.pages!!.map(Page::progressFlow)
|
||||
combine(progressFlows, Array<Int>::sum)
|
||||
.distinctUntilChanged()
|
||||
.debounce(50)
|
||||
.collectLatest {
|
||||
onUpdateProgress(download)
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid leaking subscriptions
|
||||
progressSubscriptions.remove(download)?.unsubscribe()
|
||||
// Avoid leaking jobs
|
||||
progressJobs.remove(download)?.cancel()
|
||||
|
||||
progressSubscriptions[download] = subscription
|
||||
progressJobs[download] = job
|
||||
}
|
||||
|
||||
/**
|
||||
@ -231,8 +229,8 @@ class DownloadQueueScreenModel(
|
||||
*
|
||||
* @param download the download to unsubscribe.
|
||||
*/
|
||||
private fun unsubscribeProgress(download: Download) {
|
||||
progressSubscriptions.remove(download)?.unsubscribe()
|
||||
private fun cancelProgressJob(download: Download) {
|
||||
progressJobs.remove(download)?.cancel()
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user