diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.java b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.java index 5523351e70..9f2c58ee7f 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.java +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.java @@ -15,8 +15,6 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import eu.kanade.tachiyomi.data.database.models.Chapter; import eu.kanade.tachiyomi.data.database.models.Manga; @@ -28,6 +26,7 @@ import eu.kanade.tachiyomi.data.source.base.Source; import eu.kanade.tachiyomi.data.source.model.Page; import eu.kanade.tachiyomi.event.DownloadChaptersEvent; import eu.kanade.tachiyomi.util.DiskUtils; +import eu.kanade.tachiyomi.util.DynamicConcurrentMergeOperator; import eu.kanade.tachiyomi.util.UrlUtil; import rx.Observable; import rx.Subscription; @@ -48,11 +47,12 @@ public class DownloadManager { private BehaviorSubject runningSubject; private Subscription downloadsSubscription; + private BehaviorSubject threadsSubject; + private Subscription threadsSubscription; + private DownloadQueue queue; private volatile boolean isRunning; - private ExecutorService threadPool; - public static final String PAGE_LIST_FILE = "index.json"; public DownloadManager(Context context, SourceManager sourceManager, PreferencesHelper preferences) { @@ -65,17 +65,19 @@ public class DownloadManager { downloadsQueueSubject = PublishSubject.create(); runningSubject = BehaviorSubject.create(); + threadsSubject = BehaviorSubject.create(); } private void initializeSubscriptions() { if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed()) downloadsSubscription.unsubscribe(); - threadPool = Executors.newFixedThreadPool(preferences.downloadThreads()); + threadsSubscription = preferences.downloadThreads().asObservable() + .subscribe(threadsSubject::onNext); downloadsSubscription = downloadsQueueSubject .flatMap(Observable::from) - .flatMap(c -> downloadChapter(c).subscribeOn(Schedulers.from(threadPool))) + .lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threadsSubject)) .observeOn(AndroidSchedulers.mainThread()) .map(download -> areAllDownloadsFinished()) .subscribe(finished -> { @@ -101,9 +103,10 @@ public class DownloadManager { downloadsSubscription = null; } - if (threadPool != null && !threadPool.isShutdown()) { - threadPool.shutdown(); + if (threadsSubscription != null && !threadsSubscription.isUnsubscribed()) { + threadsSubscription.unsubscribe(); } + } // Create a download object for every chapter in the event and add them to the downloads queue @@ -208,7 +211,8 @@ public class DownloadManager { .onErrorResumeNext(error -> { download.setStatus(Download.ERROR); return Observable.just(download); - })); + })) + .subscribeOn(Schedulers.io()); } // Get the image from the filesystem if it exists or download from network diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/preference/PreferencesHelper.java b/app/src/main/java/eu/kanade/tachiyomi/data/preference/PreferencesHelper.java index 6963fc2a36..af8001c573 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/preference/PreferencesHelper.java +++ b/app/src/main/java/eu/kanade/tachiyomi/data/preference/PreferencesHelper.java @@ -159,8 +159,8 @@ public class PreferencesHelper { prefs.edit().putString(getKey(R.string.pref_download_directory_key), path).apply(); } - public int downloadThreads() { - return prefs.getInt(getKey(R.string.pref_download_slots_key), 1); + public Preference downloadThreads() { + return rxPrefs.getInteger(getKey(R.string.pref_download_slots_key), 1); } public boolean downloadOnlyOverWifi() {