From 0ac38297f4bdeb83b2d9c8919b89520e9722f35e Mon Sep 17 00:00:00 2001 From: Two-Ai <81279822+Two-Ai@users.noreply.github.com> Date: Tue, 30 May 2023 10:25:20 -0400 Subject: [PATCH] Replace RxJava in extension installer (#9556) * Replace RxJava in extension installer Replace common downloadsRelay with Map of individual StateFlows * Drop RxRelay dependency * Simplify updateAllExtensions * Simplify addDownloadState/removeDownloadState Use immutable Map functions instead of converting to MutableMap --- .../tachiyomi/extension/ExtensionManager.kt | 9 +- .../extension/util/ExtensionInstaller.kt | 92 +++++++++++-------- .../browse/extension/ExtensionsScreenModel.kt | 53 ++++------- gradle/libs.versions.toml | 3 +- 4 files changed, 80 insertions(+), 77 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt b/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt index 2db34368d9..fd78e4569b 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt @@ -14,10 +14,11 @@ import eu.kanade.tachiyomi.extension.util.ExtensionLoader import eu.kanade.tachiyomi.util.preference.plusAssign import eu.kanade.tachiyomi.util.system.toast import kotlinx.coroutines.async +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.emptyFlow import logcat.LogPriority -import rx.Observable import tachiyomi.core.util.lang.launchNow import tachiyomi.core.util.lang.withUIContext import tachiyomi.core.util.system.logcat @@ -200,7 +201,7 @@ class ExtensionManager( * * @param extension The extension to be installed. */ - fun installExtension(extension: Extension.Available): Observable { + fun installExtension(extension: Extension.Available): Flow { return installer.downloadAndInstall(api.getApkUrl(extension), extension) } @@ -211,9 +212,9 @@ class ExtensionManager( * * @param extension The extension to be updated. */ - fun updateExtension(extension: Extension.Installed): Observable { + fun updateExtension(extension: Extension.Installed): Flow { val availableExt = _availableExtensionsFlow.value.find { it.pkgName == extension.pkgName } - ?: return Observable.empty() + ?: return emptyFlow() return installExtension(availableExt) } diff --git a/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt b/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt index e6b992f8ee..29c910a208 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt @@ -10,20 +10,27 @@ import android.os.Environment import androidx.core.content.ContextCompat import androidx.core.content.getSystemService import androidx.core.net.toUri -import com.jakewharton.rxrelay.PublishRelay import eu.kanade.domain.base.BasePreferences import eu.kanade.tachiyomi.extension.installer.Installer import eu.kanade.tachiyomi.extension.model.Extension import eu.kanade.tachiyomi.extension.model.InstallStep import eu.kanade.tachiyomi.util.storage.getUriCompat +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.transformWhile import logcat.LogPriority -import rx.Observable -import rx.android.schedulers.AndroidSchedulers +import tachiyomi.core.util.lang.withUIContext import tachiyomi.core.util.system.logcat import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.io.File -import java.util.concurrent.TimeUnit +import kotlin.time.Duration.Companion.seconds /** * The installer which installs, updates and uninstalls the extensions. @@ -48,10 +55,7 @@ internal class ExtensionInstaller(private val context: Context) { */ private val activeDownloads = hashMapOf() - /** - * Relay used to notify the installation step of every download. - */ - private val downloadsRelay = PublishRelay.create>() + private val downloadsStateFlows = hashMapOf>() private val extensionInstaller = Injekt.get().extensionInstaller() @@ -62,7 +66,7 @@ internal class ExtensionInstaller(private val context: Context) { * @param url The url of the apk. * @param extension The extension to install. */ - fun downloadAndInstall(url: String, extension: Extension) = Observable.defer { + fun downloadAndInstall(url: String, extension: Extension): Flow { val pkgName = extension.pkgName val oldDownload = activeDownloads[pkgName] @@ -83,48 +87,59 @@ internal class ExtensionInstaller(private val context: Context) { val id = downloadManager.enqueue(request) activeDownloads[pkgName] = id - downloadsRelay.filter { it.first == id } - .map { it.second } - // Poll download status - .mergeWith(pollStatus(id)) + val downloadStateFlow = MutableStateFlow(InstallStep.Pending) + downloadsStateFlows[id] = downloadStateFlow + + // Poll download status + val pollStatusFlow = downloadStatusFlow(id).mapNotNull { downloadStatus -> + // Map to our model + when (downloadStatus) { + DownloadManager.STATUS_PENDING -> InstallStep.Pending + DownloadManager.STATUS_RUNNING -> InstallStep.Downloading + else -> null + } + } + + return merge(downloadStateFlow, pollStatusFlow).transformWhile { + emit(it) // Stop when the application is installed or errors - .takeUntil { it.isCompleted() } + !it.isCompleted() + }.onCompletion { // Always notify on main thread - .observeOn(AndroidSchedulers.mainThread()) - // Always remove the download when unsubscribed - .doOnUnsubscribe { deleteDownload(pkgName) } + withUIContext { + // Always remove the download when unsubscribed + deleteDownload(pkgName) + } + } } /** - * Returns an observable that polls the given download id for its status every second, as the + * Returns a flow that polls the given download id for its status every second, as the * manager doesn't have any notification system. It'll stop once the download finishes. * * @param id The id of the download to poll. */ - private fun pollStatus(id: Long): Observable { + private fun downloadStatusFlow(id: Long): Flow = flow { val query = DownloadManager.Query().setFilterById(id) - - return Observable.interval(0, 1, TimeUnit.SECONDS) + while (true) { // Get the current download status - .map { - downloadManager.query(query).use { cursor -> - cursor.moveToFirst() - cursor.getInt(cursor.getColumnIndexOrThrow(DownloadManager.COLUMN_STATUS)) - } + val downloadStatus = downloadManager.query(query).use { cursor -> + if (!cursor.moveToFirst()) return@flow + cursor.getInt(cursor.getColumnIndexOrThrow(DownloadManager.COLUMN_STATUS)) } - // Ignore duplicate results - .distinctUntilChanged() + + emit(downloadStatus) + // Stop polling when the download fails or finishes - .takeUntil { it == DownloadManager.STATUS_SUCCESSFUL || it == DownloadManager.STATUS_FAILED } - // Map to our model - .flatMap { status -> - when (status) { - DownloadManager.STATUS_PENDING -> Observable.just(InstallStep.Pending) - DownloadManager.STATUS_RUNNING -> Observable.just(InstallStep.Downloading) - else -> Observable.empty() - } + if (downloadStatus == DownloadManager.STATUS_SUCCESSFUL || downloadStatus == DownloadManager.STATUS_FAILED) { + return@flow } + + delay(1.seconds) + } } + // Ignore duplicate results + .distinctUntilChanged() /** * Starts an intent to install the extension at the given uri. @@ -176,7 +191,7 @@ internal class ExtensionInstaller(private val context: Context) { * @param step New install step. */ fun updateInstallStep(downloadId: Long, step: InstallStep) { - downloadsRelay.call(downloadId to step) + downloadsStateFlows[downloadId]?.let { it.value = step } } /** @@ -188,6 +203,7 @@ internal class ExtensionInstaller(private val context: Context) { val downloadId = activeDownloads.remove(pkgName) if (downloadId != null) { downloadManager.remove(downloadId) + downloadsStateFlows.remove(downloadId) } if (activeDownloads.isEmpty()) { downloadReceiver.unregister() @@ -240,7 +256,7 @@ internal class ExtensionInstaller(private val context: Context) { // Set next installation step if (uri == null) { logcat(LogPriority.ERROR) { "Couldn't locate downloaded APK" } - downloadsRelay.call(id to InstallStep.Error) + updateInstallStep(id, InstallStep.Error) return } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/extension/ExtensionsScreenModel.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/extension/ExtensionsScreenModel.kt index 10677639fd..6f8c0579f2 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/browse/extension/ExtensionsScreenModel.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/browse/extension/ExtensionsScreenModel.kt @@ -14,16 +14,18 @@ import eu.kanade.tachiyomi.extension.model.InstallStep import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.util.system.LocaleHelper import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.update -import rx.Observable import tachiyomi.core.util.lang.launchIO import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get @@ -130,28 +132,24 @@ class ExtensionsScreenModel( fun updateAllExtensions() { coroutineScope.launchIO { - with(state.value) { - if (isEmpty) return@launchIO - items.values - .flatten() - .mapNotNull { - when { - it.extension !is Extension.Installed -> null - !it.extension.hasUpdate -> null - else -> it.extension - } - } - .forEach(::updateExtension) - } + state.value.items.values.flatten() + .map { it.extension } + .filterIsInstance() + .filter { it.hasUpdate } + .forEach(::updateExtension) } } fun installExtension(extension: Extension.Available) { - extensionManager.installExtension(extension).subscribeToInstallUpdate(extension) + coroutineScope.launchIO { + extensionManager.installExtension(extension).collectToInstallUpdate(extension) + } } fun updateExtension(extension: Extension.Installed) { - extensionManager.updateExtension(extension).subscribeToInstallUpdate(extension) + coroutineScope.launchIO { + extensionManager.updateExtension(extension).collectToInstallUpdate(extension) + } } fun cancelInstallUpdateExtension(extension: Extension) { @@ -159,29 +157,18 @@ class ExtensionsScreenModel( } private fun removeDownloadState(extension: Extension) { - _currentDownloads.update { _map -> - val map = _map.toMutableMap() - map.remove(extension.pkgName) - map - } + _currentDownloads.update { it - extension.pkgName } } private fun addDownloadState(extension: Extension, installStep: InstallStep) { - _currentDownloads.update { _map -> - val map = _map.toMutableMap() - map[extension.pkgName] = installStep - map - } + _currentDownloads.update { it + Pair(extension.pkgName, installStep) } } - private fun Observable.subscribeToInstallUpdate(extension: Extension) { + private suspend fun Flow.collectToInstallUpdate(extension: Extension) = this - .doOnUnsubscribe { removeDownloadState(extension) } - .subscribe( - { installStep -> addDownloadState(extension, installStep) }, - { removeDownloadState(extension) }, - ) - } + .onEach { installStep -> addDownloadState(extension, installStep) } + .onCompletion { removeDownloadState(extension) } + .collect() fun uninstallExtension(pkgName: String) { extensionManager.uninstallExtension(pkgName) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f3100131ef..703b3b8fd3 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,7 +16,6 @@ google-services-gradle = "com.google.gms:google-services:4.3.15" rxandroid = "io.reactivex:rxandroid:1.2.1" rxjava = "io.reactivex:rxjava:1.3.8" -rxrelay = "com.jakewharton.rxrelay:rxrelay:1.2.0" flowreactivenetwork = "ru.beryukhov:flowreactivenetwork:1.0.4" okhttp-core = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp_version" } @@ -92,7 +91,7 @@ voyager-transitions = { module = "cafe.adriel.voyager:voyager-transitions", vers kotlinter = "org.jmailen.gradle:kotlinter-gradle:3.13.0" [bundles] -reactivex = ["rxandroid", "rxjava", "rxrelay"] +reactivex = ["rxandroid", "rxjava"] okhttp = ["okhttp-core", "okhttp-logging", "okhttp-dnsoverhttps"] js-engine = ["quickjs-android"] sqlite = ["sqlite-framework", "sqlite-ktx", "sqlite-android"]