mirror of
https://github.com/mihonapp/mihon.git
synced 2025-06-30 21:17:50 +02:00
Add automatic gallery updating
This commit is contained in:
@ -1,5 +1,6 @@
|
||||
package exh
|
||||
|
||||
import com.elvishew.xlog.XLog
|
||||
import com.pushtorefresh.storio.sqlite.queries.Query
|
||||
import com.pushtorefresh.storio.sqlite.queries.RawQuery
|
||||
import eu.kanade.tachiyomi.data.backup.models.DHistory
|
||||
@ -20,6 +21,8 @@ import java.net.URISyntaxException
|
||||
object EXHMigrations {
|
||||
private val db: DatabaseHelper by injectLazy()
|
||||
|
||||
private val logger = XLog.tag("EXHMigrations")
|
||||
|
||||
private const val CURRENT_MIGRATION_VERSION = 1
|
||||
|
||||
/**
|
||||
@ -31,45 +34,49 @@ object EXHMigrations {
|
||||
fun upgrade(preferences: PreferencesHelper): Boolean {
|
||||
val context = preferences.context
|
||||
val oldVersion = preferences.eh_lastVersionCode().getOrDefault()
|
||||
if (oldVersion < CURRENT_MIGRATION_VERSION) {
|
||||
preferences.eh_lastVersionCode().set(CURRENT_MIGRATION_VERSION)
|
||||
|
||||
if(oldVersion < 1) {
|
||||
db.inTransaction {
|
||||
// Migrate HentaiCafe source IDs
|
||||
db.lowLevel().executeSQL(RawQuery.builder()
|
||||
.query("""
|
||||
try {
|
||||
if (oldVersion < CURRENT_MIGRATION_VERSION) {
|
||||
if (oldVersion < 1) {
|
||||
db.inTransaction {
|
||||
// Migrate HentaiCafe source IDs
|
||||
db.lowLevel().executeSQL(RawQuery.builder()
|
||||
.query("""
|
||||
UPDATE ${MangaTable.TABLE}
|
||||
SET ${MangaTable.COL_SOURCE} = $HENTAI_CAFE_SOURCE_ID
|
||||
WHERE ${MangaTable.COL_SOURCE} = 6908
|
||||
""".trimIndent())
|
||||
.affectsTables(MangaTable.TABLE)
|
||||
.build())
|
||||
.affectsTables(MangaTable.TABLE)
|
||||
.build())
|
||||
|
||||
// Migrate nhentai URLs
|
||||
val nhentaiManga = db.db.get()
|
||||
.listOfObjects(Manga::class.java)
|
||||
.withQuery(Query.builder()
|
||||
.table(MangaTable.TABLE)
|
||||
.where("${MangaTable.COL_SOURCE} = $NHENTAI_SOURCE_ID")
|
||||
.build())
|
||||
.prepare()
|
||||
.executeAsBlocking()
|
||||
// Migrate nhentai URLs
|
||||
val nhentaiManga = db.db.get()
|
||||
.listOfObjects(Manga::class.java)
|
||||
.withQuery(Query.builder()
|
||||
.table(MangaTable.TABLE)
|
||||
.where("${MangaTable.COL_SOURCE} = $NHENTAI_SOURCE_ID")
|
||||
.build())
|
||||
.prepare()
|
||||
.executeAsBlocking()
|
||||
|
||||
nhentaiManga.forEach {
|
||||
it.url = getUrlWithoutDomain(it.url)
|
||||
nhentaiManga.forEach {
|
||||
it.url = getUrlWithoutDomain(it.url)
|
||||
}
|
||||
|
||||
db.db.put()
|
||||
.objects(nhentaiManga)
|
||||
// Extremely slow without the resolver :/
|
||||
.withPutResolver(MangaUrlPutResolver())
|
||||
.prepare()
|
||||
.executeAsBlocking()
|
||||
}
|
||||
|
||||
db.db.put()
|
||||
.objects(nhentaiManga)
|
||||
// Extremely slow without the resolver :/
|
||||
.withPutResolver(MangaUrlPutResolver())
|
||||
.prepare()
|
||||
.executeAsBlocking()
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
preferences.eh_lastVersionCode().set(CURRENT_MIGRATION_VERSION)
|
||||
|
||||
return true
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
logger.e( "Failed to migrate app from $oldVersion -> $CURRENT_MIGRATION_VERSION!", e)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -61,7 +61,8 @@ class GalleryAdder {
|
||||
|
||||
fun addGallery(url: String,
|
||||
fav: Boolean = false,
|
||||
forceSource: Long? = null): GalleryAddEvent {
|
||||
forceSource: Long? = null,
|
||||
throttleFunc: () -> Unit = {}): GalleryAddEvent {
|
||||
XLog.d("Importing gallery (url: %s, fav: %s, forceSource: %s)...", url, fav, forceSource)
|
||||
try {
|
||||
val urlObj = Uri.parse(url)
|
||||
@ -167,7 +168,6 @@ class GalleryAdder {
|
||||
// Fetch and copy details
|
||||
val newManga = sourceObj.fetchMangaDetails(manga).toBlocking().first()
|
||||
manga.copyFrom(newManga)
|
||||
manga.title = newManga.title //Forcibly copy title as copyFrom does not copy title
|
||||
manga.initialized = true
|
||||
|
||||
if (fav) manga.favorite = true
|
||||
@ -180,13 +180,13 @@ class GalleryAdder {
|
||||
syncChaptersWithSource(db, it, manga, sourceObj)
|
||||
}.toBlocking().first()
|
||||
} catch (e: Exception) {
|
||||
XLog.w("Failed to update chapters for gallery: %s!", manga.title)
|
||||
XLog.w("Failed to update chapters for gallery: ${manga.title}!", e)
|
||||
return GalleryAddEvent.Fail.Error(url, "Failed to update chapters for gallery: $url")
|
||||
}
|
||||
|
||||
return GalleryAddEvent.Success(url, manga)
|
||||
} catch(e: Exception) {
|
||||
XLog.w("Could not add gallery!", e)
|
||||
XLog.w("Could not add gallery (url: $url)!", e)
|
||||
|
||||
if(e is EHentai.GalleryNotFoundException) {
|
||||
return GalleryAddEvent.Fail.NotFound(url)
|
||||
|
@ -1,5 +1,7 @@
|
||||
package exh.debug
|
||||
|
||||
import android.app.Application
|
||||
import android.os.Build
|
||||
import com.pushtorefresh.storio.sqlite.queries.RawQuery
|
||||
import eu.kanade.tachiyomi.data.database.DatabaseHelper
|
||||
import eu.kanade.tachiyomi.data.database.tables.MangaTable
|
||||
@ -7,9 +9,11 @@ import eu.kanade.tachiyomi.data.preference.PreferencesHelper
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
import exh.EH_SOURCE_ID
|
||||
import exh.EXH_SOURCE_ID
|
||||
import exh.eh.EHentaiUpdateWorker
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
|
||||
object DebugFunctions {
|
||||
val app: Application by injectLazy()
|
||||
val db: DatabaseHelper by injectLazy()
|
||||
val prefs: PreferencesHelper by injectLazy()
|
||||
val sourceManager: SourceManager by injectLazy()
|
||||
@ -48,6 +52,14 @@ object DebugFunctions {
|
||||
|
||||
fun convertAllExhentaiGalleriesToEhentai() = convertSources(EXH_SOURCE_ID, EH_SOURCE_ID)
|
||||
|
||||
fun testLaunchBackgroundUpdater() {
|
||||
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) {
|
||||
EHentaiUpdateWorker.launchBackgroundTest(app)
|
||||
} else {
|
||||
error("OS/SDK version too old!")
|
||||
}
|
||||
}
|
||||
|
||||
private fun convertSources(from: Long, to: Long) {
|
||||
db.lowLevel().executeSQL(RawQuery.builder()
|
||||
.query("""
|
||||
|
137
app/src/main/java/exh/eh/EHentaiUpdateHelper.kt
Normal file
137
app/src/main/java/exh/eh/EHentaiUpdateHelper.kt
Normal file
@ -0,0 +1,137 @@
|
||||
package exh.eh
|
||||
|
||||
import android.content.Context
|
||||
import eu.kanade.tachiyomi.data.database.DatabaseHelper
|
||||
import eu.kanade.tachiyomi.data.database.models.Chapter
|
||||
import eu.kanade.tachiyomi.data.database.models.ChapterImpl
|
||||
import eu.kanade.tachiyomi.data.database.models.Manga
|
||||
import eu.kanade.tachiyomi.data.database.models.MangaCategory
|
||||
import rx.Observable
|
||||
import rx.Single
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
import java.io.File
|
||||
|
||||
data class ChapterChain(val manga: Manga, val chapters: List<Chapter>)
|
||||
|
||||
class EHentaiUpdateHelper(context: Context) {
|
||||
val parentLookupTable =
|
||||
MemAutoFlushingLookupTable(
|
||||
File(context.filesDir, "exh-plt.maftable"),
|
||||
GalleryEntry.Serializer()
|
||||
)
|
||||
private val db: DatabaseHelper by injectLazy()
|
||||
|
||||
/**
|
||||
* @param chapters Cannot be an empty list!
|
||||
*
|
||||
* @return Pair<Accepted, Discarded>
|
||||
*/
|
||||
fun findAcceptedRootAndDiscardOthers(chapters: List<Chapter>): Single<Pair<ChapterChain, List<ChapterChain>>> {
|
||||
// Find other chains
|
||||
val chainsObservable = Observable.merge(chapters.map { chapter ->
|
||||
db.getChapters(chapter.url).asRxSingle().toObservable()
|
||||
}).toList().map { allChapters ->
|
||||
allChapters.flatMap { innerChapters -> innerChapters.map { it.manga_id!! } }.distinct()
|
||||
}.flatMap { mangaIds ->
|
||||
Observable.merge(
|
||||
mangaIds.map { mangaId ->
|
||||
Single.zip(
|
||||
db.getManga(mangaId).asRxSingle(),
|
||||
db.getChaptersByMangaId(mangaId).asRxSingle()
|
||||
) { manga, chapters ->
|
||||
ChapterChain(manga, chapters)
|
||||
}.toObservable()
|
||||
}
|
||||
)
|
||||
}.toList()
|
||||
|
||||
// Accept oldest chain
|
||||
val chainsWithAccepted = chainsObservable.map { chains ->
|
||||
val acceptedChain = chains.minBy { it.manga.id!! }!!
|
||||
|
||||
acceptedChain to chains
|
||||
}
|
||||
|
||||
return chainsWithAccepted.map { (accepted, chains) ->
|
||||
val toDiscard = chains.filter { it.manga.favorite && it.manga.id != accepted.manga.id }
|
||||
|
||||
if(toDiscard.isNotEmpty()) {
|
||||
// Copy chain chapters to curChapters
|
||||
val newChapters = toDiscard
|
||||
.flatMap { it.chapters }
|
||||
.fold(accepted.chapters) { curChapters, chapter ->
|
||||
val existing = curChapters.find { it.url == chapter.url }
|
||||
|
||||
if (existing != null) {
|
||||
existing.read = existing.read || chapter.read
|
||||
existing.last_page_read = existing.last_page_read.coerceAtLeast(chapter.last_page_read)
|
||||
existing.bookmark = existing.bookmark || chapter.bookmark
|
||||
curChapters
|
||||
} else if (chapter.date_upload > 0) { // Ignore chapters using the old system
|
||||
curChapters + ChapterImpl().apply {
|
||||
manga_id = accepted.manga.id
|
||||
url = chapter.url
|
||||
name = chapter.name
|
||||
read = chapter.read
|
||||
bookmark = chapter.bookmark
|
||||
last_page_read = chapter.last_page_read
|
||||
date_fetch = chapter.date_fetch
|
||||
date_upload = chapter.date_upload
|
||||
}
|
||||
} else curChapters
|
||||
}
|
||||
.filter { it.date_upload <= 0 } // Ignore chapters using the old system (filter after to prevent dupes from insert)
|
||||
.sortedBy { it.date_upload }
|
||||
.apply {
|
||||
withIndex().map { (index, chapter) ->
|
||||
chapter.name = "v${index + 1}: " + chapter.name.substringAfter(" ")
|
||||
chapter.chapter_number = index + 1f
|
||||
chapter.source_order = index
|
||||
}
|
||||
}
|
||||
|
||||
toDiscard.forEach { it.manga.favorite = false }
|
||||
accepted.manga.favorite = true
|
||||
|
||||
val newAccepted = ChapterChain(accepted.manga, newChapters)
|
||||
val rootsToMutate = toDiscard + newAccepted
|
||||
|
||||
db.inTransaction {
|
||||
// Apply changes to all manga
|
||||
db.insertMangas(rootsToMutate.map { it.manga }).executeAsBlocking()
|
||||
// Insert new chapters for accepted manga
|
||||
db.insertChapters(newAccepted.chapters)
|
||||
// Copy categories from all chains to accepted manga
|
||||
val newCategories = rootsToMutate.flatMap {
|
||||
db.getCategoriesForManga(it.manga).executeAsBlocking()
|
||||
}.distinctBy { it.id }.map {
|
||||
MangaCategory.create(newAccepted.manga, it)
|
||||
}
|
||||
db.setMangaCategories(newCategories, rootsToMutate.map { it.manga })
|
||||
}
|
||||
|
||||
newAccepted to toDiscard
|
||||
} else accepted to emptyList()
|
||||
}.toSingle()
|
||||
}
|
||||
}
|
||||
|
||||
data class GalleryEntry(val gId: String, val gToken: String) {
|
||||
class Serializer: MemAutoFlushingLookupTable.EntrySerializer<GalleryEntry> {
|
||||
/**
|
||||
* Serialize an entry as a String.
|
||||
*/
|
||||
override fun write(entry: GalleryEntry) = with(entry) { "$gId:$gToken" }
|
||||
|
||||
/**
|
||||
* Read an entry from a String.
|
||||
*/
|
||||
override fun read(string: String): GalleryEntry {
|
||||
val colonIndex = string.indexOf(':')
|
||||
return GalleryEntry(
|
||||
string.substring(0, colonIndex),
|
||||
string.substring(colonIndex + 1, string.length)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
351
app/src/main/java/exh/eh/EHentaiUpdateWorker.kt
Normal file
351
app/src/main/java/exh/eh/EHentaiUpdateWorker.kt
Normal file
@ -0,0 +1,351 @@
|
||||
package exh.eh
|
||||
|
||||
import android.app.job.JobInfo
|
||||
import android.app.job.JobParameters
|
||||
import android.app.job.JobScheduler
|
||||
import android.app.job.JobService
|
||||
import android.content.ComponentName
|
||||
import android.content.Context
|
||||
import android.os.Build
|
||||
import android.support.annotation.RequiresApi
|
||||
import com.elvishew.xlog.XLog
|
||||
import com.evernote.android.job.JobRequest
|
||||
import com.google.gson.Gson
|
||||
import com.kizitonwose.time.days
|
||||
import com.kizitonwose.time.hours
|
||||
import com.kizitonwose.time.minutes
|
||||
import eu.kanade.tachiyomi.data.database.DatabaseHelper
|
||||
import eu.kanade.tachiyomi.data.database.models.Chapter
|
||||
import eu.kanade.tachiyomi.data.database.models.ChapterImpl
|
||||
import eu.kanade.tachiyomi.data.database.models.Manga
|
||||
import eu.kanade.tachiyomi.data.preference.PreferencesHelper
|
||||
import eu.kanade.tachiyomi.data.preference.getOrDefault
|
||||
import eu.kanade.tachiyomi.source.SourceManager
|
||||
import eu.kanade.tachiyomi.source.model.SChapter
|
||||
import eu.kanade.tachiyomi.source.online.all.EHentai
|
||||
import eu.kanade.tachiyomi.util.jobScheduler
|
||||
import eu.kanade.tachiyomi.util.syncChaptersWithSource
|
||||
import exh.EH_SOURCE_ID
|
||||
import exh.EXH_SOURCE_ID
|
||||
import exh.metadata.metadata.EHentaiSearchMetadata
|
||||
import exh.metadata.metadata.base.*
|
||||
import exh.metadata.sql.models.SearchMetadata
|
||||
import exh.util.await
|
||||
import exh.util.awaitSuspending
|
||||
import exh.util.cancellable
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.*
|
||||
import rx.schedulers.Schedulers
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
import java.io.IOException
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
@RequiresApi(Build.VERSION_CODES.LOLLIPOP)
|
||||
class EHentaiUpdateWorker: JobService(), CoroutineScope {
|
||||
override val coroutineContext: CoroutineContext
|
||||
get() = Dispatchers.Default + Job()
|
||||
|
||||
private val db: DatabaseHelper by injectLazy()
|
||||
private val prefs: PreferencesHelper by injectLazy()
|
||||
private val gson: Gson by injectLazy()
|
||||
private val sourceManager: SourceManager by injectLazy()
|
||||
private val updateHelper: EHentaiUpdateHelper by injectLazy()
|
||||
private val logger = XLog.tag("EHUpdater")
|
||||
|
||||
/**
|
||||
* This method is called if the system has determined that you must stop execution of your job
|
||||
* even before you've had a chance to call [.jobFinished].
|
||||
*
|
||||
*
|
||||
* This will happen if the requirements specified at schedule time are no longer met. For
|
||||
* example you may have requested WiFi with
|
||||
* [android.app.job.JobInfo.Builder.setRequiredNetworkType], yet while your
|
||||
* job was executing the user toggled WiFi. Another example is if you had specified
|
||||
* [android.app.job.JobInfo.Builder.setRequiresDeviceIdle], and the phone left its
|
||||
* idle maintenance window. You are solely responsible for the behavior of your application
|
||||
* upon receipt of this message; your app will likely start to misbehave if you ignore it.
|
||||
*
|
||||
*
|
||||
* Once this method returns, the system releases the wakelock that it is holding on
|
||||
* behalf of the job.
|
||||
*
|
||||
* @param params The parameters identifying this job, as supplied to
|
||||
* the job in the [.onStartJob] callback.
|
||||
* @return `true` to indicate to the JobManager whether you'd like to reschedule
|
||||
* this job based on the retry criteria provided at job creation-time; or `false`
|
||||
* to end the job entirely. Regardless of the value returned, your job must stop executing.
|
||||
*/
|
||||
override fun onStopJob(params: JobParameters?): Boolean {
|
||||
runBlocking { coroutineContext[Job]?.cancelAndJoin() }
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Called to indicate that the job has begun executing. Override this method with the
|
||||
* logic for your job. Like all other component lifecycle callbacks, this method executes
|
||||
* on your application's main thread.
|
||||
*
|
||||
*
|
||||
* Return `true` from this method if your job needs to continue running. If you
|
||||
* do this, the job remains active until you call
|
||||
* [.jobFinished] to tell the system that it has completed
|
||||
* its work, or until the job's required constraints are no longer satisfied. For
|
||||
* example, if the job was scheduled using
|
||||
* [setRequiresCharging(true)][JobInfo.Builder.setRequiresCharging],
|
||||
* it will be immediately halted by the system if the user unplugs the device from power,
|
||||
* the job's [.onStopJob] callback will be invoked, and the app
|
||||
* will be expected to shut down all ongoing work connected with that job.
|
||||
*
|
||||
*
|
||||
* The system holds a wakelock on behalf of your app as long as your job is executing.
|
||||
* This wakelock is acquired before this method is invoked, and is not released until either
|
||||
* you call [.jobFinished], or after the system invokes
|
||||
* [.onStopJob] to notify your job that it is being shut down
|
||||
* prematurely.
|
||||
*
|
||||
*
|
||||
* Returning `false` from this method means your job is already finished. The
|
||||
* system's wakelock for the job will be released, and [.onStopJob]
|
||||
* will not be invoked.
|
||||
*
|
||||
* @param params Parameters specifying info about this job, including the optional
|
||||
* extras configured with [ This object serves to identify this specific running job instance when calling][JobInfo.Builder.setExtras]
|
||||
*/
|
||||
override fun onStartJob(params: JobParameters): Boolean {
|
||||
launch {
|
||||
startUpdating()
|
||||
logger.d("Update job completed!")
|
||||
jobFinished(params, false)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
suspend fun startUpdating() {
|
||||
logger.d("Update job started!")
|
||||
val startTime = System.currentTimeMillis()
|
||||
|
||||
logger.d("Finding manga with metadata...")
|
||||
val metadataManga = db.getMangaWithMetadata().await()
|
||||
|
||||
logger.d("Filtering manga and raising metadata...")
|
||||
val curTime = System.currentTimeMillis()
|
||||
val allMeta = metadataManga.asFlow().cancellable().mapNotNull { manga ->
|
||||
if (!manga.favorite || (manga.source != EH_SOURCE_ID && manga.source != EXH_SOURCE_ID))
|
||||
return@mapNotNull null
|
||||
|
||||
val meta = db.getFlatMetadataForManga(manga.id!!).asRxSingle().await()
|
||||
?: return@mapNotNull null
|
||||
|
||||
val raisedMeta = meta.raise<EHentaiSearchMetadata>()
|
||||
|
||||
// Don't update galleries too frequently
|
||||
if (raisedMeta.aged || curTime - raisedMeta.lastUpdateCheck < MIN_BACKGROUND_UPDATE_FREQ)
|
||||
return@mapNotNull null
|
||||
|
||||
val chapter = db.getChaptersByMangaId(manga.id!!).asRxSingle().await().minBy {
|
||||
it.date_upload
|
||||
}
|
||||
|
||||
UpdateEntry(manga, raisedMeta, chapter)
|
||||
}.toList()
|
||||
|
||||
logger.d("Found %s manga to update, starting updates!", allMeta.size)
|
||||
val mangaMetaToUpdateThisIter = allMeta.take(UPDATES_PER_ITERATION)
|
||||
|
||||
var failuresThisIteration = 0
|
||||
var updatedThisIteration = 0
|
||||
val modifiedThisIteration = mutableSetOf<Long>()
|
||||
|
||||
try {
|
||||
for ((index, entry) in mangaMetaToUpdateThisIter.withIndex()) {
|
||||
val (manga, meta) = entry
|
||||
if (failuresThisIteration > MAX_UPDATE_FAILURES) {
|
||||
logger.w("Too many update failures, aborting...")
|
||||
break
|
||||
}
|
||||
|
||||
logger.d("Updating gallery (index: %s, manga.id: %s, meta.gId: %s, meta.gToken: %s, failures-so-far: %s, modifiedThisIteration.size: %s)...",
|
||||
index,
|
||||
manga.id,
|
||||
meta.gId,
|
||||
meta.gToken,
|
||||
failuresThisIteration,
|
||||
modifiedThisIteration.size)
|
||||
|
||||
if (manga.id in modifiedThisIteration) {
|
||||
// We already processed this manga!
|
||||
logger.w("Gallery already updated this iteration, skipping...")
|
||||
updatedThisIteration++
|
||||
continue
|
||||
}
|
||||
|
||||
val chapters = try {
|
||||
updateEntryAndGetChapters(manga)
|
||||
} catch (e: GalleryNotUpdatedException) {
|
||||
if (e.network) {
|
||||
failuresThisIteration++
|
||||
|
||||
logger.e("> Network error while updating gallery!", e)
|
||||
logger.e("> (manga.id: %s, meta.gId: %s, meta.gToken: %s, failures-so-far: %s)",
|
||||
manga.id,
|
||||
meta.gId,
|
||||
meta.gToken,
|
||||
failuresThisIteration)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if (chapters.isEmpty()) {
|
||||
logger.e("No chapters found for gallery (manga.id: %s, meta.gId: %s, meta.gToken: %s, failures-so-far: %s)!",
|
||||
manga.id,
|
||||
meta.gId,
|
||||
meta.gToken,
|
||||
failuresThisIteration)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Find accepted root and discard others
|
||||
val (acceptedRoot, discardedRoots) =
|
||||
updateHelper.findAcceptedRootAndDiscardOthers(chapters).await()
|
||||
|
||||
modifiedThisIteration += acceptedRoot.manga.id!!
|
||||
modifiedThisIteration += discardedRoots.map { it.manga.id!! }
|
||||
updatedThisIteration++
|
||||
}
|
||||
} finally {
|
||||
prefs.eh_autoUpdateStats().set(
|
||||
gson.toJson(
|
||||
EHentaiUpdaterStats(
|
||||
startTime,
|
||||
allMeta.size,
|
||||
updatedThisIteration
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun updateEntryAndGetChapters(manga: Manga): List<Chapter> {
|
||||
val source = sourceManager.get(manga.source) as EHentai
|
||||
|
||||
try {
|
||||
val updatedManga = source.fetchMangaDetails(manga).toSingle().await(Schedulers.io())
|
||||
manga.copyFrom(updatedManga)
|
||||
db.insertManga(manga).asRxSingle().await()
|
||||
|
||||
val newChapters = source.fetchChapterList(manga).toSingle().await(Schedulers.io())
|
||||
syncChaptersWithSource(db, newChapters, manga, source) // Not suspending, but does block, maybe fix this?
|
||||
return db.getChapters(manga).await()
|
||||
} catch(t: Throwable) {
|
||||
if(t is EHentai.GalleryNotFoundException) {
|
||||
val meta = db.getFlatMetadataForManga(manga.id!!).await()?.raise<EHentaiSearchMetadata>()
|
||||
if(meta != null) {
|
||||
// Age dead galleries
|
||||
meta.aged = true
|
||||
db.insertFlatMetadata(meta.flatten()).await()
|
||||
}
|
||||
throw GalleryNotUpdatedException(false, t)
|
||||
}
|
||||
throw GalleryNotUpdatedException(true, t)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val UPDATES_PER_ITERATION = 50
|
||||
private const val MAX_UPDATE_FAILURES = 5
|
||||
|
||||
private val MIN_BACKGROUND_UPDATE_FREQ = 1.days.inMilliseconds.longValue
|
||||
|
||||
val GALLERY_AGE_TIME = 365.days.inMilliseconds.longValue
|
||||
|
||||
private const val JOB_ID_UPDATE_BACKGROUND = 0
|
||||
private const val JOB_ID_UPDATE_BACKGROUND_TEST = 1
|
||||
|
||||
private val logger by lazy { XLog.tag("EHUpdaterScheduler") }
|
||||
|
||||
private fun Context.componentName(): ComponentName {
|
||||
return ComponentName(this, EHentaiUpdateWorker::class.java)
|
||||
}
|
||||
|
||||
private fun Context.baseBackgroundJobInfo(isTest: Boolean): JobInfo.Builder {
|
||||
return JobInfo.Builder(
|
||||
if(isTest) JOB_ID_UPDATE_BACKGROUND_TEST
|
||||
else JOB_ID_UPDATE_BACKGROUND, componentName())
|
||||
.apply {
|
||||
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.P) {
|
||||
setEstimatedNetworkBytes(15000L * UPDATES_PER_ITERATION,
|
||||
1000L * UPDATES_PER_ITERATION)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun Context.periodicBackgroundJobInfo(period: Long,
|
||||
requireCharging: Boolean,
|
||||
requireUnmetered: Boolean): JobInfo {
|
||||
return baseBackgroundJobInfo(false)
|
||||
.setPeriodic(period)
|
||||
.setPersisted(true)
|
||||
.apply {
|
||||
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
|
||||
setRequiresBatteryNotLow(true)
|
||||
}
|
||||
}
|
||||
.setRequiresCharging(requireCharging)
|
||||
.setRequiredNetworkType(
|
||||
if(requireUnmetered) JobInfo.NETWORK_TYPE_UNMETERED
|
||||
else JobInfo.NETWORK_TYPE_ANY)
|
||||
.setRequiresDeviceIdle(true)
|
||||
.build()
|
||||
}
|
||||
|
||||
private fun Context.testBackgroundJobInfo(): JobInfo {
|
||||
return baseBackgroundJobInfo(true)
|
||||
.setOverrideDeadline(1)
|
||||
.build()
|
||||
}
|
||||
|
||||
fun launchBackgroundTest(context: Context) {
|
||||
val jobScheduler = context.jobScheduler
|
||||
if(jobScheduler.schedule(context.testBackgroundJobInfo()) == JobScheduler.RESULT_FAILURE) {
|
||||
logger.e("Failed to schedule background test job!")
|
||||
} else {
|
||||
logger.d("Successfully scheduled background test job!")
|
||||
}
|
||||
}
|
||||
|
||||
fun scheduleBackground(context: Context, prefInterval: Int? = null) {
|
||||
cancelBackground(context)
|
||||
|
||||
val preferences = Injekt.get<PreferencesHelper>()
|
||||
val interval = prefInterval ?: preferences.eh_autoUpdateFrequency().getOrDefault()
|
||||
if (interval > 0) {
|
||||
val restrictions = preferences.eh_autoUpdateRequirements()!!
|
||||
val acRestriction = "ac" in restrictions
|
||||
val wifiRestriction = "wifi" in restrictions
|
||||
|
||||
val jobInfo = context.periodicBackgroundJobInfo(
|
||||
interval.hours.inMilliseconds.longValue,
|
||||
acRestriction,
|
||||
wifiRestriction
|
||||
)
|
||||
|
||||
if(context.jobScheduler.schedule(jobInfo) == JobScheduler.RESULT_FAILURE) {
|
||||
logger.e("Failed to schedule background update job!")
|
||||
} else {
|
||||
logger.d("Successfully scheduled background update job!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun cancelBackground(context: Context) {
|
||||
context.jobScheduler.cancel(JOB_ID_UPDATE_BACKGROUND)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data class UpdateEntry(val manga: Manga, val meta: EHentaiSearchMetadata, val rootChapter: Chapter?)
|
7
app/src/main/java/exh/eh/EHentaiUpdaterStats.kt
Normal file
7
app/src/main/java/exh/eh/EHentaiUpdaterStats.kt
Normal file
@ -0,0 +1,7 @@
|
||||
package exh.eh
|
||||
|
||||
data class EHentaiUpdaterStats(
|
||||
val startTime: Long,
|
||||
val possibleUpdates: Int,
|
||||
val updateCount: Int
|
||||
)
|
3
app/src/main/java/exh/eh/GalleryNotUpdatedException.kt
Normal file
3
app/src/main/java/exh/eh/GalleryNotUpdatedException.kt
Normal file
@ -0,0 +1,3 @@
|
||||
package exh.eh
|
||||
|
||||
class GalleryNotUpdatedException(val network: Boolean, cause: Throwable): RuntimeException(cause)
|
214
app/src/main/java/exh/eh/MemAutoFlushingLookupTable.kt
Normal file
214
app/src/main/java/exh/eh/MemAutoFlushingLookupTable.kt
Normal file
@ -0,0 +1,214 @@
|
||||
package exh.eh
|
||||
|
||||
import android.support.v4.util.AtomicFile
|
||||
import android.util.SparseArray
|
||||
import android.util.SparseIntArray
|
||||
import com.elvishew.xlog.XLog
|
||||
import exh.ui.captcha.SolveCaptchaActivity.Companion.launch
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import java.io.*
|
||||
import java.nio.ByteBuffer
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
/**
|
||||
* In memory Int -> Obj lookup table implementation that
|
||||
* automatically persists itself to disk atomically and asynchronously.
|
||||
*
|
||||
* Thread safe
|
||||
*
|
||||
* @author nulldev
|
||||
*/
|
||||
class MemAutoFlushingLookupTable<T>(
|
||||
file: File,
|
||||
private val serializer: EntrySerializer<T>,
|
||||
private val debounceTimeMs: Long = 3000
|
||||
) : CoroutineScope, Closeable {
|
||||
/**
|
||||
* The context of this scope.
|
||||
* Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
|
||||
* Accessing this property in general code is not recommended for any purposes except accessing [Job] instance for advanced usages.
|
||||
*
|
||||
* By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
|
||||
*/
|
||||
override val coroutineContext: CoroutineContext
|
||||
get() = Dispatchers.IO + SupervisorJob()
|
||||
|
||||
private val table = SparseArray<T>(INITIAL_SIZE)
|
||||
private val mutex = Mutex(true)
|
||||
|
||||
// Used to debounce
|
||||
@Volatile
|
||||
private var writeCounter = Long.MIN_VALUE
|
||||
@Volatile
|
||||
private var flushed = true
|
||||
|
||||
private val atomicFile = AtomicFile(file)
|
||||
|
||||
private val shutdownHook = thread(start = false) {
|
||||
if(!flushed) writeSynchronously()
|
||||
}
|
||||
|
||||
init {
|
||||
initialLoad()
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(shutdownHook)
|
||||
}
|
||||
|
||||
private fun InputStream.requireBytes(targetArray: ByteArray, byteCount: Int): Boolean {
|
||||
var readIter = 0
|
||||
while (true) {
|
||||
val readThisIter = read(targetArray, readIter, byteCount - readIter)
|
||||
if(readThisIter <= 0) return false // No more data to read
|
||||
readIter += readThisIter
|
||||
if(readIter == byteCount) return true
|
||||
}
|
||||
}
|
||||
|
||||
private fun initialLoad() {
|
||||
launch {
|
||||
try {
|
||||
atomicFile.openRead().buffered().use { input ->
|
||||
val bb = ByteBuffer.allocate(8)
|
||||
|
||||
while(true) {
|
||||
if(!input.requireBytes(bb.array(), 8)) break
|
||||
val k = bb.getInt(0)
|
||||
val size = bb.getInt(4)
|
||||
val strBArr = ByteArray(size)
|
||||
if(!input.requireBytes(strBArr, size)) break
|
||||
table.put(k, serializer.read(strBArr.toString(Charsets.UTF_8)))
|
||||
}
|
||||
}
|
||||
} catch(e: FileNotFoundException) {
|
||||
XLog.d("Lookup table not found!", e)
|
||||
// Ignored
|
||||
}
|
||||
|
||||
mutex.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
private fun tryWrite() {
|
||||
val id = ++writeCounter
|
||||
flushed = false
|
||||
launch {
|
||||
delay(debounceTimeMs)
|
||||
if(id != writeCounter) return@launch
|
||||
|
||||
mutex.withLock {
|
||||
// Second check inside of mutex to prevent dupe writes
|
||||
if(id != writeCounter) return@launch
|
||||
withContext(NonCancellable) {
|
||||
writeSynchronously()
|
||||
|
||||
// Yes there is a race here, no it's isn't critical
|
||||
if (id == writeCounter) flushed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun writeSynchronously() {
|
||||
val bb = ByteBuffer.allocate(ENTRY_SIZE_BYTES)
|
||||
|
||||
val fos = atomicFile.startWrite()
|
||||
try {
|
||||
val out = fos.buffered()
|
||||
for(i in 0 until table.size()) {
|
||||
val k = table.keyAt(i)
|
||||
val v = serializer.write(table.valueAt(i)).toByteArray(Charsets.UTF_8)
|
||||
bb.putInt(0, k)
|
||||
bb.putInt(4, v.size)
|
||||
out.write(bb.array())
|
||||
out.write(v)
|
||||
}
|
||||
out.flush()
|
||||
atomicFile.finishWrite(fos)
|
||||
} catch(t: Throwable) {
|
||||
atomicFile.failWrite(fos)
|
||||
throw t
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun put(key: Int, value: T) {
|
||||
mutex.withLock { table.put(key, value) }
|
||||
tryWrite()
|
||||
}
|
||||
|
||||
suspend fun get(key: Int): T? {
|
||||
return mutex.withLock { table.get(key) }
|
||||
}
|
||||
|
||||
suspend fun size(): Int {
|
||||
return mutex.withLock { table.size() }
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes this resource, relinquishing any underlying resources.
|
||||
* This method is invoked automatically on objects managed by the
|
||||
* `try`-with-resources statement.
|
||||
*
|
||||
*
|
||||
* While this interface method is declared to throw `Exception`, implementers are *strongly* encouraged to
|
||||
* declare concrete implementations of the `close` method to
|
||||
* throw more specific exceptions, or to throw no exception at all
|
||||
* if the close operation cannot fail.
|
||||
*
|
||||
*
|
||||
* Cases where the close operation may fail require careful
|
||||
* attention by implementers. It is strongly advised to relinquish
|
||||
* the underlying resources and to internally *mark* the
|
||||
* resource as closed, prior to throwing the exception. The `close` method is unlikely to be invoked more than once and so
|
||||
* this ensures that the resources are released in a timely manner.
|
||||
* Furthermore it reduces problems that could arise when the resource
|
||||
* wraps, or is wrapped, by another resource.
|
||||
*
|
||||
*
|
||||
* *Implementers of this interface are also strongly advised
|
||||
* to not have the `close` method throw [ ].*
|
||||
*
|
||||
* This exception interacts with a thread's interrupted status,
|
||||
* and runtime misbehavior is likely to occur if an `InterruptedException` is [ suppressed][Throwable.addSuppressed].
|
||||
*
|
||||
* More generally, if it would cause problems for an
|
||||
* exception to be suppressed, the `AutoCloseable.close`
|
||||
* method should not throw it.
|
||||
*
|
||||
*
|
||||
* Note that unlike the [close][java.io.Closeable.close]
|
||||
* method of [java.io.Closeable], this `close` method
|
||||
* is *not* required to be idempotent. In other words,
|
||||
* calling this `close` method more than once may have some
|
||||
* visible side effect, unlike `Closeable.close` which is
|
||||
* required to have no effect if called more than once.
|
||||
*
|
||||
* However, implementers of this interface are strongly encouraged
|
||||
* to make their `close` methods idempotent.
|
||||
*
|
||||
* @throws Exception if this resource cannot be closed
|
||||
*/
|
||||
override fun close() {
|
||||
runBlocking { coroutineContext[Job]?.cancelAndJoin() }
|
||||
Runtime.getRuntime().removeShutdownHook(shutdownHook)
|
||||
}
|
||||
|
||||
interface EntrySerializer<T> {
|
||||
/**
|
||||
* Serialize an entry as a String.
|
||||
*/
|
||||
fun write(entry: T): String
|
||||
|
||||
/**
|
||||
* Read an entry from a String.
|
||||
*/
|
||||
fun read(string: String): T
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val INITIAL_SIZE = 1000
|
||||
private const val ENTRY_SIZE_BYTES = 8
|
||||
}
|
||||
}
|
@ -17,6 +17,7 @@ import eu.kanade.tachiyomi.util.powerManager
|
||||
import eu.kanade.tachiyomi.util.toast
|
||||
import eu.kanade.tachiyomi.util.wifiManager
|
||||
import exh.*
|
||||
import exh.eh.EHentaiUpdateWorker
|
||||
import exh.util.ignore
|
||||
import exh.util.trans
|
||||
import okhttp3.FormBody
|
||||
@ -112,6 +113,9 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
"teh:ExhFavoritesSyncWifi")
|
||||
}
|
||||
|
||||
// Do not update galleries while syncing favorites
|
||||
EHentaiUpdateWorker.cancelBackground(context)
|
||||
|
||||
storage.getRealm().use { realm ->
|
||||
realm.trans {
|
||||
db.inTransaction {
|
||||
@ -161,6 +165,9 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
wifiLock?.release()
|
||||
wifiLock = null
|
||||
}
|
||||
|
||||
// Update galleries again!
|
||||
EHentaiUpdateWorker.scheduleBackground(context)
|
||||
}
|
||||
|
||||
if(errorList.isEmpty())
|
||||
@ -338,7 +345,8 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
//Import using gallery adder
|
||||
val result = galleryAdder.addGallery("${exh.baseUrl}${it.getUrl()}",
|
||||
true,
|
||||
EXH_SOURCE_ID)
|
||||
EXH_SOURCE_ID,
|
||||
::throttle)
|
||||
|
||||
if(result is GalleryAddEvent.Fail) {
|
||||
if(result is GalleryAddEvent.Fail.NotFound) {
|
||||
@ -396,7 +404,7 @@ class FavoritesSyncHelper(val context: Context) {
|
||||
class IgnoredException : RuntimeException()
|
||||
|
||||
companion object {
|
||||
private const val THROTTLE_MAX = 4500
|
||||
private const val THROTTLE_MAX = 5500
|
||||
private const val THROTTLE_INC = 10
|
||||
private const val THROTTLE_WARN = 1000
|
||||
}
|
||||
|
@ -36,6 +36,9 @@ class EHentaiSearchMetadata : RaisedSearchMetadata() {
|
||||
var ratingCount: Int? = null
|
||||
var averageRating: Double? = null
|
||||
|
||||
var aged: Boolean = false
|
||||
var lastUpdateCheck: Long = 0
|
||||
|
||||
override fun copyTo(manga: SManga) {
|
||||
gId?.let { gId ->
|
||||
gToken?.let { gToken ->
|
||||
|
@ -25,7 +25,7 @@ data class FlatMetadata(
|
||||
|
||||
fun DatabaseHelper.getFlatMetadataForManga(mangaId: Long): PreparedOperation<FlatMetadata?> {
|
||||
// We have to use fromCallable because StorIO messes up the thread scheduling if we use their rx functions
|
||||
fun getSingle() = Single.fromCallable {
|
||||
val single = Single.fromCallable {
|
||||
val meta = getSearchMetadataForManga(mangaId).executeAsBlocking()
|
||||
if(meta != null) {
|
||||
val tags = getSearchTagsForManga(mangaId).executeAsBlocking()
|
||||
@ -35,7 +35,11 @@ fun DatabaseHelper.getFlatMetadataForManga(mangaId: Long): PreparedOperation<Fla
|
||||
} else null
|
||||
}
|
||||
|
||||
return object : PreparedOperation<FlatMetadata?> {
|
||||
return preparedOperationFromSingle(single)
|
||||
}
|
||||
|
||||
private fun <T> preparedOperationFromSingle(single: Single<T>): PreparedOperation<T> {
|
||||
return object : PreparedOperation<T> {
|
||||
/**
|
||||
* Creates [rx.Observable] that emits result of Operation.
|
||||
*
|
||||
@ -44,7 +48,7 @@ fun DatabaseHelper.getFlatMetadataForManga(mangaId: Long): PreparedOperation<Fla
|
||||
*
|
||||
* @return observable result of operation with only one [rx.Observer.onNext] call.
|
||||
*/
|
||||
override fun createObservable() = getSingle().toObservable()
|
||||
override fun createObservable() = single.toObservable()
|
||||
|
||||
/**
|
||||
* Executes operation synchronously in current thread.
|
||||
@ -57,7 +61,7 @@ fun DatabaseHelper.getFlatMetadataForManga(mangaId: Long): PreparedOperation<Fla
|
||||
*
|
||||
* @return nullable result of operation.
|
||||
*/
|
||||
override fun executeAsBlocking() = getSingle().toBlocking().value()
|
||||
override fun executeAsBlocking() = single.toBlocking().value()
|
||||
|
||||
/**
|
||||
* Creates [rx.Observable] that emits result of Operation.
|
||||
@ -67,7 +71,7 @@ fun DatabaseHelper.getFlatMetadataForManga(mangaId: Long): PreparedOperation<Fla
|
||||
*
|
||||
* @return observable result of operation with only one [rx.Observer.onNext] call.
|
||||
*/
|
||||
override fun asRxObservable() = getSingle().toObservable()
|
||||
override fun asRxObservable() = single.toObservable()
|
||||
|
||||
/**
|
||||
* Creates [rx.Single] that emits result of Operation lazily when somebody subscribes to it.
|
||||
@ -76,8 +80,7 @@ fun DatabaseHelper.getFlatMetadataForManga(mangaId: Long): PreparedOperation<Fla
|
||||
*
|
||||
* @return single result of operation.
|
||||
*/
|
||||
override fun asRxSingle() = getSingle()
|
||||
|
||||
override fun asRxSingle() = single
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,15 @@ interface SearchMetadataQueries : DbProvider {
|
||||
.build())
|
||||
.prepare()
|
||||
|
||||
fun getSearchMetadataByIndexedExtra(extra: String) = db.get()
|
||||
.listOfObjects(SearchMetadata::class.java)
|
||||
.withQuery(Query.builder()
|
||||
.table(SearchMetadataTable.TABLE)
|
||||
.where("${SearchMetadataTable.COL_INDEXED_EXTRA} = ?")
|
||||
.whereArgs(extra)
|
||||
.build())
|
||||
.prepare()
|
||||
|
||||
fun insertSearchMetadata(metadata: SearchMetadata) = db.put().`object`(metadata).prepare()
|
||||
|
||||
fun deleteSearchMetadata(metadata: SearchMetadata) = db.delete().`object`(metadata).prepare()
|
||||
|
@ -1,8 +1,11 @@
|
||||
package exh.util
|
||||
|
||||
import rx.Observable
|
||||
import rx.Single
|
||||
import com.pushtorefresh.storio.operations.PreparedOperation
|
||||
import com.pushtorefresh.storio.sqlite.operations.get.PreparedGetObject
|
||||
import kotlinx.coroutines.suspendCancellableCoroutine
|
||||
import rx.*
|
||||
import rx.subjects.ReplaySubject
|
||||
import kotlin.coroutines.resumeWithException
|
||||
|
||||
/**
|
||||
* Transform a cold single to a hot single
|
||||
@ -24,4 +27,45 @@ fun <T> Observable<T>.melt(): Observable<T> {
|
||||
val rs = ReplaySubject.create<T>()
|
||||
subscribe(rs)
|
||||
return rs
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
|
||||
return suspendCancellableCoroutine { continuation ->
|
||||
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
||||
lateinit var sub: Subscription
|
||||
sub = self.subscribe({
|
||||
continuation.resume(it) {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
}, {
|
||||
if (!continuation.isCancelled)
|
||||
continuation.resumeWithException(it)
|
||||
})
|
||||
|
||||
continuation.invokeOnCancellation {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun <T> PreparedOperation<T>.await(): T = asRxSingle().await()
|
||||
suspend fun <T> PreparedGetObject<T>.await(): T? = asRxSingle().await()
|
||||
|
||||
suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
|
||||
return suspendCancellableCoroutine { continuation ->
|
||||
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
||||
lateinit var sub: Subscription
|
||||
sub = self.subscribe({
|
||||
continuation.resume(Unit) {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
}, {
|
||||
if (!continuation.isCancelled)
|
||||
continuation.resumeWithException(it)
|
||||
})
|
||||
|
||||
continuation.invokeOnCancellation {
|
||||
sub.unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user