From 86b9d7e843c90c37f7e7374a20cbbcbf89caf10d Mon Sep 17 00:00:00 2001 From: arkon Date: Sat, 23 Jan 2021 11:20:16 -0500 Subject: [PATCH] Remove usage of RxJava from LibraryUpdateService --- .../data/library/LibraryUpdateService.kt | 259 ++++++++---------- 1 file changed, 114 insertions(+), 145 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt b/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt index 9450c0d1fb..fb6f9b4d2d 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt @@ -27,15 +27,16 @@ import eu.kanade.tachiyomi.source.model.toSChapter import eu.kanade.tachiyomi.source.model.toSManga import eu.kanade.tachiyomi.util.chapter.NoChaptersException import eu.kanade.tachiyomi.util.chapter.syncChaptersWithSource -import eu.kanade.tachiyomi.util.lang.runAsObservable +import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.prepUpdateCover import eu.kanade.tachiyomi.util.shouldDownloadNewChapters import eu.kanade.tachiyomi.util.storage.getUriCompat import eu.kanade.tachiyomi.util.system.acquireWakeLock import eu.kanade.tachiyomi.util.system.isServiceRunning -import rx.Observable -import rx.Subscription -import rx.schedulers.Schedulers +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.MainScope +import kotlinx.coroutines.cancel import timber.log.Timber import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get @@ -59,17 +60,11 @@ class LibraryUpdateService( val coverCache: CoverCache = Injekt.get() ) : Service() { - /** - * Wake lock that will be held until the service is destroyed. - */ private lateinit var wakeLock: PowerManager.WakeLock - private lateinit var notifier: LibraryUpdateNotifier + private lateinit var scope: CoroutineScope - /** - * Subscription where the update is done. - */ - private var subscription: Subscription? = null + private var updateJob: Job? = null /** * Defines what should be updated within a service execution. @@ -144,6 +139,7 @@ class LibraryUpdateService( override fun onCreate() { super.onCreate() + scope = MainScope() notifier = LibraryUpdateNotifier(this) wakeLock = acquireWakeLock(javaClass.name) @@ -155,7 +151,8 @@ class LibraryUpdateService( * lock. */ override fun onDestroy() { - subscription?.unsubscribe() + scope?.cancel() + updateJob?.cancel() if (wakeLock.isHeld) { wakeLock.release() } @@ -183,34 +180,27 @@ class LibraryUpdateService( ?: return START_NOT_STICKY // Unsubscribe from any previous subscription if needed. - subscription?.unsubscribe() + updateJob?.cancel() // Update favorite manga. Destroy service when completed or in case of an error. - subscription = Observable - .defer { - val selectedScheme = preferences.libraryUpdatePrioritization().get() - val mangaList = getMangaToUpdate(intent, target) - .sortedWith(rankingScheme[selectedScheme]) + val selectedScheme = preferences.libraryUpdatePrioritization().get() + val mangaList = getMangaToUpdate(intent, target) + .sortedWith(rankingScheme[selectedScheme]) - // Update either chapter list or manga details. + updateJob = scope.launchIO { + try { when (target) { Target.CHAPTERS -> updateChapterList(mangaList) Target.COVERS -> updateCovers(mangaList) Target.TRACKING -> updateTrackings(mangaList) } + } catch (e: Throwable) { + Timber.e(e) + stopSelf(startId) + } finally { + stopSelf(startId) } - .subscribeOn(Schedulers.io()) - .subscribe( - { - }, - { - Timber.e(it) - stopSelf(startId) - }, - { - stopSelf(startId) - } - ) + } return START_REDELIVER_INTENT } @@ -253,7 +243,7 @@ class LibraryUpdateService( * @param mangaToUpdate the list to update * @return an observable delivering the progress of each update. */ - fun updateChapterList(mangaToUpdate: List): Observable { + suspend fun updateChapterList(mangaToUpdate: List) { // Initialize the variables holding the progress of the updates. val count = AtomicInteger(0) // List containing new updates @@ -263,67 +253,60 @@ class LibraryUpdateService( // Boolean to determine if DownloadManager has downloads var hasDownloads = false - // Emit each manga and update it sequentially. - return Observable.from(mangaToUpdate) - // Notify manga that will update. - .doOnNext { notifier.showProgressNotification(it, count.andIncrement, mangaToUpdate.size) } - // Update the chapters of the manga - .concatMap { manga -> - updateManga(manga) + mangaToUpdate + .map { manga -> + // Notify manga that will update. + notifier.showProgressNotification(manga, count.andIncrement, mangaToUpdate.size) + + // Update the chapters of the manga + try { + val newChapters = updateManga(manga).first + Pair(manga, newChapters) + } catch (e: Throwable) { // If there's any error, return empty update and continue. - .onErrorReturn { - val errorMessage = if (it is NoChaptersException) { - getString(R.string.no_chapters_error) - } else { - it.message - } - failedUpdates.add(Pair(manga, errorMessage)) - Pair(emptyList(), emptyList()) - } - // Filter out mangas without new chapters (or failed). - .filter { (first) -> first.isNotEmpty() } - .doOnNext { - if (manga.shouldDownloadNewChapters(db, preferences)) { - downloadChapters(manga, it.first) - hasDownloads = true - } - } - // Convert to the manga that contains new chapters. - .map { - Pair( - manga, - ( - it.first.sortedByDescending { ch -> ch.source_order } - .toTypedArray() - ) - ) + val errorMessage = if (e is NoChaptersException) { + getString(R.string.no_chapters_error) + } else { + e.message } + failedUpdates.add(Pair(manga, errorMessage)) + Pair(manga, emptyList()) + } } - // Add manga with new chapters to the list. - .doOnNext { manga -> - // Add to the list - newUpdates.add(manga) - } - // Notify result of the overall update. - .doOnCompleted { - notifier.cancelProgressNotification() - - if (newUpdates.isNotEmpty()) { - notifier.showUpdateNotifications(newUpdates) - if (hasDownloads) { - DownloadService.start(this) - } + // Filter out mangas without new chapters (or failed). + .filter { (_, newChapters) -> newChapters.isNotEmpty() } + .forEach { (manga, newChapters) -> + if (manga.shouldDownloadNewChapters(db, preferences)) { + downloadChapters(manga, newChapters) + hasDownloads = true } - if (preferences.showLibraryUpdateErrors() && failedUpdates.isNotEmpty()) { - val errorFile = writeErrorFile(failedUpdates) - notifier.showUpdateErrorNotification( - failedUpdates.map { it.first.title }, - errorFile.getUriCompat(this) + // Convert to the manga that contains new chapters. + newUpdates.add( + Pair( + manga, + newChapters.sortedByDescending { ch -> ch.source_order }.toTypedArray() ) - } + ) } - .map { (first) -> first } + + // Notify result of the overall update. + notifier.cancelProgressNotification() + + if (newUpdates.isNotEmpty()) { + notifier.showUpdateNotifications(newUpdates) + if (hasDownloads) { + DownloadService.start(this) + } + } + + if (preferences.showLibraryUpdateErrors() && failedUpdates.isNotEmpty()) { + val errorFile = writeErrorFile(failedUpdates) + notifier.showUpdateErrorNotification( + failedUpdates.map { it.first.title }, + errorFile.getUriCompat(this) + ) + } } private fun downloadChapters(manga: Manga, chapters: List) { @@ -338,49 +321,38 @@ class LibraryUpdateService( * @param manga the manga to update. * @return a pair of the inserted and removed chapters. */ - fun updateManga(manga: Manga): Observable, List>> { + suspend fun updateManga(manga: Manga): Pair, List> { val source = sourceManager.getOrStub(manga.source) // Update manga details metadata in the background if (preferences.autoUpdateMetadata()) { - runAsObservable({ - val updatedManga = source.getMangaDetails(manga.toMangaInfo()) - val sManga = updatedManga.toSManga() - // Avoid "losing" existing cover - if (!sManga.thumbnail_url.isNullOrEmpty()) { - manga.prepUpdateCover(coverCache, sManga, false) - } else { - sManga.thumbnail_url = manga.thumbnail_url - } + val updatedManga = source.getMangaDetails(manga.toMangaInfo()) + val sManga = updatedManga.toSManga() + // Avoid "losing" existing cover + if (!sManga.thumbnail_url.isNullOrEmpty()) { + manga.prepUpdateCover(coverCache, sManga, false) + } else { + sManga.thumbnail_url = manga.thumbnail_url + } - manga.copyFrom(sManga) - db.insertManga(manga).executeAsBlocking() - manga - }) - .onErrorResumeNext { Observable.just(manga) } - .subscribeOn(Schedulers.io()) - .subscribe() + manga.copyFrom(sManga) + db.insertManga(manga).executeAsBlocking() } - return runAsObservable({ - source.getChapterList(manga.toMangaInfo()) - .map { it.toSChapter() } - }) - .map { syncChaptersWithSource(db, it, manga, source) } + val chapters = source.getChapterList(manga.toMangaInfo()) + .map { it.toSChapter() } + + return syncChaptersWithSource(db, chapters, manga, source) } - private fun updateCovers(mangaToUpdate: List): Observable { + private suspend fun updateCovers(mangaToUpdate: List) { var count = 0 - return Observable.from(mangaToUpdate) - .doOnNext { - notifier.showProgressNotification(it, count++, mangaToUpdate.size) - } - .flatMap { manga -> - val source = sourceManager.get(manga.source) - ?: return@flatMap Observable.empty() + mangaToUpdate.forEach { manga -> + notifier.showProgressNotification(manga, count++, mangaToUpdate.size) - runAsObservable({ + sourceManager.get(manga.source)?.let { source -> + try { val networkManga = source.getMangaDetails(manga.toMangaInfo()) val sManga = networkManga.toSManga() manga.prepUpdateCover(coverCache, sManga, true) @@ -388,49 +360,46 @@ class LibraryUpdateService( manga.thumbnail_url = it db.insertManga(manga).executeAsBlocking() } - manga - }) - .onErrorReturn { manga } - } - .doOnCompleted { - notifier.cancelProgressNotification() + } catch (e: Throwable) { + // Ignore errors and continue + Timber.e(e) + } } + } + + notifier.cancelProgressNotification() } /** * Method that updates the metadata of the connected tracking services. It's called in a * background thread, so it's safe to do heavy operations or network calls here. */ - private fun updateTrackings(mangaToUpdate: List): Observable { + private suspend fun updateTrackings(mangaToUpdate: List) { // Initialize the variables holding the progress of the updates. var count = 0 val loggedServices = trackManager.services.filter { it.isLogged } - // Emit each manga and update it sequentially. - return Observable.from(mangaToUpdate) + mangaToUpdate.forEach { manga -> // Notify manga that will update. - .doOnNext { notifier.showProgressNotification(it, count++, mangaToUpdate.size) } - // Update the tracking details. - .concatMap { manga -> - val tracks = db.getTracks(manga).executeAsBlocking() + notifier.showProgressNotification(manga, count++, mangaToUpdate.size) - Observable.from(tracks) - .concatMap { track -> - val service = trackManager.getService(track.sync_id) - if (service != null && service in loggedServices) { - runAsObservable({ service.refresh(track) }) - .doOnNext { db.insertTrack(it).executeAsBlocking() } - .onErrorReturn { track } - } else { - Observable.empty() - } + // Update the tracking details. + db.getTracks(manga).executeAsBlocking().forEach { track -> + val service = trackManager.getService(track.sync_id) + if (service != null && service in loggedServices) { + try { + val updatedTrack = service.refresh(track) + db.insertTrack(updatedTrack).executeAsBlocking() + } catch (e: Throwable) { + // Ignore errors and continue + Timber.e(e) } - .map { manga } - } - .doOnCompleted { - notifier.cancelProgressNotification() + } } + } + + notifier.cancelProgressNotification() } /**