diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.kt index 6f4a368b91..56406ad424 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadManager.kt @@ -4,10 +4,17 @@ import android.content.Context import eu.kanade.domain.download.service.DownloadPreferences import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.data.download.model.Download -import eu.kanade.tachiyomi.data.download.model.DownloadQueue import eu.kanade.tachiyomi.source.Source import eu.kanade.tachiyomi.source.SourceManager import eu.kanade.tachiyomi.source.model.Page +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.runBlocking import logcat.LogPriority import tachiyomi.core.util.lang.launchIO @@ -42,11 +49,8 @@ class DownloadManager( */ private val pendingDeleter = DownloadPendingDeleter(context) - /** - * Downloads queue, where the pending chapters are stored. - */ - val queue: DownloadQueue - get() = downloader.queue + val queueState + get() = downloader.queueState // For use by DownloadService only fun downloaderStart() = downloader.start() @@ -85,7 +89,7 @@ class DownloadManager( * @param chapterId the chapter to check. */ fun getQueuedDownloadOrNull(chapterId: Long): Download? { - return queue.find { it.chapter.id == chapterId } + return queueState.value.find { it: Download -> it.chapter.id == chapterId } } fun startDownloadNow(chapterId: Long?) { @@ -93,7 +97,7 @@ class DownloadManager( val download = getQueuedDownloadOrNull(chapterId) // If not in queue try to start a new download val toAdd = download ?: runBlocking { Download.fromChapterId(chapterId) } ?: return - val queue = queue.toMutableList() + val queue = queueState.value.toMutableList() download?.let { queue.remove(it) } queue.add(0, toAdd) reorderQueue(queue) @@ -112,21 +116,7 @@ class DownloadManager( * @param downloads value to set the download queue to */ fun reorderQueue(downloads: List) { - val wasRunning = downloader.isRunning - - if (downloads.isEmpty()) { - downloader.clearQueue() - downloader.stop() - return - } - - downloader.pause() - queue.clear() - queue.addAll(downloads) - - if (wasRunning) { - downloader.start() - } + downloader.updateQueue(downloads) } /** @@ -147,7 +137,7 @@ class DownloadManager( */ fun addDownloadsToStartOfQueue(downloads: List) { if (downloads.isEmpty()) return - queue.toMutableList().apply { + queueState.value.toMutableList().apply { addAll(0, downloads) reorderQueue(this) } @@ -251,7 +241,7 @@ class DownloadManager( fun deleteManga(manga: Manga, source: Source, removeQueued: Boolean = true) { launchIO { if (removeQueued) { - queue.remove(manga) + downloader.removeFromQueue(manga) } provider.findMangaDir(manga.title, source)?.delete() cache.removeManga(manga) @@ -271,12 +261,12 @@ class DownloadManager( downloader.pause() } - queue.remove(chapters) + downloader.removeFromQueue(chapters) if (wasRunning) { - if (queue.isEmpty()) { + if (queueState.value.isEmpty()) { downloader.stop() - } else if (queue.isNotEmpty()) { + } else if (queueState.value.isNotEmpty()) { downloader.start() } } @@ -374,4 +364,33 @@ class DownloadManager( chapters } } + + fun statusFlow(): Flow = queueState + .flatMapLatest { downloads -> + downloads + .map { download -> + download.statusFlow.drop(1).map { download } + } + .merge() + } + .onStart { + emitAll( + queueState.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow(), + ) + } + + fun progressFlow(): Flow = queueState + .flatMapLatest { downloads -> + downloads + .map { download -> + download.progressFlow.drop(1).map { download } + } + .merge() + } + .onStart { + emitAll( + queueState.value.filter { download -> download.status == Download.State.DOWNLOADING } + .asFlow(), + ) + } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt index f62c38e552..da5e561494 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt @@ -11,7 +11,6 @@ import eu.kanade.domain.manga.model.getComicInfo import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.data.cache.ChapterCache import eu.kanade.tachiyomi.data.download.model.Download -import eu.kanade.tachiyomi.data.download.model.DownloadQueue import eu.kanade.tachiyomi.data.library.LibraryUpdateNotifier import eu.kanade.tachiyomi.data.notification.NotificationHandler import eu.kanade.tachiyomi.source.SourceManager @@ -25,12 +24,15 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.retryWhen +import kotlinx.coroutines.flow.update import kotlinx.coroutines.runBlocking import logcat.LogPriority import nl.adaptivity.xmlutil.serialization.XML @@ -59,7 +61,7 @@ import java.util.zip.ZipOutputStream /** * This class is the one in charge of downloading chapters. * - * Its [queue] contains the list of chapters to download. In order to download them, the downloader + * Its queue contains the list of chapters to download. In order to download them, the downloader * subscription must be running and the list of chapters must be sent to them by [downloadsRelay]. * * The queue manipulation must be done in one thread (currently the main thread) to avoid unexpected @@ -88,7 +90,8 @@ class Downloader( /** * Queue where active downloads are kept. */ - val queue = DownloadQueue(store) + val _queueState = MutableStateFlow>(emptyList()) + val queueState = _queueState.asStateFlow() /** * Notifier for the downloader state and progress. @@ -120,7 +123,7 @@ class Downloader( init { launchNow { val chapters = async { store.restore() } - queue.addAll(chapters.await()) + addAllToQueue(chapters.await()) } } @@ -131,13 +134,13 @@ class Downloader( * @return true if the downloader is started, false otherwise. */ fun start(): Boolean { - if (subscription != null || queue.isEmpty()) { + if (subscription != null || queueState.value.isEmpty()) { return false } initializeSubscription() - val pending = queue.filter { it.status != Download.State.DOWNLOADED } + val pending = queueState.value.filter { it: Download -> it.status != Download.State.DOWNLOADED } pending.forEach { if (it.status != Download.State.QUEUE) it.status = Download.State.QUEUE } isPaused = false @@ -151,7 +154,7 @@ class Downloader( */ fun stop(reason: String? = null) { destroySubscription() - queue + queueState.value .filter { it.status == Download.State.DOWNLOADING } .forEach { it.status = Download.State.ERROR } @@ -160,7 +163,7 @@ class Downloader( return } - if (isPaused && queue.isNotEmpty()) { + if (isPaused && queueState.value.isNotEmpty()) { notifier.onPaused() } else { notifier.onComplete() @@ -179,7 +182,7 @@ class Downloader( */ fun pause() { destroySubscription() - queue + queueState.value .filter { it.status == Download.State.DOWNLOADING } .forEach { it.status = Download.State.QUEUE } isPaused = true @@ -191,7 +194,7 @@ class Downloader( fun clearQueue() { destroySubscription() - queue.clear() + _clearQueue() notifier.dismissProgress() } @@ -250,7 +253,7 @@ class Downloader( } val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchIO - val wasEmpty = queue.isEmpty() + val wasEmpty = queueState.value.isEmpty() // Called in background thread, the operation can be slow with SAF. val chaptersWithoutDir = async { chapters @@ -263,12 +266,12 @@ class Downloader( // Runs in main thread (synchronization needed). val chaptersToQueue = chaptersWithoutDir.await() // Filter out those already enqueued. - .filter { chapter -> queue.none { it.chapter.id == chapter.id } } + .filter { chapter -> queueState.value.none { it: Download -> it.chapter.id == chapter.id } } // Create a download for each one. .map { Download(source, manga, it) } if (chaptersToQueue.isNotEmpty()) { - queue.addAll(chaptersToQueue) + addAllToQueue(chaptersToQueue) if (isRunning) { // Send the list of downloads to the downloader. @@ -277,8 +280,8 @@ class Downloader( // Start downloader if needed if (autoStart && wasEmpty) { - val queuedDownloads = queue.count { it.source !is UnmeteredSource } - val maxDownloadsFromSource = queue + val queuedDownloads = queueState.value.count { it: Download -> it.source !is UnmeteredSource } + val maxDownloadsFromSource = queueState.value .groupBy { it.source } .filterKeys { it !is UnmeteredSource } .maxOfOrNull { it.value.size } @@ -636,7 +639,7 @@ class Downloader( // Delete successful downloads from queue if (download.status == Download.State.DOWNLOADED) { // Remove downloaded chapter from queue - queue.remove(download) + removeFromQueue(download) } if (areAllDownloadsFinished()) { stop() @@ -647,7 +650,67 @@ class Downloader( * Returns true if all the queued downloads are in DOWNLOADED or ERROR state. */ private fun areAllDownloadsFinished(): Boolean { - return queue.none { it.status.value <= Download.State.DOWNLOADING.value } + return queueState.value.none { it: Download -> it.status.value <= Download.State.DOWNLOADING.value } + } + + fun addAllToQueue(downloads: List) { + _queueState.update { + downloads.forEach { download -> + download.status = Download.State.QUEUE + } + store.addAll(downloads) + it + downloads + } + } + + fun removeFromQueue(download: Download) { + _queueState.update { + store.remove(download) + if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { + download.status = Download.State.NOT_DOWNLOADED + } + it - download + } + } + + fun removeFromQueue(chapters: List) { + chapters.forEach { chapter -> + queueState.value.find { it.chapter.id == chapter.id }?.let { removeFromQueue(it) } + } + } + + fun removeFromQueue(manga: Manga) { + queueState.value.filter { it.manga.id == manga.id }.forEach { removeFromQueue(it) } + } + + fun _clearQueue() { + _queueState.update { + it.forEach { download -> + if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { + download.status = Download.State.NOT_DOWNLOADED + } + } + store.clear() + emptyList() + } + } + + fun updateQueue(downloads: List) { + val wasRunning = isRunning + + if (downloads.isEmpty()) { + clearQueue() + stop() + return + } + + pause() + _clearQueue() + addAllToQueue(downloads) + + if (wasRunning) { + start() + } } companion object { diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt deleted file mode 100644 index f2877fa7e6..0000000000 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ /dev/null @@ -1,99 +0,0 @@ -package eu.kanade.tachiyomi.data.download.model - -import eu.kanade.tachiyomi.data.download.DownloadStore -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.drop -import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.merge -import kotlinx.coroutines.flow.onStart -import kotlinx.coroutines.flow.update -import tachiyomi.domain.chapter.model.Chapter -import tachiyomi.domain.manga.model.Manga - -class DownloadQueue( - private val store: DownloadStore, -) { - private val _state = MutableStateFlow>(emptyList()) - val state = _state.asStateFlow() - - fun addAll(downloads: List) { - _state.update { - downloads.forEach { download -> - download.status = Download.State.QUEUE - } - store.addAll(downloads) - it + downloads - } - } - - fun remove(download: Download) { - _state.update { - store.remove(download) - if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { - download.status = Download.State.NOT_DOWNLOADED - } - it - download - } - } - - fun remove(chapter: Chapter) { - _state.value.find { it.chapter.id == chapter.id }?.let { remove(it) } - } - - fun remove(chapters: List) { - chapters.forEach(::remove) - } - - fun remove(manga: Manga) { - _state.value.filter { it.manga.id == manga.id }.forEach { remove(it) } - } - - fun clear() { - _state.update { - it.forEach { download -> - if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) { - download.status = Download.State.NOT_DOWNLOADED - } - } - store.clear() - emptyList() - } - } - - fun statusFlow(): Flow = state - .flatMapLatest { downloads -> - downloads - .map { download -> - download.statusFlow.drop(1).map { download } - } - .merge() - } - .onStart { emitAll(getActiveDownloads()) } - - fun progressFlow(): Flow = state - .flatMapLatest { downloads -> - downloads - .map { download -> - download.progressFlow.drop(1).map { download } - } - .merge() - } - .onStart { emitAll(getActiveDownloads()) } - - private fun getActiveDownloads(): Flow = - _state.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow() - - fun count(predicate: (Download) -> Boolean) = _state.value.count(predicate) - fun filter(predicate: (Download) -> Boolean) = _state.value.filter(predicate) - fun find(predicate: (Download) -> Boolean) = _state.value.find(predicate) - fun groupBy(keySelector: (Download) -> K) = _state.value.groupBy(keySelector) - fun isEmpty() = _state.value.isEmpty() - fun isNotEmpty() = _state.value.isNotEmpty() - fun none(predicate: (Download) -> Boolean) = _state.value.none(predicate) - fun toMutableList() = _state.value.toMutableList() -} diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt index db0d69fdfa..711985887b 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt @@ -111,7 +111,7 @@ class DownloadQueueScreenModel( init { coroutineScope.launch { - downloadManager.queue.state + downloadManager.queueState .map { downloads -> downloads .groupBy { it.source } @@ -136,8 +136,8 @@ class DownloadQueueScreenModel( val isDownloaderRunning get() = downloadManager.isDownloaderRunning - fun getDownloadStatusFlow() = downloadManager.queue.statusFlow() - fun getDownloadProgressFlow() = downloadManager.queue.progressFlow() + fun getDownloadStatusFlow() = downloadManager.statusFlow() + fun getDownloadProgressFlow() = downloadManager.progressFlow() fun startDownloads() { downloadManager.startDownloads() diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaScreenModel.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaScreenModel.kt index 1b3469be6a..f5dd90ec62 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaScreenModel.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaScreenModel.kt @@ -427,7 +427,7 @@ class MangaInfoScreenModel( private fun observeDownloads() { coroutineScope.launchIO { - downloadManager.queue.statusFlow() + downloadManager.statusFlow() .filter { it.manga.id == successState?.manga?.id } .catch { error -> logcat(LogPriority.ERROR, error) } .collect { @@ -438,7 +438,7 @@ class MangaInfoScreenModel( } coroutineScope.launchIO { - downloadManager.queue.progressFlow() + downloadManager.progressFlow() .filter { it.manga.id == successState?.manga?.id } .catch { error -> logcat(LogPriority.ERROR, error) } .collect { diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt index d91d49da34..1e2ede2730 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MoreTab.kt @@ -94,7 +94,7 @@ private class MoreScreenModel( coroutineScope.launchIO { combine( downloadManager.isDownloaderRunning, - downloadManager.queue.state, + downloadManager.queueState, ) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) } .collectLatest { (isDownloading, downloadQueueSize) -> val pendingDownloadExists = downloadQueueSize != 0 diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/updates/UpdatesScreenModel.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/updates/UpdatesScreenModel.kt index 7a44527c30..81356e38c9 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/updates/UpdatesScreenModel.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/updates/UpdatesScreenModel.kt @@ -99,7 +99,7 @@ class UpdatesScreenModel( } coroutineScope.launchIO { - merge(downloadManager.queue.statusFlow(), downloadManager.queue.progressFlow()) + merge(downloadManager.statusFlow(), downloadManager.progressFlow()) .catch { logcat(LogPriority.ERROR, it) } .collect(this@UpdatesScreenModel::updateDownloadState) }