Convert download cache/queue flows into SharedFlows

Fixes #8727
This commit is contained in:
arkon 2022-12-12 22:37:37 -05:00
parent 171db639ff
commit 9dd9e741f3
2 changed files with 11 additions and 2 deletions

View File

@ -19,10 +19,12 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeout
import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get import uy.kohesive.injekt.api.get
@ -47,7 +49,9 @@ class DownloadCache(
private val scope = CoroutineScope(Dispatchers.IO) private val scope = CoroutineScope(Dispatchers.IO)
private val _changes: Channel<Unit> = Channel(Channel.UNLIMITED) private val _changes: Channel<Unit> = Channel(Channel.UNLIMITED)
val changes = _changes.receiveAsFlow().onStart { emit(Unit) } val changes = _changes.receiveAsFlow()
.onStart { emit(Unit) }
.shareIn(scope, SharingStarted.Eagerly, 1)
private val notifier by lazy { DownloadNotifier(context) } private val notifier by lazy { DownloadNotifier(context) }

View File

@ -10,9 +10,11 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
@ -27,7 +29,10 @@ class DownloadQueue(
private val statusSubject = PublishSubject.create<Download>() private val statusSubject = PublishSubject.create<Download>()
private val _updates: Channel<Unit> = Channel(Channel.UNLIMITED) private val _updates: Channel<Unit> = Channel(Channel.UNLIMITED)
val updates = _updates.receiveAsFlow().onStart { emit(Unit) }.map { queue } val updates = _updates.receiveAsFlow()
.onStart { emit(Unit) }
.map { queue }
.shareIn(scope, SharingStarted.Eagerly, 1)
fun addAll(downloads: List<Download>) { fun addAll(downloads: List<Download>) {
downloads.forEach { download -> downloads.forEach { download ->