diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/track/TrackPresenter.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/track/TrackPresenter.kt index 8f854eaedd..072a7e16c0 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/manga/track/TrackPresenter.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/manga/track/TrackPresenter.kt @@ -11,12 +11,10 @@ import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter import eu.kanade.tachiyomi.util.lang.await import eu.kanade.tachiyomi.util.lang.launchIO import eu.kanade.tachiyomi.util.lang.launchUI -import eu.kanade.tachiyomi.util.lang.runAsObservable import eu.kanade.tachiyomi.util.system.toast -import rx.Observable -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers -import rx.schedulers.Schedulers +import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get @@ -33,59 +31,57 @@ class TrackPresenter( private val loggedServices by lazy { trackManager.services.filter { it.isLogged } } - private var trackSubscription: Subscription? = null - - private var searchSubscription: Subscription? = null - - private var refreshSubscription: Subscription? = null + private var trackJob: Job? = null + private var searchJob: Job? = null + private var refreshJob: Job? = null override fun onCreate(savedState: Bundle?) { super.onCreate(savedState) fetchTrackings() } - fun fetchTrackings() { - trackSubscription?.let { remove(it) } - trackSubscription = db.getTracks(manga) - .asRxObservable() - .map { tracks -> - loggedServices.map { service -> - TrackItem(tracks.find { it.sync_id == service.id }, service) - } + private fun fetchTrackings() { + trackJob?.cancel() + trackJob = launchIO { + val tracks = db.getTracks(manga).await() + trackList = loggedServices.map { service -> + TrackItem(tracks.find { it.sync_id == service.id }, service) } - .observeOn(AndroidSchedulers.mainThread()) - .doOnNext { trackList = it } - .subscribeLatestCache(TrackController::onNextTrackings) + view?.onNextTrackings(trackList) + } } fun refresh() { - refreshSubscription?.let { remove(it) } - refreshSubscription = Observable.from(trackList) - .filter { it.track != null } - .flatMap { item -> - runAsObservable({ item.service.refresh(item.track!!) }) - .flatMap { db.insertTrack(it).asRxObservable() } - .map { item } - .onErrorReturn { item } + refreshJob?.cancel() + refreshJob = launchIO { + try { + trackList + .filter { it.track != null } + .map { + async { + val track = it.service.refresh(it.track!!) + db.insertTrack(track).await() + } + } + .awaitAll() + + view?.onRefreshDone() + } catch (e: Throwable) { + view?.onRefreshError(e) } - .toList() - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .subscribeFirst( - { view, _ -> view.onRefreshDone() }, - TrackController::onRefreshError - ) + } } fun search(query: String, service: TrackService) { - searchSubscription?.let { remove(it) } - searchSubscription = runAsObservable({ service.search(query) }) - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .subscribeLatestCache( - TrackController::onSearchResults, - TrackController::onSearchResultsError - ) + searchJob?.cancel() + searchJob = launchIO { + try { + val results = service.search(query) + launchUI { view?.onSearchResults(results) } + } catch (e: Throwable) { + launchUI { view?.onSearchResultsError(e) } + } + } } fun registerTracking(item: Track?, service: TrackService) { @@ -115,12 +111,10 @@ class TrackPresenter( db.insertTrack(track).await() view?.onRefreshDone() } catch (e: Throwable) { - launchUI { - view?.onRefreshError(e) + launchUI { view?.onRefreshError(e) } - // Restart on error to set old values - fetchTrackings() - } + // Restart on error to set old values + fetchTrackings() } } }