Download manager in Kotlin and fix another crash in reader
This commit is contained in:
parent
35748fc1f3
commit
aaef738dda
@ -1,450 +0,0 @@
|
||||
package eu.kanade.tachiyomi.data.download;
|
||||
|
||||
import android.content.Context;
|
||||
import android.net.Uri;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import com.google.gson.stream.JsonReader;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.kanade.tachiyomi.data.database.models.Chapter;
|
||||
import eu.kanade.tachiyomi.data.database.models.Manga;
|
||||
import eu.kanade.tachiyomi.data.download.model.Download;
|
||||
import eu.kanade.tachiyomi.data.download.model.DownloadQueue;
|
||||
import eu.kanade.tachiyomi.data.preference.PreferencesHelper;
|
||||
import eu.kanade.tachiyomi.data.source.SourceManager;
|
||||
import eu.kanade.tachiyomi.data.source.base.Source;
|
||||
import eu.kanade.tachiyomi.data.source.model.Page;
|
||||
import eu.kanade.tachiyomi.event.DownloadChaptersEvent;
|
||||
import eu.kanade.tachiyomi.util.DiskUtils;
|
||||
import eu.kanade.tachiyomi.util.DynamicConcurrentMergeOperator;
|
||||
import eu.kanade.tachiyomi.util.ToastUtil;
|
||||
import eu.kanade.tachiyomi.util.UrlUtil;
|
||||
import rx.Observable;
|
||||
import rx.Subscription;
|
||||
import rx.android.schedulers.AndroidSchedulers;
|
||||
import rx.schedulers.Schedulers;
|
||||
import rx.subjects.BehaviorSubject;
|
||||
import rx.subjects.PublishSubject;
|
||||
import timber.log.Timber;
|
||||
|
||||
public class DownloadManager {
|
||||
|
||||
private Context context;
|
||||
private SourceManager sourceManager;
|
||||
private PreferencesHelper preferences;
|
||||
private Gson gson;
|
||||
|
||||
private PublishSubject<List<Download>> downloadsQueueSubject;
|
||||
private BehaviorSubject<Boolean> runningSubject;
|
||||
private Subscription downloadsSubscription;
|
||||
|
||||
private BehaviorSubject<Integer> threadsSubject;
|
||||
private Subscription threadsSubscription;
|
||||
|
||||
private DownloadQueue queue;
|
||||
private volatile boolean isRunning;
|
||||
|
||||
public static final String PAGE_LIST_FILE = "index.json";
|
||||
|
||||
public DownloadManager(Context context, SourceManager sourceManager, PreferencesHelper preferences) {
|
||||
this.context = context;
|
||||
this.sourceManager = sourceManager;
|
||||
this.preferences = preferences;
|
||||
|
||||
gson = new Gson();
|
||||
queue = new DownloadQueue();
|
||||
|
||||
downloadsQueueSubject = PublishSubject.create();
|
||||
runningSubject = BehaviorSubject.create();
|
||||
threadsSubject = BehaviorSubject.create();
|
||||
}
|
||||
|
||||
private void initializeSubscriptions() {
|
||||
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed())
|
||||
downloadsSubscription.unsubscribe();
|
||||
|
||||
threadsSubscription = preferences.downloadThreads().asObservable()
|
||||
.subscribe(threadsSubject::onNext);
|
||||
|
||||
downloadsSubscription = downloadsQueueSubject
|
||||
.flatMap(Observable::from)
|
||||
.lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threadsSubject))
|
||||
.onBackpressureBuffer()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.map(download -> areAllDownloadsFinished())
|
||||
.subscribe(finished -> {
|
||||
if (finished) {
|
||||
DownloadService.stop(context);
|
||||
}
|
||||
}, e -> {
|
||||
DownloadService.stop(context);
|
||||
Timber.e(e, e.getMessage());
|
||||
ToastUtil.showShort(context, e.getMessage());
|
||||
});
|
||||
|
||||
if (!isRunning) {
|
||||
isRunning = true;
|
||||
runningSubject.onNext(true);
|
||||
}
|
||||
}
|
||||
|
||||
public void destroySubscriptions() {
|
||||
if (isRunning) {
|
||||
isRunning = false;
|
||||
runningSubject.onNext(false);
|
||||
}
|
||||
|
||||
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed()) {
|
||||
downloadsSubscription.unsubscribe();
|
||||
downloadsSubscription = null;
|
||||
}
|
||||
|
||||
if (threadsSubscription != null && !threadsSubscription.isUnsubscribed()) {
|
||||
threadsSubscription.unsubscribe();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Create a download object for every chapter in the event and add them to the downloads queue
|
||||
public void onDownloadChaptersEvent(DownloadChaptersEvent event) {
|
||||
final Manga manga = event.getManga();
|
||||
final Source source = sourceManager.get(manga.source);
|
||||
|
||||
// Used to avoid downloading chapters with the same name
|
||||
final List<String> addedChapters = new ArrayList<>();
|
||||
final List<Download> pending = new ArrayList<>();
|
||||
|
||||
for (Chapter chapter : event.getChapters()) {
|
||||
if (addedChapters.contains(chapter.name))
|
||||
continue;
|
||||
|
||||
addedChapters.add(chapter.name);
|
||||
Download download = new Download(source, manga, chapter);
|
||||
|
||||
if (!prepareDownload(download)) {
|
||||
queue.add(download);
|
||||
pending.add(download);
|
||||
}
|
||||
}
|
||||
if (isRunning) downloadsQueueSubject.onNext(pending);
|
||||
}
|
||||
|
||||
// Public method to check if a chapter is downloaded
|
||||
public boolean isChapterDownloaded(Source source, Manga manga, Chapter chapter) {
|
||||
File directory = getAbsoluteChapterDirectory(source, manga, chapter);
|
||||
if (!directory.exists())
|
||||
return false;
|
||||
|
||||
List<Page> pages = getSavedPageList(source, manga, chapter);
|
||||
return isChapterDownloaded(directory, pages);
|
||||
}
|
||||
|
||||
// Prepare the download. Returns true if the chapter is already downloaded
|
||||
private boolean prepareDownload(Download download) {
|
||||
// If the chapter is already queued, don't add it again
|
||||
for (Download queuedDownload : queue) {
|
||||
if (download.chapter.id.equals(queuedDownload.chapter.id))
|
||||
return true;
|
||||
}
|
||||
|
||||
// Add the directory to the download object for future access
|
||||
download.directory = getAbsoluteChapterDirectory(download);
|
||||
|
||||
// If the directory doesn't exist, the chapter isn't downloaded.
|
||||
if (!download.directory.exists()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the page list doesn't exist, the chapter isn't downloaded
|
||||
List<Page> savedPages = getSavedPageList(download);
|
||||
if (savedPages == null)
|
||||
return false;
|
||||
|
||||
// Add the page list to the download object for future access
|
||||
download.pages = savedPages;
|
||||
|
||||
// If the number of files matches the number of pages, the chapter is downloaded.
|
||||
// We have the index file, so we check one file more
|
||||
return isChapterDownloaded(download.directory, download.pages);
|
||||
}
|
||||
|
||||
// Check that all the images are downloaded
|
||||
private boolean isChapterDownloaded(File directory, List<Page> pages) {
|
||||
return pages != null && !pages.isEmpty() && pages.size() + 1 == directory.listFiles().length;
|
||||
}
|
||||
|
||||
// Download the entire chapter
|
||||
private Observable<Download> downloadChapter(Download download) {
|
||||
try {
|
||||
DiskUtils.createDirectory(download.directory);
|
||||
} catch (IOException e) {
|
||||
return Observable.error(e);
|
||||
}
|
||||
|
||||
Observable<List<Page>> pageListObservable = download.pages == null ?
|
||||
// Pull page list from network and add them to download object
|
||||
download.source
|
||||
.pullPageListFromNetwork(download.chapter.url)
|
||||
.doOnNext(pages -> download.pages = pages)
|
||||
.doOnNext(pages -> savePageList(download)) :
|
||||
// Or if the page list already exists, start from the file
|
||||
Observable.just(download.pages);
|
||||
|
||||
return Observable.defer(() -> pageListObservable
|
||||
.doOnNext(pages -> {
|
||||
download.downloadedImages = 0;
|
||||
download.setStatus(Download.DOWNLOADING);
|
||||
})
|
||||
// Get all the URLs to the source images, fetch pages if necessary
|
||||
.flatMap(download.source::getAllImageUrlsFromPageList)
|
||||
// Start downloading images, consider we can have downloaded images already
|
||||
.concatMap(page -> getOrDownloadImage(page, download))
|
||||
// Do after download completes
|
||||
.doOnCompleted(() -> onDownloadCompleted(download))
|
||||
.toList()
|
||||
.map(pages -> download)
|
||||
// If the page list threw, it will resume here
|
||||
.onErrorResumeNext(error -> {
|
||||
download.setStatus(Download.ERROR);
|
||||
return Observable.just(download);
|
||||
}))
|
||||
.subscribeOn(Schedulers.io());
|
||||
}
|
||||
|
||||
// Get the image from the filesystem if it exists or download from network
|
||||
private Observable<Page> getOrDownloadImage(final Page page, Download download) {
|
||||
// If the image URL is empty, do nothing
|
||||
if (page.getImageUrl() == null)
|
||||
return Observable.just(page);
|
||||
|
||||
String filename = getImageFilename(page);
|
||||
File imagePath = new File(download.directory, filename);
|
||||
|
||||
// If the image is already downloaded, do nothing. Otherwise download from network
|
||||
Observable<Page> pageObservable = isImageDownloaded(imagePath) ?
|
||||
Observable.just(page) :
|
||||
downloadImage(page, download.source, download.directory, filename);
|
||||
|
||||
return pageObservable
|
||||
// When the image is ready, set image path, progress (just in case) and status
|
||||
.doOnNext(p -> {
|
||||
page.setImagePath(imagePath.getAbsolutePath());
|
||||
page.setProgress(100);
|
||||
download.downloadedImages++;
|
||||
page.setStatus(Page.READY);
|
||||
})
|
||||
// Mark this page as error and allow to download the remaining
|
||||
.onErrorResumeNext(e -> {
|
||||
page.setProgress(0);
|
||||
page.setStatus(Page.ERROR);
|
||||
return Observable.just(page);
|
||||
});
|
||||
}
|
||||
|
||||
// Save image on disk
|
||||
private Observable<Page> downloadImage(Page page, Source source, File directory, String filename) {
|
||||
page.setStatus(Page.DOWNLOAD_IMAGE);
|
||||
return source.getImageProgressResponse(page)
|
||||
.flatMap(resp -> {
|
||||
try {
|
||||
DiskUtils.saveBufferedSourceToDirectory(resp.body().source(), directory, filename);
|
||||
} catch (Exception e) {
|
||||
Timber.e(e.getCause(), e.getMessage());
|
||||
return Observable.error(e);
|
||||
}
|
||||
return Observable.just(page);
|
||||
})
|
||||
.retry(2);
|
||||
}
|
||||
|
||||
// Public method to get the image from the filesystem. It does NOT provide any way to download the image
|
||||
public Observable<Page> getDownloadedImage(final Page page, File chapterDir) {
|
||||
if (page.getImageUrl() == null) {
|
||||
page.setStatus(Page.ERROR);
|
||||
return Observable.just(page);
|
||||
}
|
||||
|
||||
File imagePath = new File(chapterDir, getImageFilename(page));
|
||||
|
||||
// When the image is ready, set image path, progress (just in case) and status
|
||||
if (isImageDownloaded(imagePath)) {
|
||||
page.setImagePath(imagePath.getAbsolutePath());
|
||||
page.setProgress(100);
|
||||
page.setStatus(Page.READY);
|
||||
} else {
|
||||
page.setStatus(Page.ERROR);
|
||||
}
|
||||
return Observable.just(page);
|
||||
}
|
||||
|
||||
// Get the filename for an image given the page
|
||||
private String getImageFilename(Page page) {
|
||||
String url = page.getImageUrl();
|
||||
int number = page.getPageNumber() + 1;
|
||||
// Try to preserve file extension
|
||||
if (UrlUtil.isJpg(url)) {
|
||||
return number + ".jpg";
|
||||
} else if (UrlUtil.isPng(url)) {
|
||||
return number + ".png";
|
||||
} else if (UrlUtil.isGif(url)) {
|
||||
return number + ".gif";
|
||||
}
|
||||
return Uri.parse(url).getLastPathSegment().replaceAll("[^\\sa-zA-Z0-9.-]", "_");
|
||||
}
|
||||
|
||||
private boolean isImageDownloaded(File imagePath) {
|
||||
return imagePath.exists();
|
||||
}
|
||||
|
||||
// Called when a download finishes. This doesn't mean the download was successful, so we check it
|
||||
private void onDownloadCompleted(final Download download) {
|
||||
checkDownloadIsSuccessful(download);
|
||||
savePageList(download);
|
||||
}
|
||||
|
||||
private void checkDownloadIsSuccessful(final Download download) {
|
||||
int actualProgress = 0;
|
||||
int status = Download.DOWNLOADED;
|
||||
// If any page has an error, the download result will be error
|
||||
for (Page page : download.pages) {
|
||||
actualProgress += page.getProgress();
|
||||
if (page.getStatus() != Page.READY) status = Download.ERROR;
|
||||
}
|
||||
// Ensure that the chapter folder has all the images
|
||||
if (!isChapterDownloaded(download.directory, download.pages)) {
|
||||
status = Download.ERROR;
|
||||
}
|
||||
download.totalProgress = actualProgress;
|
||||
download.setStatus(status);
|
||||
// Delete successful downloads from queue after notifying
|
||||
if (status == Download.DOWNLOADED) {
|
||||
queue.remove(download);
|
||||
}
|
||||
}
|
||||
|
||||
// Return the page list from the chapter's directory if it exists, null otherwise
|
||||
public List<Page> getSavedPageList(Source source, Manga manga, Chapter chapter) {
|
||||
File chapterDir = getAbsoluteChapterDirectory(source, manga, chapter);
|
||||
File pagesFile = new File(chapterDir, PAGE_LIST_FILE);
|
||||
|
||||
JsonReader reader = null;
|
||||
try {
|
||||
if (pagesFile.exists()) {
|
||||
reader = new JsonReader(new FileReader(pagesFile.getAbsolutePath()));
|
||||
Type collectionType = new TypeToken<List<Page>>() {}.getType();
|
||||
return gson.fromJson(reader, collectionType);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Timber.e(e.getCause(), e.getMessage());
|
||||
} finally {
|
||||
if (reader != null) try { reader.close(); } catch (IOException e) { /* Do nothing */ }
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Shortcut for the method above
|
||||
private List<Page> getSavedPageList(Download download) {
|
||||
return getSavedPageList(download.source, download.manga, download.chapter);
|
||||
}
|
||||
|
||||
// Save the page list to the chapter's directory
|
||||
public void savePageList(Source source, Manga manga, Chapter chapter, List<Page> pages) {
|
||||
File chapterDir = getAbsoluteChapterDirectory(source, manga, chapter);
|
||||
File pagesFile = new File(chapterDir, PAGE_LIST_FILE);
|
||||
|
||||
FileOutputStream out = null;
|
||||
try {
|
||||
out = new FileOutputStream(pagesFile);
|
||||
out.write(gson.toJson(pages).getBytes());
|
||||
out.flush();
|
||||
} catch (IOException e) {
|
||||
Timber.e(e.getCause(), e.getMessage());
|
||||
} finally {
|
||||
if (out != null) try { out.close(); } catch (IOException e) { /* Do nothing */ }
|
||||
}
|
||||
}
|
||||
|
||||
// Shortcut for the method above
|
||||
private void savePageList(Download download) {
|
||||
savePageList(download.source, download.manga, download.chapter, download.pages);
|
||||
}
|
||||
|
||||
public File getAbsoluteMangaDirectory(Source source, Manga manga) {
|
||||
String mangaRelativePath = source.getVisibleName() +
|
||||
File.separator +
|
||||
manga.title.replaceAll("[^\\sa-zA-Z0-9.-]", "_");
|
||||
|
||||
return new File(preferences.getDownloadsDirectory(), mangaRelativePath);
|
||||
}
|
||||
|
||||
// Get the absolute path to the chapter directory
|
||||
public File getAbsoluteChapterDirectory(Source source, Manga manga, Chapter chapter) {
|
||||
String chapterRelativePath = chapter.name.replaceAll("[^\\sa-zA-Z0-9.-]", "_");
|
||||
|
||||
return new File(getAbsoluteMangaDirectory(source, manga), chapterRelativePath);
|
||||
}
|
||||
|
||||
// Shortcut for the method above
|
||||
private File getAbsoluteChapterDirectory(Download download) {
|
||||
return getAbsoluteChapterDirectory(download.source, download.manga, download.chapter);
|
||||
}
|
||||
|
||||
public void deleteChapter(Source source, Manga manga, Chapter chapter) {
|
||||
File path = getAbsoluteChapterDirectory(source, manga, chapter);
|
||||
DiskUtils.deleteFiles(path);
|
||||
}
|
||||
|
||||
public DownloadQueue getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
public boolean areAllDownloadsFinished() {
|
||||
for (Download download : queue) {
|
||||
if (download.getStatus() <= Download.DOWNLOADING)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean startDownloads() {
|
||||
if (queue.isEmpty())
|
||||
return false;
|
||||
|
||||
if (downloadsSubscription == null || downloadsSubscription.isUnsubscribed())
|
||||
initializeSubscriptions();
|
||||
|
||||
final List<Download> pending = new ArrayList<>();
|
||||
for (Download download : queue) {
|
||||
if (download.getStatus() != Download.DOWNLOADED) {
|
||||
if (download.getStatus() != Download.QUEUE) download.setStatus(Download.QUEUE);
|
||||
pending.add(download);
|
||||
}
|
||||
}
|
||||
downloadsQueueSubject.onNext(pending);
|
||||
|
||||
return !pending.isEmpty();
|
||||
}
|
||||
|
||||
public void stopDownloads() {
|
||||
destroySubscriptions();
|
||||
for (Download download : queue) {
|
||||
if (download.getStatus() == Download.DOWNLOADING) {
|
||||
download.setStatus(Download.ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public BehaviorSubject<Boolean> getRunningSubject() {
|
||||
return runningSubject;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,434 @@
|
||||
package eu.kanade.tachiyomi.data.download
|
||||
|
||||
import android.content.Context
|
||||
import android.net.Uri
|
||||
import com.google.gson.Gson
|
||||
import com.google.gson.reflect.TypeToken
|
||||
import com.google.gson.stream.JsonReader
|
||||
import eu.kanade.tachiyomi.data.database.models.Chapter
|
||||
import eu.kanade.tachiyomi.data.database.models.Manga
|
||||
import eu.kanade.tachiyomi.data.download.model.Download
|
||||
import eu.kanade.tachiyomi.data.download.model.DownloadQueue
|
||||
import eu.kanade.tachiyomi.data.preference.PreferencesHelper
|
||||
import eu.kanade.tachiyomi.data.source.SourceManager
|
||||
import eu.kanade.tachiyomi.data.source.base.Source
|
||||
import eu.kanade.tachiyomi.data.source.model.Page
|
||||
import eu.kanade.tachiyomi.event.DownloadChaptersEvent
|
||||
import eu.kanade.tachiyomi.util.DiskUtils
|
||||
import eu.kanade.tachiyomi.util.DynamicConcurrentMergeOperator
|
||||
import eu.kanade.tachiyomi.util.UrlUtil
|
||||
import eu.kanade.tachiyomi.util.toast
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import rx.schedulers.Schedulers
|
||||
import rx.subjects.BehaviorSubject
|
||||
import rx.subjects.PublishSubject
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
import java.io.FileReader
|
||||
import java.io.IOException
|
||||
import java.util.*
|
||||
|
||||
class DownloadManager(private val context: Context, private val sourceManager: SourceManager, private val preferences: PreferencesHelper) {
|
||||
|
||||
private val gson = Gson()
|
||||
|
||||
private val downloadsQueueSubject = PublishSubject.create<List<Download>>()
|
||||
val runningSubject = BehaviorSubject.create<Boolean>()
|
||||
private var downloadsSubscription: Subscription? = null
|
||||
|
||||
private val threadsSubject = BehaviorSubject.create<Int>()
|
||||
private var threadsSubscription: Subscription? = null
|
||||
|
||||
val queue = DownloadQueue()
|
||||
|
||||
val imageFilenameRegex = "[^\\sa-zA-Z0-9.-]".toRegex()
|
||||
|
||||
val PAGE_LIST_FILE = "index.json"
|
||||
|
||||
@Volatile private var isRunning: Boolean = false
|
||||
|
||||
private fun initializeSubscriptions() {
|
||||
downloadsSubscription?.unsubscribe()
|
||||
|
||||
threadsSubscription = preferences.downloadThreads().asObservable()
|
||||
.subscribe { threadsSubject.onNext(it) }
|
||||
|
||||
downloadsSubscription = downloadsQueueSubject.flatMap { Observable.from(it) }
|
||||
.lift(DynamicConcurrentMergeOperator<Download, Download>({ downloadChapter(it) }, threadsSubject))
|
||||
.onBackpressureBuffer()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.map { download -> areAllDownloadsFinished() }
|
||||
.subscribe({ finished ->
|
||||
if (finished!!) {
|
||||
DownloadService.stop(context)
|
||||
}
|
||||
}, { e ->
|
||||
DownloadService.stop(context)
|
||||
Timber.e(e, e.message)
|
||||
context.toast(e.message)
|
||||
})
|
||||
|
||||
if (!isRunning) {
|
||||
isRunning = true
|
||||
runningSubject.onNext(true)
|
||||
}
|
||||
}
|
||||
|
||||
fun destroySubscriptions() {
|
||||
if (isRunning) {
|
||||
isRunning = false
|
||||
runningSubject.onNext(false)
|
||||
}
|
||||
|
||||
if (downloadsSubscription != null) {
|
||||
downloadsSubscription?.unsubscribe()
|
||||
downloadsSubscription = null
|
||||
}
|
||||
|
||||
if (threadsSubscription != null) {
|
||||
threadsSubscription?.unsubscribe()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Create a download object for every chapter in the event and add them to the downloads queue
|
||||
fun onDownloadChaptersEvent(event: DownloadChaptersEvent) {
|
||||
val manga = event.manga
|
||||
val source = sourceManager.get(manga.source)
|
||||
|
||||
// Used to avoid downloading chapters with the same name
|
||||
val addedChapters = ArrayList<String>()
|
||||
val pending = ArrayList<Download>()
|
||||
|
||||
for (chapter in event.chapters) {
|
||||
if (addedChapters.contains(chapter.name))
|
||||
continue
|
||||
|
||||
addedChapters.add(chapter.name)
|
||||
val download = Download(source, manga, chapter)
|
||||
|
||||
if (!prepareDownload(download)) {
|
||||
queue.add(download)
|
||||
pending.add(download)
|
||||
}
|
||||
}
|
||||
if (isRunning) downloadsQueueSubject.onNext(pending)
|
||||
}
|
||||
|
||||
// Public method to check if a chapter is downloaded
|
||||
fun isChapterDownloaded(source: Source, manga: Manga, chapter: Chapter): Boolean {
|
||||
val directory = getAbsoluteChapterDirectory(source, manga, chapter)
|
||||
if (!directory.exists())
|
||||
return false
|
||||
|
||||
val pages = getSavedPageList(source, manga, chapter)
|
||||
return isChapterDownloaded(directory, pages)
|
||||
}
|
||||
|
||||
// Prepare the download. Returns true if the chapter is already downloaded
|
||||
private fun prepareDownload(download: Download): Boolean {
|
||||
// If the chapter is already queued, don't add it again
|
||||
for (queuedDownload in queue) {
|
||||
if (download.chapter.id == queuedDownload.chapter.id)
|
||||
return true
|
||||
}
|
||||
|
||||
// Add the directory to the download object for future access
|
||||
download.directory = getAbsoluteChapterDirectory(download)
|
||||
|
||||
// If the directory doesn't exist, the chapter isn't downloaded.
|
||||
if (!download.directory.exists()) {
|
||||
return false
|
||||
}
|
||||
|
||||
// If the page list doesn't exist, the chapter isn't downloaded
|
||||
val savedPages = getSavedPageList(download) ?: return false
|
||||
|
||||
// Add the page list to the download object for future access
|
||||
download.pages = savedPages
|
||||
|
||||
// If the number of files matches the number of pages, the chapter is downloaded.
|
||||
// We have the index file, so we check one file more
|
||||
return isChapterDownloaded(download.directory, download.pages)
|
||||
}
|
||||
|
||||
// Check that all the images are downloaded
|
||||
private fun isChapterDownloaded(directory: File, pages: List<Page>?): Boolean {
|
||||
return pages != null && !pages.isEmpty() && pages.size + 1 == directory.listFiles().size
|
||||
}
|
||||
|
||||
// Download the entire chapter
|
||||
private fun downloadChapter(download: Download): Observable<Download> {
|
||||
try {
|
||||
DiskUtils.createDirectory(download.directory)
|
||||
} catch (e: IOException) {
|
||||
return Observable.error<Download>(e)
|
||||
}
|
||||
|
||||
val pageListObservable = if (download.pages == null)
|
||||
// Pull page list from network and add them to download object
|
||||
download.source.pullPageListFromNetwork(download.chapter.url)
|
||||
.doOnNext { pages ->
|
||||
download.pages = pages
|
||||
savePageList(download)
|
||||
}
|
||||
else
|
||||
// Or if the page list already exists, start from the file
|
||||
Observable.just(download.pages)
|
||||
|
||||
return Observable.defer<Download> { pageListObservable
|
||||
.doOnNext { pages ->
|
||||
download.downloadedImages = 0
|
||||
download.status = Download.DOWNLOADING
|
||||
}
|
||||
// Get all the URLs to the source images, fetch pages if necessary
|
||||
.flatMap { download.source.getAllImageUrlsFromPageList(it) }
|
||||
// Start downloading images, consider we can have downloaded images already
|
||||
.concatMap { page -> getOrDownloadImage(page, download) }
|
||||
// Do after download completes
|
||||
.doOnCompleted { onDownloadCompleted(download) }
|
||||
.toList()
|
||||
.map { pages -> download }
|
||||
// If the page list threw, it will resume here
|
||||
.onErrorResumeNext { error ->
|
||||
download.status = Download.ERROR
|
||||
Observable.just(download)
|
||||
}
|
||||
}.subscribeOn(Schedulers.io())
|
||||
}
|
||||
|
||||
// Get the image from the filesystem if it exists or download from network
|
||||
private fun getOrDownloadImage(page: Page, download: Download): Observable<Page> {
|
||||
// If the image URL is empty, do nothing
|
||||
if (page.imageUrl == null)
|
||||
return Observable.just(page)
|
||||
|
||||
val filename = getImageFilename(page)
|
||||
val imagePath = File(download.directory, filename)
|
||||
|
||||
// If the image is already downloaded, do nothing. Otherwise download from network
|
||||
val pageObservable = if (isImageDownloaded(imagePath))
|
||||
Observable.just(page)
|
||||
else
|
||||
downloadImage(page, download.source, download.directory, filename)
|
||||
|
||||
return pageObservable
|
||||
// When the image is ready, set image path, progress (just in case) and status
|
||||
.doOnNext {
|
||||
page.imagePath = imagePath.absolutePath
|
||||
page.progress = 100
|
||||
download.downloadedImages++
|
||||
page.status = Page.READY
|
||||
}
|
||||
// Mark this page as error and allow to download the remaining
|
||||
.onErrorResumeNext {
|
||||
page.progress = 0
|
||||
page.status = Page.ERROR
|
||||
Observable.just(page)
|
||||
}
|
||||
}
|
||||
|
||||
// Save image on disk
|
||||
private fun downloadImage(page: Page, source: Source, directory: File, filename: String): Observable<Page> {
|
||||
page.status = Page.DOWNLOAD_IMAGE
|
||||
return source.getImageProgressResponse(page)
|
||||
.flatMap({ resp ->
|
||||
try {
|
||||
DiskUtils.saveBufferedSourceToDirectory(resp.body().source(), directory, filename)
|
||||
Observable.just(page)
|
||||
} catch (e: Exception) {
|
||||
Timber.e(e.cause, e.message)
|
||||
Observable.error<Page>(e)
|
||||
}
|
||||
}).retry(2)
|
||||
}
|
||||
|
||||
// Public method to get the image from the filesystem. It does NOT provide any way to download the image
|
||||
fun getDownloadedImage(page: Page, chapterDir: File): Observable<Page> {
|
||||
if (page.imageUrl == null) {
|
||||
page.status = Page.ERROR
|
||||
return Observable.just(page)
|
||||
}
|
||||
|
||||
val imagePath = File(chapterDir, getImageFilename(page))
|
||||
|
||||
// When the image is ready, set image path, progress (just in case) and status
|
||||
if (isImageDownloaded(imagePath)) {
|
||||
page.imagePath = imagePath.absolutePath
|
||||
page.progress = 100
|
||||
page.status = Page.READY
|
||||
} else {
|
||||
page.status = Page.ERROR
|
||||
}
|
||||
return Observable.just(page)
|
||||
}
|
||||
|
||||
// Get the filename for an image given the page
|
||||
private fun getImageFilename(page: Page): String {
|
||||
val url = page.imageUrl
|
||||
val number = page.pageNumber + 1
|
||||
// Try to preserve file extension
|
||||
if (UrlUtil.isJpg(url)) {
|
||||
return "$number.jpg"
|
||||
} else if (UrlUtil.isPng(url)) {
|
||||
return "$number.png"
|
||||
} else if (UrlUtil.isGif(url)) {
|
||||
return "$number.gif"
|
||||
}
|
||||
return Uri.parse(url).lastPathSegment.replace(imageFilenameRegex, "_")
|
||||
}
|
||||
|
||||
private fun isImageDownloaded(imagePath: File): Boolean {
|
||||
return imagePath.exists()
|
||||
}
|
||||
|
||||
// Called when a download finishes. This doesn't mean the download was successful, so we check it
|
||||
private fun onDownloadCompleted(download: Download) {
|
||||
checkDownloadIsSuccessful(download)
|
||||
savePageList(download)
|
||||
}
|
||||
|
||||
private fun checkDownloadIsSuccessful(download: Download) {
|
||||
var actualProgress = 0
|
||||
var status = Download.DOWNLOADED
|
||||
// If any page has an error, the download result will be error
|
||||
for (page in download.pages) {
|
||||
actualProgress += page.progress
|
||||
if (page.status != Page.READY) status = Download.ERROR
|
||||
}
|
||||
// Ensure that the chapter folder has all the images
|
||||
if (!isChapterDownloaded(download.directory, download.pages)) {
|
||||
status = Download.ERROR
|
||||
}
|
||||
download.totalProgress = actualProgress
|
||||
download.status = status
|
||||
// Delete successful downloads from queue after notifying
|
||||
if (status == Download.DOWNLOADED) {
|
||||
queue.del(download)
|
||||
}
|
||||
}
|
||||
|
||||
// Return the page list from the chapter's directory if it exists, null otherwise
|
||||
fun getSavedPageList(source: Source, manga: Manga, chapter: Chapter): List<Page>? {
|
||||
val chapterDir = getAbsoluteChapterDirectory(source, manga, chapter)
|
||||
val pagesFile = File(chapterDir, PAGE_LIST_FILE)
|
||||
|
||||
var reader: JsonReader? = null
|
||||
try {
|
||||
if (pagesFile.exists()) {
|
||||
reader = JsonReader(FileReader(pagesFile.absolutePath))
|
||||
val collectionType = object : TypeToken<List<Page>>() {
|
||||
|
||||
}.type
|
||||
return gson.fromJson<List<Page>>(reader, collectionType)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Timber.e(e.cause, e.message)
|
||||
} finally {
|
||||
if (reader != null) try {
|
||||
reader.close()
|
||||
} catch (e: IOException) {
|
||||
/* Do nothing */
|
||||
}
|
||||
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
// Shortcut for the method above
|
||||
private fun getSavedPageList(download: Download): List<Page>? {
|
||||
return getSavedPageList(download.source, download.manga, download.chapter)
|
||||
}
|
||||
|
||||
// Save the page list to the chapter's directory
|
||||
fun savePageList(source: Source, manga: Manga, chapter: Chapter, pages: List<Page>) {
|
||||
val chapterDir = getAbsoluteChapterDirectory(source, manga, chapter)
|
||||
val pagesFile = File(chapterDir, PAGE_LIST_FILE)
|
||||
|
||||
var out: FileOutputStream? = null
|
||||
try {
|
||||
out = FileOutputStream(pagesFile)
|
||||
out.write(gson.toJson(pages).toByteArray())
|
||||
out.flush()
|
||||
} catch (e: IOException) {
|
||||
Timber.e(e.cause, e.message)
|
||||
} finally {
|
||||
if (out != null) try {
|
||||
out.close()
|
||||
} catch (e: IOException) {
|
||||
/* Do nothing */
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Shortcut for the method above
|
||||
private fun savePageList(download: Download) {
|
||||
savePageList(download.source, download.manga, download.chapter, download.pages)
|
||||
}
|
||||
|
||||
fun getAbsoluteMangaDirectory(source: Source, manga: Manga): File {
|
||||
val mangaRelativePath = source.visibleName +
|
||||
File.separator +
|
||||
manga.title.replace("[^\\sa-zA-Z0-9.-]".toRegex(), "_")
|
||||
|
||||
return File(preferences.downloadsDirectory, mangaRelativePath)
|
||||
}
|
||||
|
||||
// Get the absolute path to the chapter directory
|
||||
fun getAbsoluteChapterDirectory(source: Source, manga: Manga, chapter: Chapter): File {
|
||||
val chapterRelativePath = chapter.name.replace("[^\\sa-zA-Z0-9.-]".toRegex(), "_")
|
||||
|
||||
return File(getAbsoluteMangaDirectory(source, manga), chapterRelativePath)
|
||||
}
|
||||
|
||||
// Shortcut for the method above
|
||||
private fun getAbsoluteChapterDirectory(download: Download): File {
|
||||
return getAbsoluteChapterDirectory(download.source, download.manga, download.chapter)
|
||||
}
|
||||
|
||||
fun deleteChapter(source: Source, manga: Manga, chapter: Chapter) {
|
||||
val path = getAbsoluteChapterDirectory(source, manga, chapter)
|
||||
DiskUtils.deleteFiles(path)
|
||||
}
|
||||
|
||||
fun areAllDownloadsFinished(): Boolean {
|
||||
for (download in queue) {
|
||||
if (download.status <= Download.DOWNLOADING)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
fun startDownloads(): Boolean {
|
||||
if (queue.isEmpty())
|
||||
return false
|
||||
|
||||
if (downloadsSubscription == null || downloadsSubscription!!.isUnsubscribed)
|
||||
initializeSubscriptions()
|
||||
|
||||
val pending = ArrayList<Download>()
|
||||
for (download in queue) {
|
||||
if (download.status != Download.DOWNLOADED) {
|
||||
if (download.status != Download.QUEUE) download.status = Download.QUEUE
|
||||
pending.add(download)
|
||||
}
|
||||
}
|
||||
downloadsQueueSubject.onNext(pending)
|
||||
|
||||
return !pending.isEmpty()
|
||||
}
|
||||
|
||||
fun stopDownloads() {
|
||||
destroySubscriptions()
|
||||
for (download in queue) {
|
||||
if (download.status == Download.DOWNLOADING) {
|
||||
download.status = Download.ERROR
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
package eu.kanade.tachiyomi.data.download;
|
||||
|
||||
import android.app.Service;
|
||||
import android.content.Context;
|
||||
import android.content.Intent;
|
||||
import android.os.IBinder;
|
||||
import android.os.PowerManager;
|
||||
|
||||
import com.github.pwittchen.reactivenetwork.library.ReactiveNetwork;
|
||||
|
||||
import org.greenrobot.eventbus.EventBus;
|
||||
import org.greenrobot.eventbus.Subscribe;
|
||||
import org.greenrobot.eventbus.ThreadMode;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import eu.kanade.tachiyomi.App;
|
||||
import eu.kanade.tachiyomi.R;
|
||||
import eu.kanade.tachiyomi.data.preference.PreferencesHelper;
|
||||
import eu.kanade.tachiyomi.event.DownloadChaptersEvent;
|
||||
import eu.kanade.tachiyomi.util.ToastUtil;
|
||||
import rx.Subscription;
|
||||
import rx.android.schedulers.AndroidSchedulers;
|
||||
import rx.schedulers.Schedulers;
|
||||
|
||||
public class DownloadService extends Service {
|
||||
|
||||
@Inject DownloadManager downloadManager;
|
||||
@Inject PreferencesHelper preferences;
|
||||
|
||||
private PowerManager.WakeLock wakeLock;
|
||||
private Subscription networkChangeSubscription;
|
||||
private Subscription queueRunningSubscription;
|
||||
private boolean isRunning;
|
||||
|
||||
public static void start(Context context) {
|
||||
context.startService(new Intent(context, DownloadService.class));
|
||||
}
|
||||
|
||||
public static void stop(Context context) {
|
||||
context.stopService(new Intent(context, DownloadService.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCreate() {
|
||||
super.onCreate();
|
||||
App.get(this).getComponent().inject(this);
|
||||
|
||||
createWakeLock();
|
||||
|
||||
listenQueueRunningChanges();
|
||||
EventBus.getDefault().register(this);
|
||||
listenNetworkChanges();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int onStartCommand(Intent intent, int flags, int startId) {
|
||||
return START_STICKY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDestroy() {
|
||||
EventBus.getDefault().unregister(this);
|
||||
queueRunningSubscription.unsubscribe();
|
||||
networkChangeSubscription.unsubscribe();
|
||||
downloadManager.destroySubscriptions();
|
||||
destroyWakeLock();
|
||||
super.onDestroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IBinder onBind(Intent intent) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Subscribe(sticky = true, threadMode = ThreadMode.MAIN)
|
||||
public void onEvent(DownloadChaptersEvent event) {
|
||||
EventBus.getDefault().removeStickyEvent(event);
|
||||
downloadManager.onDownloadChaptersEvent(event);
|
||||
}
|
||||
|
||||
private void listenNetworkChanges() {
|
||||
networkChangeSubscription = new ReactiveNetwork().enableInternetCheck()
|
||||
.observeConnectivity(getApplicationContext())
|
||||
.subscribeOn(Schedulers.io())
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribe(state -> {
|
||||
switch (state) {
|
||||
case WIFI_CONNECTED_HAS_INTERNET:
|
||||
// If there are no remaining downloads, destroy the service
|
||||
if (!isRunning && !downloadManager.startDownloads()) {
|
||||
stopSelf();
|
||||
}
|
||||
break;
|
||||
case MOBILE_CONNECTED:
|
||||
if (!preferences.downloadOnlyOverWifi()) {
|
||||
if (!isRunning && !downloadManager.startDownloads()) {
|
||||
stopSelf();
|
||||
}
|
||||
} else if (isRunning) {
|
||||
downloadManager.stopDownloads();
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (isRunning) {
|
||||
downloadManager.stopDownloads();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}, error -> {
|
||||
ToastUtil.showShort(this, R.string.download_queue_error);
|
||||
stopSelf();
|
||||
});
|
||||
}
|
||||
|
||||
private void listenQueueRunningChanges() {
|
||||
queueRunningSubscription = downloadManager.getRunningSubject()
|
||||
.subscribe(running -> {
|
||||
isRunning = running;
|
||||
if (running)
|
||||
acquireWakeLock();
|
||||
else
|
||||
releaseWakeLock();
|
||||
});
|
||||
}
|
||||
|
||||
private void createWakeLock() {
|
||||
wakeLock = ((PowerManager)getSystemService(POWER_SERVICE)).newWakeLock(
|
||||
PowerManager.PARTIAL_WAKE_LOCK, "DownloadService:WakeLock");
|
||||
}
|
||||
|
||||
private void destroyWakeLock() {
|
||||
if (wakeLock != null && wakeLock.isHeld()) {
|
||||
wakeLock.release();
|
||||
wakeLock = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void acquireWakeLock() {
|
||||
if (wakeLock != null && !wakeLock.isHeld()) {
|
||||
wakeLock.acquire();
|
||||
}
|
||||
}
|
||||
|
||||
public void releaseWakeLock() {
|
||||
if (wakeLock != null && wakeLock.isHeld()) {
|
||||
wakeLock.release();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,146 @@
|
||||
package eu.kanade.tachiyomi.data.download
|
||||
|
||||
import android.app.Service
|
||||
import android.content.Context
|
||||
import android.content.Intent
|
||||
import android.os.IBinder
|
||||
import android.os.PowerManager
|
||||
import com.github.pwittchen.reactivenetwork.library.ConnectivityStatus
|
||||
import com.github.pwittchen.reactivenetwork.library.ReactiveNetwork
|
||||
import eu.kanade.tachiyomi.App
|
||||
import eu.kanade.tachiyomi.R
|
||||
import eu.kanade.tachiyomi.data.preference.PreferencesHelper
|
||||
import eu.kanade.tachiyomi.event.DownloadChaptersEvent
|
||||
import eu.kanade.tachiyomi.util.toast
|
||||
import org.greenrobot.eventbus.EventBus
|
||||
import org.greenrobot.eventbus.Subscribe
|
||||
import org.greenrobot.eventbus.ThreadMode
|
||||
import rx.Subscription
|
||||
import rx.android.schedulers.AndroidSchedulers
|
||||
import rx.schedulers.Schedulers
|
||||
import javax.inject.Inject
|
||||
|
||||
class DownloadService : Service() {
|
||||
|
||||
companion object {
|
||||
|
||||
fun start(context: Context) {
|
||||
context.startService(Intent(context, DownloadService::class.java))
|
||||
}
|
||||
|
||||
fun stop(context: Context) {
|
||||
context.stopService(Intent(context, DownloadService::class.java))
|
||||
}
|
||||
}
|
||||
|
||||
@Inject lateinit var downloadManager: DownloadManager
|
||||
@Inject lateinit var preferences: PreferencesHelper
|
||||
|
||||
private var wakeLock: PowerManager.WakeLock? = null
|
||||
private var networkChangeSubscription: Subscription? = null
|
||||
private var queueRunningSubscription: Subscription? = null
|
||||
private var isRunning: Boolean = false
|
||||
|
||||
override fun onCreate() {
|
||||
super.onCreate()
|
||||
App.get(this).component.inject(this)
|
||||
|
||||
createWakeLock()
|
||||
|
||||
listenQueueRunningChanges()
|
||||
EventBus.getDefault().register(this)
|
||||
listenNetworkChanges()
|
||||
}
|
||||
|
||||
override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int {
|
||||
return Service.START_STICKY
|
||||
}
|
||||
|
||||
override fun onDestroy() {
|
||||
EventBus.getDefault().unregister(this)
|
||||
queueRunningSubscription?.unsubscribe()
|
||||
networkChangeSubscription?.unsubscribe()
|
||||
downloadManager.destroySubscriptions()
|
||||
destroyWakeLock()
|
||||
super.onDestroy()
|
||||
}
|
||||
|
||||
override fun onBind(intent: Intent): IBinder? {
|
||||
return null
|
||||
}
|
||||
|
||||
@Subscribe(sticky = true, threadMode = ThreadMode.MAIN)
|
||||
fun onEvent(event: DownloadChaptersEvent) {
|
||||
EventBus.getDefault().removeStickyEvent(event)
|
||||
downloadManager.onDownloadChaptersEvent(event)
|
||||
}
|
||||
|
||||
private fun listenNetworkChanges() {
|
||||
networkChangeSubscription = ReactiveNetwork().enableInternetCheck()
|
||||
.observeConnectivity(applicationContext)
|
||||
.subscribeOn(Schedulers.io())
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribe({ state ->
|
||||
when (state) {
|
||||
ConnectivityStatus.WIFI_CONNECTED_HAS_INTERNET -> {
|
||||
// If there are no remaining downloads, destroy the service
|
||||
if (!isRunning && !downloadManager.startDownloads()) {
|
||||
stopSelf()
|
||||
}
|
||||
}
|
||||
ConnectivityStatus.MOBILE_CONNECTED -> {
|
||||
if (!preferences.downloadOnlyOverWifi()) {
|
||||
if (!isRunning && !downloadManager.startDownloads()) {
|
||||
stopSelf()
|
||||
}
|
||||
} else if (isRunning) {
|
||||
downloadManager.stopDownloads()
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
if (isRunning) {
|
||||
downloadManager.stopDownloads()
|
||||
}
|
||||
}
|
||||
}
|
||||
}, { error ->
|
||||
toast(R.string.download_queue_error)
|
||||
stopSelf()
|
||||
})
|
||||
}
|
||||
|
||||
private fun listenQueueRunningChanges() {
|
||||
queueRunningSubscription = downloadManager.runningSubject.subscribe { running ->
|
||||
isRunning = running
|
||||
if (running)
|
||||
acquireWakeLock()
|
||||
else
|
||||
releaseWakeLock()
|
||||
}
|
||||
}
|
||||
|
||||
private fun createWakeLock() {
|
||||
wakeLock = (getSystemService(Context.POWER_SERVICE) as PowerManager).newWakeLock(
|
||||
PowerManager.PARTIAL_WAKE_LOCK, "DownloadService:WakeLock")
|
||||
}
|
||||
|
||||
private fun destroyWakeLock() {
|
||||
if (wakeLock != null && wakeLock!!.isHeld) {
|
||||
wakeLock!!.release()
|
||||
wakeLock = null
|
||||
}
|
||||
}
|
||||
|
||||
fun acquireWakeLock() {
|
||||
if (wakeLock != null && !wakeLock!!.isHeld) {
|
||||
wakeLock!!.acquire()
|
||||
}
|
||||
}
|
||||
|
||||
fun releaseWakeLock() {
|
||||
if (wakeLock != null && wakeLock!!.isHeld) {
|
||||
wakeLock!!.release()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,78 +0,0 @@
|
||||
package eu.kanade.tachiyomi.data.download.model;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.kanade.tachiyomi.data.database.models.Chapter;
|
||||
import eu.kanade.tachiyomi.data.source.model.Page;
|
||||
import rx.Observable;
|
||||
import rx.subjects.PublishSubject;
|
||||
|
||||
public class DownloadQueue extends ArrayList<Download> {
|
||||
|
||||
private PublishSubject<Download> statusSubject;
|
||||
|
||||
public DownloadQueue() {
|
||||
super();
|
||||
statusSubject = PublishSubject.create();
|
||||
}
|
||||
|
||||
public boolean add(Download download) {
|
||||
download.setStatusSubject(statusSubject);
|
||||
download.setStatus(Download.QUEUE);
|
||||
return super.add(download);
|
||||
}
|
||||
|
||||
public void remove(Download download) {
|
||||
super.remove(download);
|
||||
download.setStatusSubject(null);
|
||||
}
|
||||
|
||||
public void remove(Chapter chapter) {
|
||||
for (Download download : this) {
|
||||
if (download.chapter.id.equals(chapter.id)) {
|
||||
remove(download);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Observable<Download> getActiveDownloads() {
|
||||
return Observable.from(this)
|
||||
.filter(download -> download.getStatus() == Download.DOWNLOADING);
|
||||
}
|
||||
|
||||
public Observable<Download> getStatusObservable() {
|
||||
return statusSubject.onBackpressureBuffer();
|
||||
}
|
||||
|
||||
public Observable<Download> getProgressObservable() {
|
||||
return statusSubject.onBackpressureBuffer()
|
||||
.startWith(getActiveDownloads())
|
||||
.flatMap(download -> {
|
||||
if (download.getStatus() == Download.DOWNLOADING) {
|
||||
PublishSubject<Integer> pageStatusSubject = PublishSubject.create();
|
||||
setPagesSubject(download.pages, pageStatusSubject);
|
||||
return pageStatusSubject
|
||||
.filter(status -> status == Page.READY)
|
||||
.map(status -> download);
|
||||
|
||||
} else if (download.getStatus() == Download.DOWNLOADED ||
|
||||
download.getStatus() == Download.ERROR) {
|
||||
|
||||
setPagesSubject(download.pages, null);
|
||||
}
|
||||
return Observable.just(download);
|
||||
})
|
||||
.filter(download -> download.getStatus() == Download.DOWNLOADING);
|
||||
}
|
||||
|
||||
private void setPagesSubject(List<Page> pages, PublishSubject<Integer> subject) {
|
||||
if (pages != null) {
|
||||
for (Page page : pages) {
|
||||
page.setStatusSubject(subject);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
package eu.kanade.tachiyomi.data.download.model
|
||||
|
||||
import eu.kanade.tachiyomi.data.database.models.Chapter
|
||||
import eu.kanade.tachiyomi.data.source.model.Page
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.*
|
||||
|
||||
class DownloadQueue : ArrayList<Download>() {
|
||||
|
||||
private val statusSubject = PublishSubject.create<Download>()
|
||||
|
||||
override fun add(download: Download): Boolean {
|
||||
download.setStatusSubject(statusSubject)
|
||||
download.status = Download.QUEUE
|
||||
return super.add(download)
|
||||
}
|
||||
|
||||
fun del(download: Download) {
|
||||
super.remove(download)
|
||||
download.setStatusSubject(null)
|
||||
}
|
||||
|
||||
fun del(chapter: Chapter) {
|
||||
for (download in this) {
|
||||
if (download.chapter.id == chapter.id) {
|
||||
del(download)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun getActiveDownloads() =
|
||||
Observable.from(this).filter { download -> download.status == Download.DOWNLOADING }
|
||||
|
||||
fun getStatusObservable() = statusSubject.onBackpressureBuffer()
|
||||
|
||||
fun getProgressObservable(): Observable<Download> {
|
||||
return statusSubject.onBackpressureBuffer()
|
||||
.startWith(getActiveDownloads())
|
||||
.flatMap { download ->
|
||||
if (download.status == Download.DOWNLOADING) {
|
||||
val pageStatusSubject = PublishSubject.create<Int>()
|
||||
setPagesSubject(download.pages, pageStatusSubject)
|
||||
return@flatMap pageStatusSubject
|
||||
.filter { it == Page.READY }
|
||||
.map { download }
|
||||
|
||||
} else if (download.status == Download.DOWNLOADED || download.status == Download.ERROR) {
|
||||
setPagesSubject(download.pages, null)
|
||||
}
|
||||
Observable.just(download)
|
||||
}
|
||||
.filter { it.status == Download.DOWNLOADING }
|
||||
}
|
||||
|
||||
private fun setPagesSubject(pages: List<Page>?, subject: PublishSubject<Int>?) {
|
||||
if (pages != null) {
|
||||
for (page in pages) {
|
||||
page.setStatusSubject(subject)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -68,14 +68,14 @@ class DownloadPresenter : BasePresenter<DownloadFragment>() {
|
||||
override fun onTakeView(view: DownloadFragment) {
|
||||
super.onTakeView(view)
|
||||
|
||||
statusSubscription = downloadQueue.statusObservable
|
||||
.startWith(downloadQueue.activeDownloads)
|
||||
statusSubscription = downloadQueue.getStatusObservable()
|
||||
.startWith(downloadQueue.getActiveDownloads())
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribe { processStatus(it, view) }
|
||||
|
||||
add(statusSubscription)
|
||||
|
||||
pageProgressSubscription = downloadQueue.progressObservable
|
||||
pageProgressSubscription = downloadQueue.getProgressObservable()
|
||||
.onBackpressureBuffer()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.subscribe { view.onUpdateDownloadedPages(it) }
|
||||
|
@ -174,7 +174,7 @@ class LibraryPresenter : BasePresenter<LibraryFragment>() {
|
||||
}
|
||||
|
||||
if (prefFilterDownloaded) {
|
||||
val mangaDir = downloadManager.getAbsoluteMangaDirectory(sourceManager.get(manga.source), manga)
|
||||
val mangaDir = downloadManager.getAbsoluteMangaDirectory(sourceManager.get(manga.source)!!, manga)
|
||||
|
||||
if (mangaDir.exists()) {
|
||||
for (file in mangaDir.listFiles()) {
|
||||
|
@ -130,7 +130,7 @@ class ChaptersPresenter : BasePresenter<ChaptersFragment>() {
|
||||
}
|
||||
|
||||
fun getChapterStatusObs(): Observable<Download> {
|
||||
return downloadManager.queue.statusObservable
|
||||
return downloadManager.queue.getStatusObservable()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.filter { download -> download.manga.id == manga.id }
|
||||
.doOnNext { updateChapterStatus(it) }
|
||||
@ -214,7 +214,7 @@ class ChaptersPresenter : BasePresenter<ChaptersFragment>() {
|
||||
|
||||
fun deleteChapters(selectedChapters: Observable<Chapter>) {
|
||||
add(selectedChapters.subscribe(
|
||||
{ chapter -> downloadManager.queue.remove(chapter) },
|
||||
{ chapter -> downloadManager.queue.del(chapter) },
|
||||
{ error -> Timber.e(error.message) },
|
||||
{
|
||||
if (onlyDownloaded())
|
||||
|
@ -243,7 +243,7 @@ class ReaderActivity : BaseRxActivity<ReaderPresenter>() {
|
||||
}
|
||||
|
||||
@Suppress("UNUSED_PARAMETER")
|
||||
fun onAdjacentChapters(previous: Chapter, next: Chapter) {
|
||||
fun onAdjacentChapters(previous: Chapter?, next: Chapter?) {
|
||||
setAdjacentChaptersVisibility()
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,7 @@ class RecentChaptersPresenter : BasePresenter<RecentChaptersFragment>() {
|
||||
* @return download object containing download progress.
|
||||
*/
|
||||
private fun getChapterStatusObs(): Observable<Download> {
|
||||
return downloadManager.queue.statusObservable
|
||||
return downloadManager.queue.getStatusObservable()
|
||||
.observeOn(AndroidSchedulers.mainThread())
|
||||
.filter { download: Download ->
|
||||
if (chapterIdEquals(download.chapter.id))
|
||||
@ -188,7 +188,7 @@ class RecentChaptersPresenter : BasePresenter<RecentChaptersFragment>() {
|
||||
}
|
||||
|
||||
// Get source of chapter
|
||||
val source = sourceManager.get(mangaChapter.manga.source)
|
||||
val source = sourceManager.get(mangaChapter.manga.source)!!
|
||||
|
||||
// Check if chapter is downloaded
|
||||
if (downloadManager.isChapterDownloaded(source, mangaChapter.manga, mangaChapter.chapter)) {
|
||||
@ -271,7 +271,7 @@ class RecentChaptersPresenter : BasePresenter<RecentChaptersFragment>() {
|
||||
* @param manga manga that belongs to chapter
|
||||
*/
|
||||
fun deleteChapter(chapter: Chapter, manga: Manga) {
|
||||
val source = sourceManager.get(manga.source)
|
||||
val source = sourceManager.get(manga.source)!!
|
||||
downloadManager.deleteChapter(source, manga, chapter)
|
||||
}
|
||||
|
||||
@ -282,7 +282,7 @@ class RecentChaptersPresenter : BasePresenter<RecentChaptersFragment>() {
|
||||
fun deleteChapters(selectedChapters: Observable<Chapter>) {
|
||||
add(selectedChapters
|
||||
.subscribe(
|
||||
{ chapter -> downloadManager.queue.remove(chapter) })
|
||||
{ chapter -> downloadManager.queue.del(chapter) })
|
||||
{ error -> Timber.e(error.message) })
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user