Simplify PageHolder load Job (#9086)

Inline statusJob into loadJob, using supervisorScope to load the page
and track status changes in parallel.
- supervisorScope does not complete until both the child loadPage
  coroutine and statusFlow.collectLatest have completed.
- Cancelling supervisorScope cancels the child loadPage coroutine and
  statusFlow.collectLatest.
- Use supervisorScope instead of coroutineScope to let status
  collection continue if loadPage fails.

Inline progressJob into loadJob, using collectLatest's cancellation
to avoid cancelling the progressFlow collection explicitly.
- collectLatest cancels the previous action block when the flow
  emits a new value. This means the DOWNLOAD_IMAGE
  progressFlow.collectLatest gets automatically cancelled when
  statusFlow emits a new state.

Convert launchLoadJob to suspend function, move job launch to caller,
and rename as loadPageAndProcessStatus.
This commit is contained in:
Two-Ai 2023-02-15 22:24:55 -05:00 committed by GitHub
parent dc2eaf0788
commit 4635e58405
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 144 deletions

View File

@ -19,10 +19,12 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.MainScope import kotlinx.coroutines.MainScope
import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import rx.Observable import rx.Observable
import rx.Subscription import rx.Subscription
import rx.android.schedulers.AndroidSchedulers import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers import rx.schedulers.Schedulers
import tachiyomi.core.util.lang.launchIO
import java.io.BufferedInputStream import java.io.BufferedInputStream
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.InputStream import java.io.InputStream
@ -60,20 +62,10 @@ class PagerPageHolder(
private val scope = MainScope() private val scope = MainScope()
/** /**
* Job for loading the page. * Job for loading the page and processing changes to the page's status.
*/ */
private var loadJob: Job? = null private var loadJob: Job? = null
/**
* Job for status changes of the page.
*/
private var statusJob: Job? = null
/**
* Job for progress changes of the page.
*/
private var progressJob: Job? = null
/** /**
* Subscription used to read the header of the image. This is needed in order to instantiate * Subscription used to read the header of the image. This is needed in order to instantiate
* the appropiate image view depending if the image is animated (GIF). * the appropiate image view depending if the image is animated (GIF).
@ -82,7 +74,7 @@ class PagerPageHolder(
init { init {
addView(progressIndicator) addView(progressIndicator)
launchLoadJob() loadJob = scope.launch { loadPageAndProcessStatus() }
} }
/** /**
@ -91,75 +83,42 @@ class PagerPageHolder(
@SuppressLint("ClickableViewAccessibility") @SuppressLint("ClickableViewAccessibility")
override fun onDetachedFromWindow() { override fun onDetachedFromWindow() {
super.onDetachedFromWindow() super.onDetachedFromWindow()
cancelProgressJob() loadJob?.cancel()
cancelLoadJob() loadJob = null
unsubscribeReadImageHeader() unsubscribeReadImageHeader()
} }
/** /**
* Starts loading the page and processing changes to the page's status. * Loads the page and processes changes to the page's status.
* *
* @see processStatus * Returns immediately if the page has no PageLoader.
* Otherwise, this function does not return. It will continue to process status changes until
* the Job is cancelled.
*/ */
private fun launchLoadJob() { private suspend fun loadPageAndProcessStatus() {
loadJob?.cancel()
statusJob?.cancel()
val loader = page.chapter.pageLoader ?: return val loader = page.chapter.pageLoader ?: return
loadJob = scope.launch {
loader.loadPage(page)
}
statusJob = scope.launch {
page.statusFlow.collectLatest { processStatus(it) }
}
}
private fun launchProgressJob() { supervisorScope {
progressJob?.cancel() launchIO {
progressJob = scope.launch { loader.loadPage(page)
page.progressFlow.collectLatest { value -> progressIndicator.setProgress(value) }
}
}
/**
* Called when the status of the page changes.
*
* @param status the new status of the page.
*/
private fun processStatus(status: Page.State) {
when (status) {
Page.State.QUEUE -> setQueued()
Page.State.LOAD_PAGE -> setLoading()
Page.State.DOWNLOAD_IMAGE -> {
launchProgressJob()
setDownloading()
} }
Page.State.READY -> { page.statusFlow.collectLatest { state ->
setImage() when (state) {
cancelProgressJob() Page.State.QUEUE -> setQueued()
} Page.State.LOAD_PAGE -> setLoading()
Page.State.ERROR -> { Page.State.DOWNLOAD_IMAGE -> {
setError() setDownloading()
cancelProgressJob() page.progressFlow.collectLatest { value ->
progressIndicator.setProgress(value)
}
}
Page.State.READY -> setImage()
Page.State.ERROR -> setError()
}
} }
} }
} }
/**
* Cancels loading the page and processing changes to the page's status.
*/
private fun cancelLoadJob() {
loadJob?.cancel()
loadJob = null
statusJob?.cancel()
statusJob = null
}
private fun cancelProgressJob() {
progressJob?.cancel()
progressJob = null
}
/** /**
* Unsubscribes from the read image header subscription. * Unsubscribes from the read image header subscription.
*/ */

View File

@ -24,10 +24,12 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.MainScope import kotlinx.coroutines.MainScope
import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import rx.Observable import rx.Observable
import rx.Subscription import rx.Subscription
import rx.android.schedulers.AndroidSchedulers import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers import rx.schedulers.Schedulers
import tachiyomi.core.util.lang.launchIO
import java.io.BufferedInputStream import java.io.BufferedInputStream
import java.io.InputStream import java.io.InputStream
@ -77,16 +79,6 @@ class WebtoonPageHolder(
*/ */
private var loadJob: Job? = null private var loadJob: Job? = null
/**
* Job for status changes of the page.
*/
private var statusJob: Job? = null
/**
* Job for progress changes of the page.
*/
private var progressJob: Job? = null
/** /**
* Subscription used to read the header of the image. This is needed in order to instantiate * Subscription used to read the header of the image. This is needed in order to instantiate
* the appropriate image view depending if the image is animated (GIF). * the appropriate image view depending if the image is animated (GIF).
@ -106,7 +98,8 @@ class WebtoonPageHolder(
*/ */
fun bind(page: ReaderPage) { fun bind(page: ReaderPage) {
this.page = page this.page = page
launchLoadJob() loadJob?.cancel()
loadJob = scope.launch { loadPageAndProcessStatus() }
refreshLayoutParams() refreshLayoutParams()
} }
@ -126,8 +119,8 @@ class WebtoonPageHolder(
* Called when the view is recycled and added to the view pool. * Called when the view is recycled and added to the view pool.
*/ */
override fun recycle() { override fun recycle() {
cancelLoadJob() loadJob?.cancel()
cancelProgressJob() loadJob = null
unsubscribeReadImageHeader() unsubscribeReadImageHeader()
removeErrorLayout() removeErrorLayout()
@ -136,78 +129,36 @@ class WebtoonPageHolder(
} }
/** /**
* Starts loading the page and processing changes to the page's status. * Loads the page and processes changes to the page's status.
* *
* @see processStatus * Returns immediately if there is no page or the page has no PageLoader.
* Otherwise, this function does not return. It will continue to process status changes until
* the Job is cancelled.
*/ */
private fun launchLoadJob() { private suspend fun loadPageAndProcessStatus() {
cancelLoadJob()
val page = page ?: return val page = page ?: return
val loader = page.chapter.pageLoader ?: return val loader = page.chapter.pageLoader ?: return
loadJob = scope.launch { supervisorScope {
loader.loadPage(page) launchIO {
} loader.loadPage(page)
statusJob = scope.launch {
page.statusFlow.collectLatest { processStatus(it) }
}
}
/**
* Observes the progress of the page and updates view.
*/
private fun launchProgressJob() {
cancelProgressJob()
val page = page ?: return
progressJob = scope.launch {
page.progressFlow.collectLatest { value -> progressIndicator.setProgress(value) }
}
}
/**
* Called when the status of the page changes.
*
* @param status the new status of the page.
*/
private fun processStatus(status: Page.State) {
when (status) {
Page.State.QUEUE -> setQueued()
Page.State.LOAD_PAGE -> setLoading()
Page.State.DOWNLOAD_IMAGE -> {
launchProgressJob()
setDownloading()
} }
Page.State.READY -> { page.statusFlow.collectLatest { state ->
setImage() when (state) {
cancelProgressJob() Page.State.QUEUE -> setQueued()
} Page.State.LOAD_PAGE -> setLoading()
Page.State.ERROR -> { Page.State.DOWNLOAD_IMAGE -> {
setError() setDownloading()
cancelProgressJob() page.progressFlow.collectLatest { value ->
progressIndicator.setProgress(value)
}
}
Page.State.READY -> setImage()
Page.State.ERROR -> setError()
}
} }
} }
} }
/**
* Cancels loading the page and processing changes to the page's status.
*/
private fun cancelLoadJob() {
loadJob?.cancel()
loadJob = null
statusJob?.cancel()
statusJob = null
}
/**
* Unsubscribes from the progress subscription.
*/
private fun cancelProgressJob() {
progressJob?.cancel()
progressJob = null
}
/** /**
* Unsubscribes from the read image header subscription. * Unsubscribes from the read image header subscription.
*/ */