Convert refrshing manga metadata to coroutines + 5 sources concurrently

Sorry and good luck Carlos 😔
This commit is contained in:
Jay 2020-05-09 01:07:56 -04:00
parent f3d4e87542
commit 2bbef55737
2 changed files with 54 additions and 83 deletions

View File

@ -31,6 +31,7 @@ import eu.kanade.tachiyomi.data.preference.PreferencesHelper
import eu.kanade.tachiyomi.data.preference.getOrDefault import eu.kanade.tachiyomi.data.preference.getOrDefault
import eu.kanade.tachiyomi.data.track.TrackManager import eu.kanade.tachiyomi.data.track.TrackManager
import eu.kanade.tachiyomi.source.SourceManager import eu.kanade.tachiyomi.source.SourceManager
import eu.kanade.tachiyomi.source.fetchMangaDetailsAsync
import eu.kanade.tachiyomi.source.model.SManga import eu.kanade.tachiyomi.source.model.SManga
import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.source.online.HttpSource
import eu.kanade.tachiyomi.ui.main.MainActivity import eu.kanade.tachiyomi.ui.main.MainActivity
@ -50,9 +51,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import rx.Observable
import rx.Subscription
import rx.schedulers.Schedulers
import timber.log.Timber import timber.log.Timber
import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get import uy.kohesive.injekt.api.get
@ -82,11 +80,6 @@ class LibraryUpdateService(
*/ */
private lateinit var wakeLock: PowerManager.WakeLock private lateinit var wakeLock: PowerManager.WakeLock
/**
* Subscription where the update is done.
*/
private var subscription: Subscription? = null
/** /**
* Pending intent of action that cancels the library update * Pending intent of action that cancels the library update
*/ */
@ -318,14 +311,12 @@ class LibraryUpdateService(
} }
/** /**
* Method called when the service is destroyed. It destroys subscriptions and releases the wake * Method called when the service is destroyed. It cancels jobs and releases the wake lock.
* lock.
*/ */
override fun onDestroy() { override fun onDestroy() {
job?.cancel() job?.cancel()
if (instance == this) if (instance == this)
instance = null instance = null
subscription?.unsubscribe()
if (wakeLock.isHeld) { if (wakeLock.isHeld) {
wakeLock.release() wakeLock.release()
} }
@ -350,30 +341,14 @@ class LibraryUpdateService(
*/ */
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
if (intent == null) return START_NOT_STICKY if (intent == null) return START_NOT_STICKY
val target = intent.getSerializableExtra(KEY_TARGET) as? Target val target = intent.getSerializableExtra(KEY_TARGET) as? Target ?: return START_NOT_STICKY
?: return START_NOT_STICKY
// Unsubscribe from any previous subscription if needed.
subscription?.unsubscribe()
instance = this instance = this
val selectedScheme = preferences.libraryUpdatePrioritization().getOrDefault() val selectedScheme = preferences.libraryUpdatePrioritization().getOrDefault()
val mangaList = val mangaList = getMangaToUpdate(intent, target).sortedWith(rankingScheme[selectedScheme])
getMangaToUpdate(intent, target).sortedWith(rankingScheme[selectedScheme])
// Update favorite manga. Destroy service when completed or in case of an error. // Update favorite manga. Destroy service when completed or in case of an error.
if (target == Target.DETAILS) {
// Update either chapter list or manga details.
subscription = Observable.defer {
updateDetails(mangaList)
}.subscribeOn(Schedulers.io()).subscribe({}, {
Timber.e(it)
stopSelf(startId)
}, {
stopSelf(startId)
})
} else {
launchTarget(target, mangaList, startId) launchTarget(target, mangaList, startId)
}
return START_REDELIVER_INTENT return START_REDELIVER_INTENT
} }
@ -382,16 +357,20 @@ class LibraryUpdateService(
Timber.e(exception) Timber.e(exception)
stopSelf(startId) stopSelf(startId)
} }
job = if (target == Target.CHAPTERS) { job = GlobalScope.launch(handler) {
when (target) {
Target.CHAPTERS -> {
listener?.onUpdateManga(LibraryManga()) listener?.onUpdateManga(LibraryManga())
GlobalScope.launch(handler) {
updateChaptersJob(mangaToAdd) updateChaptersJob(mangaToAdd)
} }
} else { Target.DETAILS -> {
GlobalScope.launch(handler) { updateDetails(mangaToAdd)
}
else -> {
updateTrackings(mangaToAdd) updateTrackings(mangaToAdd)
} }
} }
}
job?.invokeOnCompletion { stopSelf(startId) } job?.invokeOnCompletion { stopSelf(startId) }
} }
@ -417,18 +396,17 @@ class LibraryUpdateService(
} }
} }
private fun finishUpdates() { private suspend fun finishUpdates() {
if (jobCount.get() != 0) return if (jobCount.get() != 0) return
if (newUpdates.isNotEmpty()) { if (newUpdates.isNotEmpty()) {
showResultNotification(newUpdates) showResultNotification(newUpdates)
if (preferences.refreshCoversToo().getOrDefault() && job?.isCancelled == false) { if (preferences.refreshCoversToo().getOrDefault() && job?.isCancelled == false) {
updateDetails(newUpdates.map { it.key }).observeOn(Schedulers.io()).doOnCompleted { updateDetails(newUpdates.keys.toList())
cancelProgressNotification() cancelProgressNotification()
if (downloadNew && hasDownloads) { if (downloadNew && hasDownloads) {
DownloadService.start(this) DownloadService.start(this)
} }
}.subscribeOn(Schedulers.io()).subscribe {}
} else if (downloadNew && hasDownloads) { } else if (downloadNew && hasDownloads) {
DownloadService.start(this) DownloadService.start(this)
} }
@ -517,52 +495,47 @@ class LibraryUpdateService(
downloadManager.downloadChapters(manga, dbChapters, false) downloadManager.downloadChapters(manga, dbChapters, false)
} }
/**
* Updates the chapters for the given manga and adds them to the database.
*
* @param manga the manga to update.
* @return a pair of the inserted and removed chapters.
*/
fun updateManga(manga: Manga): Observable<Pair<List<Chapter>, List<Chapter>>> {
val source = sourceManager.get(manga.source) as? HttpSource ?: return Observable.empty()
return source.fetchChapterList(manga)
.map { syncChaptersWithSource(db, it, manga, source) }
}
/** /**
* Method that updates the details of the given list of manga. It's called in a background * Method that updates the details of the given list of manga. It's called in a background
* thread, so it's safe to do heavy operations or network calls here. * thread, so it's safe to do heavy operations or network calls here.
* *
* @param mangaToUpdate the list to update * @param mangaToUpdate the list to update
* @return an observable delivering the progress of each update.
*/ */
fun updateDetails(mangaToUpdate: List<LibraryManga>): Observable<LibraryManga> { suspend fun updateDetails(mangaToUpdate: List<LibraryManga>) = coroutineScope {
// Initialize the variables holding the progress of the updates. // Initialize the variables holding the progress of the updates.
val count = AtomicInteger(0) val count = AtomicInteger(0)
val asyncList = mangaToUpdate.groupBy { it.source }.values.map { list ->
async {
requestSemaphore.withPermit {
list.forEach { manga ->
if (job?.isCancelled == true) {
return@async
}
val source = sourceManager.get(manga.source) as? HttpSource ?: return@async
showProgressNotification(manga, count.andIncrement, mangaToUpdate.size)
// Emit each manga and update it sequentially. val networkManga = try {
return Observable.from(mangaToUpdate) source.fetchMangaDetailsAsync(manga)
// Notify manga that will update. } catch (e: java.lang.Exception) {
.doOnNext { showProgressNotification(it, count.andIncrement, mangaToUpdate.size) } Timber.e(e)
// Update the details of the manga. null
.concatMap { manga -> }
val source = sourceManager.get(manga.source) as? HttpSource if (networkManga != null) {
?: return@concatMap Observable.empty<LibraryManga>()
source.fetchMangaDetails(manga)
.map { networkManga ->
val thumbnailUrl = manga.thumbnail_url val thumbnailUrl = manga.thumbnail_url
manga.copyFrom(networkManga) manga.copyFrom(networkManga)
manga.initialized = true
db.insertManga(manga).executeAsBlocking() db.insertManga(manga).executeAsBlocking()
if (thumbnailUrl != networkManga.thumbnail_url) if (thumbnailUrl != networkManga.thumbnail_url && !manga.hasCustomCover()) {
MangaImpl.setLastCoverFetch(manga.id!!, Date().time) MangaImpl.setLastCoverFetch(manga.id!!, Date().time)
manga
} }
.onErrorReturn { manga }
} }
.doOnCompleted { }
}
}
}
asyncList.awaitAll()
cancelProgressNotification() cancelProgressNotification()
} }
}
/** /**
* Method that updates the metadata of the connected tracking services. It's called in a * Method that updates the metadata of the connected tracking services. It's called in a

View File

@ -78,13 +78,11 @@ class LibraryUpdateServiceTest {
`when`(source.fetchChapterList(manga)).thenReturn(Observable.just(sourceChapters)) `when`(source.fetchChapterList(manga)).thenReturn(Observable.just(sourceChapters))
service.updateManga(manga).subscribe()
assertThat(service.db.getChapters(manga).executeAsBlocking()).hasSize(2) assertThat(service.db.getChapters(manga).executeAsBlocking()).hasSize(2)
} }
@Test @Test
fun testContinuesUpdatingWhenAMangaFails() { suspend fun testContinuesUpdatingWhenAMangaFails() {
var favManga = createManga("/manga1", "/manga2", "/manga3") var favManga = createManga("/manga1", "/manga2", "/manga3")
service.db.insertMangas(favManga).executeAsBlocking() service.db.insertMangas(favManga).executeAsBlocking()
favManga = service.db.getLibraryMangas().executeAsBlocking() favManga = service.db.getLibraryMangas().executeAsBlocking()
@ -99,7 +97,7 @@ class LibraryUpdateServiceTest {
val intent = Intent() val intent = Intent()
val target = LibraryUpdateService.Target.CHAPTERS val target = LibraryUpdateService.Target.CHAPTERS
service.updateDetails(favManga).subscribe() service.updateDetails(favManga)
// There are 3 network attempts and 2 insertions (1 request failed) // There are 3 network attempts and 2 insertions (1 request failed)
assertThat(service.db.getChapters(favManga[0]).executeAsBlocking()).hasSize(2) assertThat(service.db.getChapters(favManga[0]).executeAsBlocking()).hasSize(2)