Convert extension install process from observable to flow

This commit is contained in:
Jays2Kings 2021-07-16 17:38:40 -04:00
parent d8a72f78d5
commit 2ea3142b32
3 changed files with 126 additions and 80 deletions

View File

@ -17,6 +17,8 @@ import eu.kanade.tachiyomi.ui.extension.ExtensionIntallInfo
import eu.kanade.tachiyomi.util.system.launchNow
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.withContext
import rx.Observable
import uy.kohesive.injekt.Injekt
@ -232,26 +234,26 @@ class ExtensionManager(
}
/**
* Returns an observable of the installation process for the given extension. It will complete
* once the extension is installed or throws an error. The process will be canceled if
* unsubscribed before its completion.
* Returns a flow of the installation process for the given extension. It will complete
* once the extension is installed or throws an error. The process will be canceled the scope
* is canceled before its completion.
*
* @param extension The extension to be installed.
*/
fun installExtension(extension: Extension.Available): Observable<ExtensionIntallInfo> {
fun installExtension(extension: Extension.Available): Flow<ExtensionIntallInfo> {
return installer.downloadAndInstall(api.getApkUrl(extension), extension)
}
/**
* Returns an observable of the installation process for the given extension. It will complete
* once the extension is updated or throws an error. The process will be canceled if
* unsubscribed before its completion.
* Returns a flow of the installation process for the given extension. It will complete
* once the extension is updated or throws an error. The process will be canceled the scope
* is canceled before its completion.
*
* @param extension The extension to be updated.
*/
fun updateExtension(extension: Extension.Installed): Observable<ExtensionIntallInfo> {
fun updateExtension(extension: Extension.Installed): Flow<ExtensionIntallInfo> {
val availableExt = availableExtensions.find { it.pkgName == extension.pkgName }
?: return Observable.empty()
?: return emptyFlow()
return installExtension(availableExt)
}

View File

@ -9,16 +9,28 @@ import android.content.pm.PackageInstaller
import android.net.Uri
import android.os.Environment
import androidx.core.net.toUri
import com.jakewharton.rxrelay.PublishRelay
import eu.kanade.tachiyomi.extension.model.Extension
import eu.kanade.tachiyomi.extension.model.InstallStep
import eu.kanade.tachiyomi.ui.extension.ExtensionIntallInfo
import eu.kanade.tachiyomi.util.storage.getUriCompat
import rx.Observable
import rx.android.schedulers.AndroidSchedulers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.transformWhile
import timber.log.Timber
import java.io.File
import java.util.concurrent.TimeUnit
/**
* The installer which installs, updates and uninstalls the extensions.
@ -30,7 +42,8 @@ internal class ExtensionInstaller(private val context: Context) {
/**
* The system's download manager
*/
private val downloadManager = context.getSystemService(Context.DOWNLOAD_SERVICE) as DownloadManager
private val downloadManager =
context.getSystemService(Context.DOWNLOAD_SERVICE) as DownloadManager
/**
* The broadcast receiver which listens to download completion events.
@ -44,27 +57,21 @@ internal class ExtensionInstaller(private val context: Context) {
private val activeDownloads = hashMapOf<String, Long>()
/**
* Relay used to notify the installation step of every download.
* StateFlow used to notify the installation step of every download.
*/
private val downloadsRelay = PublishRelay.create<Pair<Long, InstallStep>>()
private val downloadsStateFlow = MutableStateFlow(0L to InstallStep.Pending)
/** Map of download id to installer session id */
val downloadInstallerMap = hashMapOf<Long, Int>()
data class DownloadSessionInfo(
val downloadId: Long,
val session: PackageInstaller.Session,
val sessionId: Int
)
/**
* Adds the given extension to the downloads queue and returns an observable containing its
* Adds the given extension to the downloads queue and returns a flow containing its
* step in the installation process.
*
* @param url The url of the apk.
* @param extension The extension to install.
*/
fun downloadAndInstall(url: String, extension: Extension) = Observable.defer {
fun downloadAndInstall(url: String, extension: Extension): Flow<ExtensionIntallInfo> {
val pkgName = extension.pkgName
val oldDownload = activeDownloads[pkgName]
@ -79,75 +86,96 @@ internal class ExtensionInstaller(private val context: Context) {
val request = DownloadManager.Request(downloadUri)
.setTitle(extension.name)
.setMimeType(APK_MIME)
.setDestinationInExternalFilesDir(context, Environment.DIRECTORY_DOWNLOADS, downloadUri.lastPathSegment)
.setDestinationInExternalFilesDir(
context,
Environment.DIRECTORY_DOWNLOADS,
downloadUri.lastPathSegment
)
.setNotificationVisibility(DownloadManager.Request.VISIBILITY_VISIBLE_NOTIFY_COMPLETED)
val id = downloadManager.enqueue(request)
activeDownloads[pkgName] = id
downloadsRelay.filter { it.first == id }
.map {
val sessionId = downloadInstallerMap[it.first] ?: return@map it.second to null
val session = context.packageManager.packageInstaller.getSessionInfo(sessionId)
it.second to session
return flowOf(
pollStatus(id),
pollInstallStatus(id),
downloadsStateFlow.filter { it.first == id }
.map {
it.second to findSession(it.first)
}
).flattenMerge()
.transformWhile {
emit(it)
!it.first.isCompleted()
}
// Poll download status
.mergeWith(pollStatus(id))
// Poll installation status
.mergeWith(pollInstallStatus(id))
// Force an error if the download takes more than 3 minutes
.mergeWith(Observable.timer(3, TimeUnit.MINUTES).map { InstallStep.Error to null })
// Stop when the application is installed or errors
.takeUntil { it.first.isCompleted() }
// Always notify on main thread
.observeOn(AndroidSchedulers.mainThread())
// Always remove the download when unsubscribed
.doOnUnsubscribe { deleteDownload(pkgName) }
.flowOn(Dispatchers.IO)
.catch { e ->
Timber.e(e)
emit(InstallStep.Error to null)
}
.onCompletion {
deleteDownload(pkgName)
}
}
private fun findSession(downloadId: Long): PackageInstaller.SessionInfo? {
val sessionId = downloadInstallerMap[downloadId] ?: return null
return context.packageManager.packageInstaller.getSessionInfo(sessionId)
}
/**
* Returns an observable that polls the given download id for its status every second, as the
* Returns a flow that polls the given download id for its status every second, as the
* manager doesn't have any notification system. It'll stop once the download finishes.
*
* @param id The id of the download to poll.
*/
private fun pollStatus(id: Long): Observable<ExtensionIntallInfo> {
private fun pollStatus(id: Long): Flow<ExtensionIntallInfo> {
val query = DownloadManager.Query().setFilterById(id)
return Observable.interval(0, 1, TimeUnit.SECONDS)
// Get the current download status
.map {
downloadManager.query(query).use { cursor ->
return flow {
while (true) {
val newDownloadState = downloadManager.query(query).use { cursor ->
cursor.moveToFirst()
cursor.getInt(cursor.getColumnIndex(DownloadManager.COLUMN_STATUS))
}
emit(newDownloadState)
delay(1000)
}
// Ignore duplicate results
}
.distinctUntilChanged()
// Stop polling when the download fails or finishes
.takeUntil { it == DownloadManager.STATUS_SUCCESSFUL || it == DownloadManager.STATUS_FAILED }
// Map to our model
.flatMap { status ->
val step = when (status) {
.transformWhile {
emit(it)
!(it == DownloadManager.STATUS_SUCCESSFUL || it == DownloadManager.STATUS_FAILED)
}
.flatMapConcat { downloadState ->
val step = when (downloadState) {
DownloadManager.STATUS_PENDING -> InstallStep.Pending
DownloadManager.STATUS_RUNNING -> InstallStep.Downloading
else -> return@flatMap Observable.empty()
else -> return@flatMapConcat emptyFlow()
}
Observable.just(ExtensionIntallInfo(step, null))
}
.doOnError {
Timber.e(it)
flowOf(ExtensionIntallInfo(step, null))
}
}
private fun pollInstallStatus(id: Long): Observable<ExtensionIntallInfo> {
return Observable.interval(0, 500, TimeUnit.MILLISECONDS)
.flatMap {
val sessionId = downloadInstallerMap[id] ?: return@flatMap Observable.empty()
val session = context.packageManager.packageInstaller.getSessionInfo(sessionId)
Observable.just(InstallStep.Installing to session)
/**
* Returns a flow that polls the given installer session for its status every half second, as the
* manager doesn't have any notification system. This will only stop once
*
* @param id The id of the download mapped to the session to poll.
*/
private fun pollInstallStatus(id: Long): Flow<ExtensionIntallInfo> {
return flow {
while (true) {
val sessionId = downloadInstallerMap[id]
if (sessionId != null) {
val session =
context.packageManager.packageInstaller.getSessionInfo(sessionId)
emit(InstallStep.Installing to session)
}
delay(500)
}
.doOnError {
}
.catch {
Timber.e(it)
}
}
@ -185,7 +213,7 @@ internal class ExtensionInstaller(private val context: Context) {
* @param downloadId The id of the download.
*/
fun setInstalling(downloadId: Long, sessionId: Int) {
downloadsRelay.call(downloadId to InstallStep.Installing)
downloadsStateFlow.tryEmit(downloadId to InstallStep.Installing)
downloadInstallerMap[downloadId] = sessionId
}
@ -204,7 +232,7 @@ internal class ExtensionInstaller(private val context: Context) {
fun setInstallationResult(downloadId: Long, result: Boolean) {
val step = if (result) InstallStep.Installed else InstallStep.Error
downloadInstallerMap.remove(downloadId)
downloadsRelay.call(downloadId to step)
downloadsStateFlow.tryEmit(downloadId to step)
}
/**
@ -267,10 +295,10 @@ internal class ExtensionInstaller(private val context: Context) {
// Set next installation step
if (uri != null) {
downloadsRelay.call(id to InstallStep.Loading)
downloadsStateFlow.tryEmit(id to InstallStep.Loading)
} else {
Timber.e("Couldn't locate downloaded APK")
downloadsRelay.call(id to InstallStep.Error)
downloadsStateFlow.tryEmit(id to InstallStep.Error)
return
}

View File

@ -19,12 +19,17 @@ import eu.kanade.tachiyomi.ui.migration.SelectionHeader
import eu.kanade.tachiyomi.ui.migration.SourceItem
import eu.kanade.tachiyomi.util.system.LocaleHelper
import eu.kanade.tachiyomi.util.system.executeOnIO
import eu.kanade.tachiyomi.util.system.withUIContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import rx.Observable
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
@ -216,7 +221,7 @@ class ExtensionBottomPresenter(
@Synchronized
private fun updateInstallStep(
extension: Extension,
state: InstallStep,
state: InstallStep?,
session: PackageInstaller.SessionInfo?
): ExtensionItem? {
val extensions = extensions.toMutableList()
@ -243,13 +248,17 @@ class ExtensionBottomPresenter(
fun installExtension(extension: Extension.Available) {
if (isNotMIUIOptimized()) {
extensionManager.installExtension(extension).subscribeToInstallUpdate(extension)
presenterScope.launch {
extensionManager.installExtension(extension).collectForInstallUpdate(extension)
}
}
}
fun updateExtension(extension: Extension.Installed) {
if (isNotMIUIOptimized()) {
extensionManager.updateExtension(extension).subscribeToInstallUpdate(extension)
presenterScope.launch {
extensionManager.updateExtension(extension).collectForInstallUpdate(extension)
}
}
}
@ -261,13 +270,20 @@ class ExtensionBottomPresenter(
return true
}
private fun Observable<ExtensionIntallInfo>.subscribeToInstallUpdate(extension: Extension) {
this.doOnNext { currentDownloads[extension.pkgName] = it }
.doOnUnsubscribe { currentDownloads.remove(extension.pkgName) }
.map { state -> updateInstallStep(extension, state.first, state.second) }
.subscribe { item ->
private suspend fun Flow<ExtensionIntallInfo>.collectForInstallUpdate(extension: Extension) {
this
.onEach { currentDownloads[extension.pkgName] = it }
.onCompletion {
currentDownloads.remove(extension.pkgName)
val item = updateInstallStep(extension, null, null)
if (item != null) {
bottomSheet.downloadUpdate(item)
withUIContext { bottomSheet.downloadUpdate(item) }
}
}
.collect { state ->
val item = updateInstallStep(extension, state.first, state.second)
if (item != null) {
withUIContext { bottomSheet.downloadUpdate(item) }
}
}
}