Fixed formatting of downloader

This commit is contained in:
arkon 2020-05-08 18:16:27 -04:00 committed by Jay
parent 6e8c285646
commit f96f25b1f6

View File

@ -85,7 +85,8 @@ class Downloader(
/** /**
* Whether the downloader is running. * Whether the downloader is running.
*/ */
@Volatile private var isRunning: Boolean = false @Volatile
private var isRunning: Boolean = false
init { init {
launchNow { launchNow {
@ -102,11 +103,9 @@ class Downloader(
* @return true if the downloader is started, false otherwise. * @return true if the downloader is started, false otherwise.
*/ */
fun start(): Boolean { fun start(): Boolean {
if (isRunning || queue.isEmpty()) if (isRunning || queue.isEmpty()) return false
return false
notifier.paused = false notifier.paused = false
if (!subscriptions.hasSubscriptions()) if (!subscriptions.hasSubscriptions()) initializeSubscriptions()
initializeSubscriptions()
val pending = queue.filter { it.status != Download.DOWNLOADED } val pending = queue.filter { it.status != Download.DOWNLOADED }
pending.forEach { if (it.status != Download.QUEUE) it.status = Download.QUEUE } pending.forEach { if (it.status != Download.QUEUE) it.status = Download.QUEUE }
@ -120,9 +119,7 @@ class Downloader(
*/ */
fun stop(reason: String? = null) { fun stop(reason: String? = null) {
destroySubscriptions() destroySubscriptions()
queue queue.filter { it.status == Download.DOWNLOADING }.forEach { it.status = Download.ERROR }
.filter { it.status == Download.DOWNLOADING }
.forEach { it.status = Download.ERROR }
if (reason != null) { if (reason != null) {
notifier.onWarning(reason) notifier.onWarning(reason)
@ -145,9 +142,7 @@ class Downloader(
*/ */
fun pause() { fun pause() {
destroySubscriptions() destroySubscriptions()
queue queue.filter { it.status == Download.DOWNLOADING }.forEach { it.status = Download.QUEUE }
.filter { it.status == Download.DOWNLOADING }
.forEach { it.status = Download.QUEUE }
notifier.paused = true notifier.paused = true
} }
@ -166,9 +161,8 @@ class Downloader(
// Needed to update the chapter view // Needed to update the chapter view
if (isNotification) { if (isNotification) {
queue queue.filter { it.status == Download.QUEUE }
.filter { it.status == Download.QUEUE } .forEach { it.status = Download.NOT_DOWNLOADED }
.forEach { it.status = Download.NOT_DOWNLOADED }
} }
queue.clear() queue.clear()
notifier.dismiss() notifier.dismiss()
@ -182,8 +176,7 @@ class Downloader(
fun clearQueue(manga: Manga, isNotification: Boolean = false) { fun clearQueue(manga: Manga, isNotification: Boolean = false) {
// Needed to update the chapter view // Needed to update the chapter view
if (isNotification) { if (isNotification) {
queue queue.filter { it.status == Download.QUEUE && it.manga.id == manga.id }
.filter { it.status == Download.QUEUE && it.manga.id == manga.id }
.forEach { it.status = Download.NOT_DOWNLOADED } .forEach { it.status = Download.NOT_DOWNLOADED }
} }
queue.remove(manga) queue.remove(manga)
@ -204,15 +197,14 @@ class Downloader(
subscriptions.clear() subscriptions.clear()
subscriptions += downloadsRelay.concatMapIterable { it } subscriptions += downloadsRelay.concatMapIterable { it }
.concatMap { downloadChapter(it).subscribeOn(Schedulers.io()) } .concatMap { downloadChapter(it).subscribeOn(Schedulers.io()) }.onBackpressureBuffer()
.onBackpressureBuffer() .observeOn(AndroidSchedulers.mainThread()).subscribe({
.observeOn(AndroidSchedulers.mainThread()) completeDownload(it)
.subscribe({ completeDownload(it) }, { error ->
}, { error -> DownloadService.stop(context)
DownloadService.stop(context) Timber.e(error)
Timber.e(error) notifier.onError(error.message)
notifier.onError(error.message) })
})
} }
/** /**
@ -233,42 +225,41 @@ class Downloader(
* @param chapters the list of chapters to download. * @param chapters the list of chapters to download.
* @param autoStart whether to start the downloader after enqueing the chapters. * @param autoStart whether to start the downloader after enqueing the chapters.
*/ */
fun queueChapters(manga: Manga, chapters: List<Chapter>, autoStart: Boolean) = fun queueChapters(manga: Manga, chapters: List<Chapter>, autoStart: Boolean) = launchUI {
launchUI { val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchUI
val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchUI val wasEmpty = queue.isEmpty()
val wasEmpty = queue.isEmpty() // Called in background thread, the operation can be slow with SAF.
// Called in background thread, the operation can be slow with SAF. val chaptersWithoutDir = async {
val chaptersWithoutDir = async { chapters
chapters // Filter out those already downloaded.
// Filter out those already downloaded. .filter { provider.findChapterDir(it, manga, source) == null }
.filter { provider.findChapterDir(it, manga, source) == null } // Add chapters to queue from the start.
// Add chapters to queue from the start. .sortedByDescending { it.source_order }
.sortedByDescending { it.source_order } }
// Runs in main thread (synchronization needed).
val chaptersToQueue = chaptersWithoutDir.await()
// Filter out those already enqueued.
.filter { chapter -> queue.none { it.chapter.id == chapter.id } }
// Create a download for each one.
.map { Download(source, manga, it) }
if (chaptersToQueue.isNotEmpty()) {
queue.addAll(chaptersToQueue)
if (isRunning) {
// Send the list of downloads to the downloader.
downloadsRelay.call(chaptersToQueue)
} }
// Runs in main thread (synchronization needed). // Start downloader if needed
val chaptersToQueue = chaptersWithoutDir.await() if (autoStart && wasEmpty) {
// Filter out those already enqueued. DownloadService.start(this@Downloader.context)
.filter { chapter -> queue.none { it.chapter.id == chapter.id } } } else if (!isRunning && !LibraryUpdateService.isRunning()) {
// Create a download for each one. notifier.onDownloadPaused()
.map { Download(source, manga, it) }
if (chaptersToQueue.isNotEmpty()) {
queue.addAll(chaptersToQueue)
if (isRunning) {
// Send the list of downloads to the downloader.
downloadsRelay.call(chaptersToQueue)
}
// Start downloader if needed
if (autoStart && wasEmpty) {
DownloadService.start(this@Downloader.context)
} else if (!isRunning && !LibraryUpdateService.isRunning()) {
notifier.onDownloadPaused()
}
} }
} }
}
/** /**
* Returns the observable which downloads a chapter. * Returns the observable which downloads a chapter.
@ -282,44 +273,38 @@ class Downloader(
val pageListObservable = if (download.pages == null) { val pageListObservable = if (download.pages == null) {
// Pull page list from network and add them to download object // Pull page list from network and add them to download object
download.source.fetchPageList(download.chapter) download.source.fetchPageList(download.chapter).doOnNext { pages ->
.doOnNext { pages -> if (pages.isEmpty()) {
if (pages.isEmpty()) { throw Exception("Page list is empty")
throw Exception("Page list is empty")
}
download.pages = pages
} }
download.pages = pages
}
} else { } else {
// Or if the page list already exists, start from the file // Or if the page list already exists, start from the file
Observable.just(download.pages!!) Observable.just(download.pages!!)
} }
pageListObservable pageListObservable.doOnNext { _ ->
.doOnNext { _ -> // Delete all temporary (unfinished) files
// Delete all temporary (unfinished) files tmpDir.listFiles()?.filter { it.name!!.endsWith(".tmp") }?.forEach { it.delete() }
tmpDir.listFiles()
?.filter { it.name!!.endsWith(".tmp") }
?.forEach { it.delete() }
download.downloadedImages = 0 download.downloadedImages = 0
download.status = Download.DOWNLOADING download.status = Download.DOWNLOADING
} }
// Get all the URLs to the source images, fetch pages if necessary // Get all the URLs to the source images, fetch pages if necessary
.flatMap { download.source.fetchAllImageUrlsFromPageList(it) } .flatMap { download.source.fetchAllImageUrlsFromPageList(it) }
// Start downloading images, consider we can have downloaded images already // Start downloading images, consider we can have downloaded images already
.concatMap { page -> getOrDownloadImage(page, download, tmpDir) } .concatMap { page -> getOrDownloadImage(page, download, tmpDir) }
// Do when page is downloaded. // Do when page is downloaded.
.doOnNext { notifier.onProgressChange(download) } .doOnNext { notifier.onProgressChange(download) }.toList().map { _ -> download }
.toList() // Do after download completes
.map { _ -> download } .doOnNext { ensureSuccessfulDownload(download, mangaDir, tmpDir, chapterDirname) }
// Do after download completes // If the page list threw, it will resume here
.doOnNext { ensureSuccessfulDownload(download, mangaDir, tmpDir, chapterDirname) } .onErrorReturn { error ->
// If the page list threw, it will resume here download.status = Download.ERROR
.onErrorReturn { error -> notifier.onError(error.message, download.chapter.name)
download.status = Download.ERROR download
notifier.onError(error.message, download.chapter.name) }
download
}
} }
/** /**
@ -330,10 +315,13 @@ class Downloader(
* @param download the download of the page. * @param download the download of the page.
* @param tmpDir the temporary directory of the download. * @param tmpDir the temporary directory of the download.
*/ */
private fun getOrDownloadImage(page: Page, download: Download, tmpDir: UniFile): Observable<Page> { private fun getOrDownloadImage(
page: Page,
download: Download,
tmpDir: UniFile
): Observable<Page> {
// If the image URL is empty, do nothing // If the image URL is empty, do nothing
if (page.imageUrl == null) if (page.imageUrl == null) return Observable.just(page)
return Observable.just(page)
val filename = String.format("%03d", page.number) val filename = String.format("%03d", page.number)
val tmpFile = tmpDir.findFile("$filename.tmp") val tmpFile = tmpDir.findFile("$filename.tmp")
@ -347,26 +335,29 @@ class Downloader(
// If the image is already downloaded, do nothing. Otherwise download from network // If the image is already downloaded, do nothing. Otherwise download from network
val pageObservable = when { val pageObservable = when {
imageFile != null -> Observable.just(imageFile) imageFile != null -> Observable.just(imageFile)
cache.isImageInCache(page.imageUrl!!) -> cache.isImageInCache(page.imageUrl!!) -> moveFromCache(
moveFromCache(page, cache.getImageFile(page.imageUrl!!), tmpDir, filename) page,
cache.getImageFile(page.imageUrl!!),
tmpDir,
filename
)
else -> downloadImage(page, download.source, tmpDir, filename) else -> downloadImage(page, download.source, tmpDir, filename)
} }
return pageObservable return pageObservable
// When the image is ready, set image path, progress (just in case) and status // When the image is ready, set image path, progress (just in case) and status
.doOnNext { file -> .doOnNext { file ->
page.uri = file.uri page.uri = file.uri
page.progress = 100 page.progress = 100
download.downloadedImages++ download.downloadedImages++
page.status = Page.READY page.status = Page.READY
} }.map { page }
.map { page } // Mark this page as error and allow to download the remaining
// Mark this page as error and allow to download the remaining .onErrorReturn {
.onErrorReturn { page.progress = 0
page.progress = 0 page.status = Page.ERROR
page.status = Page.ERROR page
page }
}
} }
/** /**
@ -377,8 +368,12 @@ class Downloader(
* @param tmpDir the temporary directory of the download. * @param tmpDir the temporary directory of the download.
* @param filename the filename of the image. * @param filename the filename of the image.
*/ */
private fun moveFromCache(page: Page, file: File, tmpDir: UniFile, filename: String): private fun moveFromCache(
Observable<UniFile> { page: Page,
file: File,
tmpDir: UniFile,
filename: String
): Observable<UniFile> {
return Observable.just(file).map { return Observable.just(file).map {
val tmpFile = tmpDir.createFile("$filename.tmp") val tmpFile = tmpDir.createFile("$filename.tmp")
val inputStream = file.inputStream() val inputStream = file.inputStream()
@ -402,25 +397,29 @@ class Downloader(
* @param tmpDir the temporary directory of the download. * @param tmpDir the temporary directory of the download.
* @param filename the filename of the image. * @param filename the filename of the image.
*/ */
private fun downloadImage(page: Page, source: HttpSource, tmpDir: UniFile, filename: String): Observable<UniFile> { private fun downloadImage(
page: Page,
source: HttpSource,
tmpDir: UniFile,
filename: String
): Observable<UniFile> {
page.status = Page.DOWNLOAD_IMAGE page.status = Page.DOWNLOAD_IMAGE
page.progress = 0 page.progress = 0
return source.fetchImage(page) return source.fetchImage(page).map { response ->
.map { response -> val file = tmpDir.createFile("$filename.tmp")
val file = tmpDir.createFile("$filename.tmp") try {
try { response.body!!.source().saveTo(file.openOutputStream())
response.body!!.source().saveTo(file.openOutputStream()) val extension = getImageExtension(response, file)
val extension = getImageExtension(response, file) file.renameTo("$filename.$extension")
file.renameTo("$filename.$extension") } catch (e: Exception) {
} catch (e: Exception) { response.close()
response.close() file.delete()
file.delete() throw e
throw e }
} file
file }
} // Retry 3 times, waiting 2, 4 and 8 seconds between attempts.
// Retry 3 times, waiting 2, 4 and 8 seconds between attempts. .retryWhen(RetryWithDelay(3, { (2 shl it - 1) * 1000 }, Schedulers.trampoline()))
.retryWhen(RetryWithDelay(3, { (2 shl it - 1) * 1000 }, Schedulers.trampoline()))
} }
/** /**
@ -433,7 +432,7 @@ class Downloader(
private fun getImageExtension(response: Response, file: UniFile): String { private fun getImageExtension(response: Response, file: UniFile): String {
// Read content type if available. // Read content type if available.
val mime = response.body?.contentType()?.let { ct -> "${ct.type}/${ct.subtype}" } val mime = response.body?.contentType()?.let { ct -> "${ct.type}/${ct.subtype}" }
// Else guess from the uri. // Else guess from the uri.
?: context.contentResolver.getType(file.uri) ?: context.contentResolver.getType(file.uri)
// Else read magic numbers. // Else read magic numbers.
?: ImageUtil.findImageType { file.openInputStream() }?.mime ?: ImageUtil.findImageType { file.openInputStream() }?.mime
@ -450,10 +449,7 @@ class Downloader(
* @param dirname the real (non temporary) directory name of the download. * @param dirname the real (non temporary) directory name of the download.
*/ */
private fun ensureSuccessfulDownload( private fun ensureSuccessfulDownload(
download: Download, download: Download, mangaDir: UniFile, tmpDir: UniFile, dirname: String
mangaDir: UniFile,
tmpDir: UniFile,
dirname: String
) { ) {
// Ensure that the chapter folder has all the images. // Ensure that the chapter folder has all the images.