diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt index 2bb4bd9876..089d1e4e28 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/DownloadCache.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.withTimeout import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit /** @@ -305,9 +306,9 @@ class DownloadCache( * Returns a new map containing only the key entries of [transform] that are not null. */ private inline fun Map.mapNotNullKeys(transform: (Map.Entry) -> R?): MutableMap { - val destination = LinkedHashMap() - forEach { element -> transform(element)?.let { destination[it] = element.value } } - return destination + val mutableMap = ConcurrentHashMap() + forEach { element -> transform(element)?.let { mutableMap[it] = element.value } } + return mutableMap } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt index 96ce161538..760111e9c5 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/DownloadQueue.kt @@ -78,13 +78,13 @@ class DownloadQueue( .startWith(getActiveDownloads()) .onBackpressureBuffer() - fun getStatusAsFlow(): Flow = getStatusObservable().asFlow() + fun statusFlow(): Flow = getStatusObservable().asFlow() private fun getUpdatedObservable(): Observable> = updatedRelay.onBackpressureBuffer() .startWith(Unit) .map { this } - fun getUpdatedAsFlow(): Flow> = getUpdatedObservable().asFlow() + fun updatedFlow(): Flow> = getUpdatedObservable().asFlow() private fun setPagesFor(download: Download) { if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) { @@ -111,7 +111,7 @@ class DownloadQueue( .filter { it.status == Download.State.DOWNLOADING } } - fun getProgressAsFlow(): Flow = getProgressObservable().asFlow() + fun progressFlow(): Flow = getProgressObservable().asFlow() private fun setPagesSubject(pages: List?, subject: PublishSubject?) { pages?.forEach { it.setStatusSubject(subject) } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt index f51aa9d5b3..506d7ee24e 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/source/SourcesPresenter.kt @@ -16,6 +16,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.receiveAsFlow import logcat.LogPriority import uy.kohesive.injekt.Injekt @@ -38,6 +39,7 @@ class SourcesPresenter( fun onCreate() { presenterScope.launchIO { getEnabledSources.subscribe() + .debounce(500) // Avoid crashes due to LazyColumn rendering .catch { exception -> logcat(LogPriority.ERROR, exception) _events.send(Event.FailedFetchingSources) diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt index 300e884dc3..458c666c81 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadPresenter.kt @@ -34,7 +34,7 @@ class DownloadPresenter : BasePresenter() { super.onCreate(savedState) presenterScope.launch { - downloadQueue.getUpdatedAsFlow() + downloadQueue.updatedFlow() .catch { error -> logcat(LogPriority.ERROR, error) } .map { downloads -> downloads @@ -49,9 +49,9 @@ class DownloadPresenter : BasePresenter() { } } - fun getDownloadStatusFlow() = downloadQueue.getStatusAsFlow() + fun getDownloadStatusFlow() = downloadQueue.statusFlow() - fun getDownloadProgressFlow() = downloadQueue.getProgressAsFlow() + fun getDownloadProgressFlow() = downloadQueue.progressFlow() /** * Pauses the download queue. diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt index 0ec7866cc4..8fca6c1dad 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt @@ -34,6 +34,7 @@ import eu.kanade.domain.track.model.toDomainTrack import eu.kanade.domain.ui.UiPreferences import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.data.database.models.Track +import eu.kanade.tachiyomi.data.download.DownloadCache import eu.kanade.tachiyomi.data.download.DownloadManager import eu.kanade.tachiyomi.data.download.model.Download import eu.kanade.tachiyomi.data.track.EnhancedTrackService @@ -63,6 +64,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.launchIn @@ -91,6 +93,7 @@ class MangaPresenter( private val trackManager: TrackManager = Injekt.get(), private val sourceManager: SourceManager = Injekt.get(), private val downloadManager: DownloadManager = Injekt.get(), + private val downloadCache: DownloadCache = Injekt.get(), private val getMangaAndChapters: GetMangaWithChapters = Injekt.get(), private val getDuplicateLibraryManga: GetDuplicateLibraryManga = Injekt.get(), private val setMangaChapterFlags: SetMangaChapterFlags = Injekt.get(), @@ -113,9 +116,6 @@ class MangaPresenter( private val successState: MangaScreenState.Success? get() = state.value as? MangaScreenState.Success - private var observeDownloadsStatusJob: Job? = null - private var observeDownloadsPageJob: Job? = null - private var _trackList: List = emptyList() val trackList get() = _trackList @@ -169,10 +169,11 @@ class MangaPresenter( ) } - // For UI changes - presenterScope.launch { - getMangaAndChapters.subscribe(mangaId) - .distinctUntilChanged() + presenterScope.launchIO { + combine( + getMangaAndChapters.subscribe(mangaId).distinctUntilChanged(), + downloadCache.changes, + ) { mangaAndChapters, _ -> mangaAndChapters } .collectLatest { (manga, chapters) -> val chapterItems = chapters.toChapterItemsParams(manga) updateSuccessState { @@ -181,20 +182,11 @@ class MangaPresenter( chapters = chapterItems, ) } - - observeDownloads() } } - basePreferences.incognitoMode() - .asHotFlow { incognitoMode = it } - .launchIn(presenterScope) + observeDownloads() - basePreferences.downloadedOnly() - .asHotFlow { downloadedOnlyMode = it } - .launchIn(presenterScope) - - // This block runs once on create presenterScope.launchIO { val manga = getMangaAndChapters.awaitManga(mangaId) val chapters = getMangaAndChapters.awaitChapters(mangaId) @@ -207,7 +199,7 @@ class MangaPresenter( val needRefreshInfo = !manga.initialized val needRefreshChapter = chapters.isEmpty() - // Show what we have earlier. + // Show what we have earlier _state.update { MangaScreenState.Success( manga = manga, @@ -238,6 +230,14 @@ class MangaPresenter( // Initial loading finished updateSuccessState { it.copy(isRefreshingData = false) } } + + basePreferences.incognitoMode() + .asHotFlow { incognitoMode = it } + .launchIn(presenterScope) + + basePreferences.downloadedOnly() + .asHotFlow { downloadedOnlyMode = it } + .launchIn(presenterScope) } fun fetchAllFromSource(manualFetch: Boolean = true) { @@ -467,9 +467,8 @@ class MangaPresenter( // Chapters list - start private fun observeDownloads() { - observeDownloadsStatusJob?.cancel() - observeDownloadsStatusJob = presenterScope.launchIO { - downloadManager.queue.getStatusAsFlow() + presenterScope.launchIO { + downloadManager.queue.statusFlow() .filter { it.manga.id == successState?.manga?.id } .catch { error -> logcat(LogPriority.ERROR, error) } .collect { @@ -479,9 +478,8 @@ class MangaPresenter( } } - observeDownloadsPageJob?.cancel() - observeDownloadsPageJob = presenterScope.launchIO { - downloadManager.queue.getProgressAsFlow() + presenterScope.launchIO { + downloadManager.queue.progressFlow() .filter { it.manga.id == successState?.manga?.id } .catch { error -> logcat(LogPriority.ERROR, error) } .collect { diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt index a40b5e9e78..01291be914 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/more/MorePresenter.kt @@ -32,7 +32,7 @@ class MorePresenter( presenterScope.launchIO { combine( DownloadService.isRunning, - downloadManager.queue.getUpdatedAsFlow(), + downloadManager.queue.updatedFlow(), ) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) } .collectLatest { (isDownloading, downloadQueueSize) -> val pendingDownloadExists = downloadQueueSize != 0 diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt index 6c2d39a8a6..ecbf448f6b 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/recent/updates/UpdatesPresenter.kt @@ -18,6 +18,7 @@ import eu.kanade.domain.updates.model.UpdatesWithRelations import eu.kanade.presentation.components.ChapterDownloadAction import eu.kanade.presentation.updates.UpdatesState import eu.kanade.presentation.updates.UpdatesStateImpl +import eu.kanade.tachiyomi.data.download.DownloadCache import eu.kanade.tachiyomi.data.download.DownloadManager import eu.kanade.tachiyomi.data.download.DownloadService import eu.kanade.tachiyomi.data.download.model.Download @@ -27,11 +28,12 @@ import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.lang.launchNonCancellable import eu.kanade.tachiyomi.util.lang.withUIContext import eu.kanade.tachiyomi.util.system.logcat -import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch @@ -50,6 +52,7 @@ class UpdatesPresenter( private val getManga: GetManga = Injekt.get(), private val sourceManager: SourceManager = Injekt.get(), private val downloadManager: DownloadManager = Injekt.get(), + private val downloadCache: DownloadCache = Injekt.get(), private val getChapter: GetChapter = Injekt.get(), basePreferences: BasePreferences = Injekt.get(), uiPreferences: UiPreferences = Injekt.get(), @@ -70,12 +73,6 @@ class UpdatesPresenter( // First and last selected index in list private val selectedPositions: Array = arrayOf(-1, -1) - /** - * Subscription to observe download status changes. - */ - private var observeDownloadsStatusJob: Job? = null - private var observeDownloadsPageJob: Job? = null - override fun onCreate(savedState: Bundle?) { super.onCreate(savedState) @@ -86,10 +83,11 @@ class UpdatesPresenter( add(Calendar.MONTH, -3) } - observeDownloads() - - getUpdates.subscribe(calendar) - .distinctUntilChanged() + combine( + getUpdates.subscribe(calendar).distinctUntilChanged(), + downloadCache.changes, + ) { updates, _ -> updates } + .debounce(500) // Avoid crashes due to LazyColumn rendering .catch { logcat(LogPriority.ERROR, it) _events.send(Event.InternalError) @@ -99,6 +97,26 @@ class UpdatesPresenter( state.isLoading = false } } + + presenterScope.launchIO { + downloadManager.queue.statusFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collect { + withUIContext { + updateDownloadState(it) + } + } + } + + presenterScope.launchIO { + downloadManager.queue.progressFlow() + .catch { error -> logcat(LogPriority.ERROR, error) } + .collect { + withUIContext { + updateDownloadState(it) + } + } + } } private fun List.toUpdateItems(): List { @@ -125,30 +143,6 @@ class UpdatesPresenter( } } - private suspend fun observeDownloads() { - observeDownloadsStatusJob?.cancel() - observeDownloadsStatusJob = presenterScope.launchIO { - downloadManager.queue.getStatusAsFlow() - .catch { error -> logcat(LogPriority.ERROR, error) } - .collect { - withUIContext { - updateDownloadState(it) - } - } - } - - observeDownloadsPageJob?.cancel() - observeDownloadsPageJob = presenterScope.launchIO { - downloadManager.queue.getProgressAsFlow() - .catch { error -> logcat(LogPriority.ERROR, error) } - .collect { - withUIContext { - updateDownloadState(it) - } - } - } - } - /** * Update status of chapters. *