diff --git a/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt b/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt index df0fc1f320..8ab5d70421 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt @@ -17,6 +17,8 @@ import eu.kanade.tachiyomi.ui.extension.ExtensionIntallInfo import eu.kanade.tachiyomi.util.system.launchNow import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.withContext import rx.Observable import uy.kohesive.injekt.Injekt @@ -232,26 +234,26 @@ class ExtensionManager( } /** - * Returns an observable of the installation process for the given extension. It will complete - * once the extension is installed or throws an error. The process will be canceled if - * unsubscribed before its completion. + * Returns a flow of the installation process for the given extension. It will complete + * once the extension is installed or throws an error. The process will be canceled the scope + * is canceled before its completion. * * @param extension The extension to be installed. */ - fun installExtension(extension: Extension.Available): Observable { + fun installExtension(extension: Extension.Available): Flow { return installer.downloadAndInstall(api.getApkUrl(extension), extension) } /** - * Returns an observable of the installation process for the given extension. It will complete - * once the extension is updated or throws an error. The process will be canceled if - * unsubscribed before its completion. + * Returns a flow of the installation process for the given extension. It will complete + * once the extension is updated or throws an error. The process will be canceled the scope + * is canceled before its completion. * * @param extension The extension to be updated. */ - fun updateExtension(extension: Extension.Installed): Observable { + fun updateExtension(extension: Extension.Installed): Flow { val availableExt = availableExtensions.find { it.pkgName == extension.pkgName } - ?: return Observable.empty() + ?: return emptyFlow() return installExtension(availableExt) } diff --git a/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt b/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt index 9b2ceda0d7..2c9e8722f8 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt @@ -9,16 +9,28 @@ import android.content.pm.PackageInstaller import android.net.Uri import android.os.Environment import androidx.core.net.toUri -import com.jakewharton.rxrelay.PublishRelay import eu.kanade.tachiyomi.extension.model.Extension import eu.kanade.tachiyomi.extension.model.InstallStep import eu.kanade.tachiyomi.ui.extension.ExtensionIntallInfo import eu.kanade.tachiyomi.util.storage.getUriCompat -import rx.Observable -import rx.android.schedulers.AndroidSchedulers +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.flattenMerge +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.transformWhile import timber.log.Timber import java.io.File -import java.util.concurrent.TimeUnit /** * The installer which installs, updates and uninstalls the extensions. @@ -30,7 +42,8 @@ internal class ExtensionInstaller(private val context: Context) { /** * The system's download manager */ - private val downloadManager = context.getSystemService(Context.DOWNLOAD_SERVICE) as DownloadManager + private val downloadManager = + context.getSystemService(Context.DOWNLOAD_SERVICE) as DownloadManager /** * The broadcast receiver which listens to download completion events. @@ -44,27 +57,21 @@ internal class ExtensionInstaller(private val context: Context) { private val activeDownloads = hashMapOf() /** - * Relay used to notify the installation step of every download. + * StateFlow used to notify the installation step of every download. */ - private val downloadsRelay = PublishRelay.create>() + private val downloadsStateFlow = MutableStateFlow(0L to InstallStep.Pending) /** Map of download id to installer session id */ val downloadInstallerMap = hashMapOf() - data class DownloadSessionInfo( - val downloadId: Long, - val session: PackageInstaller.Session, - val sessionId: Int - ) - /** - * Adds the given extension to the downloads queue and returns an observable containing its + * Adds the given extension to the downloads queue and returns a flow containing its * step in the installation process. * * @param url The url of the apk. * @param extension The extension to install. */ - fun downloadAndInstall(url: String, extension: Extension) = Observable.defer { + fun downloadAndInstall(url: String, extension: Extension): Flow { val pkgName = extension.pkgName val oldDownload = activeDownloads[pkgName] @@ -79,75 +86,96 @@ internal class ExtensionInstaller(private val context: Context) { val request = DownloadManager.Request(downloadUri) .setTitle(extension.name) .setMimeType(APK_MIME) - .setDestinationInExternalFilesDir(context, Environment.DIRECTORY_DOWNLOADS, downloadUri.lastPathSegment) + .setDestinationInExternalFilesDir( + context, + Environment.DIRECTORY_DOWNLOADS, + downloadUri.lastPathSegment + ) .setNotificationVisibility(DownloadManager.Request.VISIBILITY_VISIBLE_NOTIFY_COMPLETED) val id = downloadManager.enqueue(request) activeDownloads[pkgName] = id - downloadsRelay.filter { it.first == id } - .map { - val sessionId = downloadInstallerMap[it.first] ?: return@map it.second to null - val session = context.packageManager.packageInstaller.getSessionInfo(sessionId) - it.second to session + return flowOf( + pollStatus(id), + pollInstallStatus(id), + downloadsStateFlow.filter { it.first == id } + .map { + it.second to findSession(it.first) + } + ).flattenMerge() + .transformWhile { + emit(it) + !it.first.isCompleted() } - // Poll download status - .mergeWith(pollStatus(id)) - // Poll installation status - .mergeWith(pollInstallStatus(id)) - // Force an error if the download takes more than 3 minutes - .mergeWith(Observable.timer(3, TimeUnit.MINUTES).map { InstallStep.Error to null }) - // Stop when the application is installed or errors - .takeUntil { it.first.isCompleted() } - // Always notify on main thread - .observeOn(AndroidSchedulers.mainThread()) - // Always remove the download when unsubscribed - .doOnUnsubscribe { deleteDownload(pkgName) } + .flowOn(Dispatchers.IO) + .catch { e -> + Timber.e(e) + emit(InstallStep.Error to null) + } + .onCompletion { + deleteDownload(pkgName) + } + } + + private fun findSession(downloadId: Long): PackageInstaller.SessionInfo? { + val sessionId = downloadInstallerMap[downloadId] ?: return null + return context.packageManager.packageInstaller.getSessionInfo(sessionId) } /** - * Returns an observable that polls the given download id for its status every second, as the + * Returns a flow that polls the given download id for its status every second, as the * manager doesn't have any notification system. It'll stop once the download finishes. * * @param id The id of the download to poll. */ - private fun pollStatus(id: Long): Observable { + private fun pollStatus(id: Long): Flow { val query = DownloadManager.Query().setFilterById(id) - return Observable.interval(0, 1, TimeUnit.SECONDS) - // Get the current download status - .map { - downloadManager.query(query).use { cursor -> + return flow { + while (true) { + val newDownloadState = downloadManager.query(query).use { cursor -> cursor.moveToFirst() cursor.getInt(cursor.getColumnIndex(DownloadManager.COLUMN_STATUS)) } + emit(newDownloadState) + delay(1000) } - // Ignore duplicate results + } .distinctUntilChanged() - // Stop polling when the download fails or finishes - .takeUntil { it == DownloadManager.STATUS_SUCCESSFUL || it == DownloadManager.STATUS_FAILED } - // Map to our model - .flatMap { status -> - val step = when (status) { + .transformWhile { + emit(it) + !(it == DownloadManager.STATUS_SUCCESSFUL || it == DownloadManager.STATUS_FAILED) + } + .flatMapConcat { downloadState -> + val step = when (downloadState) { DownloadManager.STATUS_PENDING -> InstallStep.Pending DownloadManager.STATUS_RUNNING -> InstallStep.Downloading - else -> return@flatMap Observable.empty() + else -> return@flatMapConcat emptyFlow() } - Observable.just(ExtensionIntallInfo(step, null)) - } - .doOnError { - Timber.e(it) + flowOf(ExtensionIntallInfo(step, null)) } } - private fun pollInstallStatus(id: Long): Observable { - return Observable.interval(0, 500, TimeUnit.MILLISECONDS) - .flatMap { - val sessionId = downloadInstallerMap[id] ?: return@flatMap Observable.empty() - val session = context.packageManager.packageInstaller.getSessionInfo(sessionId) - Observable.just(InstallStep.Installing to session) + /** + * Returns a flow that polls the given installer session for its status every half second, as the + * manager doesn't have any notification system. This will only stop once + * + * @param id The id of the download mapped to the session to poll. + */ + private fun pollInstallStatus(id: Long): Flow { + return flow { + while (true) { + val sessionId = downloadInstallerMap[id] + if (sessionId != null) { + val session = + context.packageManager.packageInstaller.getSessionInfo(sessionId) + emit(InstallStep.Installing to session) + } + delay(500) } - .doOnError { + } + .catch { Timber.e(it) } } @@ -185,7 +213,7 @@ internal class ExtensionInstaller(private val context: Context) { * @param downloadId The id of the download. */ fun setInstalling(downloadId: Long, sessionId: Int) { - downloadsRelay.call(downloadId to InstallStep.Installing) + downloadsStateFlow.tryEmit(downloadId to InstallStep.Installing) downloadInstallerMap[downloadId] = sessionId } @@ -204,7 +232,7 @@ internal class ExtensionInstaller(private val context: Context) { fun setInstallationResult(downloadId: Long, result: Boolean) { val step = if (result) InstallStep.Installed else InstallStep.Error downloadInstallerMap.remove(downloadId) - downloadsRelay.call(downloadId to step) + downloadsStateFlow.tryEmit(downloadId to step) } /** @@ -267,10 +295,10 @@ internal class ExtensionInstaller(private val context: Context) { // Set next installation step if (uri != null) { - downloadsRelay.call(id to InstallStep.Loading) + downloadsStateFlow.tryEmit(id to InstallStep.Loading) } else { Timber.e("Couldn't locate downloaded APK") - downloadsRelay.call(id to InstallStep.Error) + downloadsStateFlow.tryEmit(id to InstallStep.Error) return } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/extension/ExtensionBottomPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/extension/ExtensionBottomPresenter.kt index 1e69843c7e..f3c8377d31 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/extension/ExtensionBottomPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/extension/ExtensionBottomPresenter.kt @@ -19,12 +19,17 @@ import eu.kanade.tachiyomi.ui.migration.SelectionHeader import eu.kanade.tachiyomi.ui.migration.SourceItem import eu.kanade.tachiyomi.util.system.LocaleHelper import eu.kanade.tachiyomi.util.system.executeOnIO +import eu.kanade.tachiyomi.util.system.withUIContext import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.withContext -import rx.Observable import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get @@ -216,7 +221,7 @@ class ExtensionBottomPresenter( @Synchronized private fun updateInstallStep( extension: Extension, - state: InstallStep, + state: InstallStep?, session: PackageInstaller.SessionInfo? ): ExtensionItem? { val extensions = extensions.toMutableList() @@ -243,13 +248,17 @@ class ExtensionBottomPresenter( fun installExtension(extension: Extension.Available) { if (isNotMIUIOptimized()) { - extensionManager.installExtension(extension).subscribeToInstallUpdate(extension) + presenterScope.launch { + extensionManager.installExtension(extension).collectForInstallUpdate(extension) + } } } fun updateExtension(extension: Extension.Installed) { if (isNotMIUIOptimized()) { - extensionManager.updateExtension(extension).subscribeToInstallUpdate(extension) + presenterScope.launch { + extensionManager.updateExtension(extension).collectForInstallUpdate(extension) + } } } @@ -261,13 +270,20 @@ class ExtensionBottomPresenter( return true } - private fun Observable.subscribeToInstallUpdate(extension: Extension) { - this.doOnNext { currentDownloads[extension.pkgName] = it } - .doOnUnsubscribe { currentDownloads.remove(extension.pkgName) } - .map { state -> updateInstallStep(extension, state.first, state.second) } - .subscribe { item -> + private suspend fun Flow.collectForInstallUpdate(extension: Extension) { + this + .onEach { currentDownloads[extension.pkgName] = it } + .onCompletion { + currentDownloads.remove(extension.pkgName) + val item = updateInstallStep(extension, null, null) if (item != null) { - bottomSheet.downloadUpdate(item) + withUIContext { bottomSheet.downloadUpdate(item) } + } + } + .collect { state -> + val item = updateInstallStep(extension, state.first, state.second) + if (item != null) { + withUIContext { bottomSheet.downloadUpdate(item) } } } }