diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt index 7697ed8c96..406c4e857a 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt @@ -18,13 +18,21 @@ import eu.kanade.tachiyomi.source.SourceManager import eu.kanade.tachiyomi.source.UnmeteredSource import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.online.HttpSource -import eu.kanade.tachiyomi.source.online.fetchAllImageUrlsFromPageList -import eu.kanade.tachiyomi.util.lang.RetryWithDelay import eu.kanade.tachiyomi.util.storage.DiskUtil import eu.kanade.tachiyomi.util.storage.DiskUtil.NOMEDIA_FILE import eu.kanade.tachiyomi.util.storage.saveTo import eu.kanade.tachiyomi.util.system.ImageUtil +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flatMapMerge +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.retryWhen +import kotlinx.coroutines.runBlocking import logcat.LogPriority import nl.adaptivity.xmlutil.serialization.XML import okhttp3.Response @@ -33,8 +41,10 @@ import rx.Subscription import rx.android.schedulers.AndroidSchedulers import rx.schedulers.Schedulers import rx.subscriptions.CompositeSubscription +import tachiyomi.core.util.lang.awaitSingle import tachiyomi.core.util.lang.launchIO import tachiyomi.core.util.lang.launchNow +import tachiyomi.core.util.lang.withIOContext import tachiyomi.core.util.lang.withUIContext import tachiyomi.core.util.system.logcat import tachiyomi.domain.chapter.model.Chapter @@ -205,7 +215,10 @@ class Downloader( .flatMap( { bySource -> bySource.concatMap { download -> - downloadChapter(download).subscribeOn(Schedulers.io()) + Observable.fromCallable { + runBlocking { downloadChapter(download) } + download + }.subscribeOn(Schedulers.io()) } }, 5, @@ -298,82 +311,91 @@ class Downloader( } /** - * Returns the observable which downloads a chapter. + * Downloads a chapter. * * @param download the chapter to be downloaded. */ - private fun downloadChapter(download: Download): Observable = Observable.defer { + private suspend fun downloadChapter(download: Download) { val mangaDir = provider.getMangaDir(download.manga.title, download.source) val availSpace = DiskUtil.getAvailableStorageSpace(mangaDir) if (availSpace != -1L && availSpace < MIN_DISK_SPACE) { download.status = Download.State.ERROR notifier.onError(context.getString(R.string.download_insufficient_space), download.chapter.name, download.manga.title) - return@defer Observable.just(download) + return } val chapterDirname = provider.getChapterDirName(download.chapter.name, download.chapter.scanlator) val tmpDir = mangaDir.createDirectory(chapterDirname + TMP_DIR_SUFFIX) - val pageListObservable = if (download.pages == null) { - // Pull page list from network and add them to download object - download.source.fetchPageList(download.chapter.toSChapter()) - .map { pages -> - if (pages.isEmpty()) { - throw Exception(context.getString(R.string.page_list_empty_error)) - } - // Don't trust index from source - val reIndexedPages = pages.mapIndexed { index, page -> Page(index, page.url, page.imageUrl, page.uri) } - download.pages = reIndexedPages - reIndexedPages + try { + // If the page list already exists, start from the file + val pageList = download.pages ?: run { + // Otherwise, pull page list from network and add them to download object + val pages = download.source.getPageList(download.chapter.toSChapter()) + + if (pages.isEmpty()) { + throw Exception(context.getString(R.string.page_list_empty_error)) } - } else { - // Or if the page list already exists, start from the file - Observable.just(download.pages!!) - } - - pageListObservable - .doOnNext { _ -> - // Delete all temporary (unfinished) files - tmpDir.listFiles() - ?.filter { it.name!!.endsWith(".tmp") } - ?.forEach { it.delete() } - - download.status = Download.State.DOWNLOADING + // Don't trust index from source + val reIndexedPages = pages.mapIndexed { index, page -> Page(index, page.url, page.imageUrl, page.uri) } + download.pages = reIndexedPages + reIndexedPages } + + // Delete all temporary (unfinished) files + tmpDir.listFiles() + ?.filter { it.name!!.endsWith(".tmp") } + ?.forEach { it.delete() } + + download.status = Download.State.DOWNLOADING + // Get all the URLs to the source images, fetch pages if necessary - .flatMap { download.source.fetchAllImageUrlsFromPageList(it) } + pageList.filter { it.imageUrl.isNullOrEmpty() }.forEach { page -> + page.status = Page.State.LOAD_PAGE + try { + page.imageUrl = download.source.fetchImageUrl(page).awaitSingle() + } catch (e: Throwable) { + page.status = Page.State.ERROR + } + } + // Start downloading images, consider we can have downloaded images already // Concurrently do 2 pages at a time - .flatMap({ page -> getOrDownloadImage(page, download, tmpDir).subscribeOn(Schedulers.io()) }, 2) - .onBackpressureLatest() - // Do when page is downloaded. - .doOnNext { notifier.onProgressChange(download) } - .toList() - .map { download } + pageList.asFlow() + .flatMapMerge(concurrency = 2) { page -> + flow { + withIOContext { getOrDownloadImage(page, download, tmpDir) } + emit(page) + }.flowOn(Dispatchers.IO) + } + .collect { + // Do when page is downloaded. + notifier.onProgressChange(download) + } + // Do after download completes - .doOnNext { ensureSuccessfulDownload(download, mangaDir, tmpDir, chapterDirname) } + ensureSuccessfulDownload(download, mangaDir, tmpDir, chapterDirname) + } catch (error: Throwable) { + if (error is CancellationException) throw error // If the page list threw, it will resume here - .onErrorReturn { error -> - logcat(LogPriority.ERROR, error) - download.status = Download.State.ERROR - notifier.onError(error.message, download.chapter.name, download.manga.title) - download - } + logcat(LogPriority.ERROR, error) + download.status = Download.State.ERROR + notifier.onError(error.message, download.chapter.name, download.manga.title) + } } /** - * Returns the observable which gets the image from the filesystem if it exists or downloads it - * otherwise. + * Gets the image from the filesystem if it exists or downloads it otherwise. * * @param page the page to download. * @param download the download of the page. * @param tmpDir the temporary directory of the download. */ - private fun getOrDownloadImage(page: Page, download: Download, tmpDir: UniFile): Observable { + private suspend fun getOrDownloadImage(page: Page, download: Download, tmpDir: UniFile) { // If the image URL is empty, do nothing if (page.imageUrl == null) { - return Observable.just(page) + return } val digitCount = (download.pages?.size ?: 0).toString().length.coerceAtLeast(3) @@ -386,83 +408,86 @@ class Downloader( // Try to find the image file val imageFile = tmpDir.listFiles()?.firstOrNull { it.name!!.startsWith("$filename.") || it.name!!.startsWith("${filename}__001") } - // If the image is already downloaded, do nothing. Otherwise download from network - val pageObservable = when { - imageFile != null -> Observable.just(imageFile) - chapterCache.isImageInCache(page.imageUrl!!) -> copyImageFromCache(chapterCache.getImageFile(page.imageUrl!!), tmpDir, filename) - else -> downloadImage(page, download.source, tmpDir, filename) - } + try { + // If the image is already downloaded, do nothing. Otherwise download from network + val file = when { + imageFile != null -> imageFile + chapterCache.isImageInCache(page.imageUrl!!) -> copyImageFromCache(chapterCache.getImageFile(page.imageUrl!!), tmpDir, filename) + else -> downloadImage(page, download.source, tmpDir, filename) + } - return pageObservable // When the page is ready, set page path, progress (just in case) and status - .doOnNext { file -> - val success = splitTallImageIfNeeded(page, tmpDir) - if (!success) { - notifier.onError(context.getString(R.string.download_notifier_split_failed), download.chapter.name, download.manga.title) - } - page.uri = file.uri - page.progress = 100 - page.status = Page.State.READY + val success = splitTallImageIfNeeded(page, tmpDir) + if (!success) { + notifier.onError(context.getString(R.string.download_notifier_split_failed), download.chapter.name, download.manga.title) } - .map { page } + page.uri = file.uri + page.progress = 100 + page.status = Page.State.READY + } catch (e: Throwable) { + if (e is CancellationException) throw e // Mark this page as error and allow to download the remaining - .onErrorReturn { - page.progress = 0 - page.status = Page.State.ERROR - notifier.onError(it.message, download.chapter.name, download.manga.title) - page - } + page.progress = 0 + page.status = Page.State.ERROR + notifier.onError(e.message, download.chapter.name, download.manga.title) + } } /** - * Returns the observable which downloads the image from network. + * Downloads the image from network to a file in tmpDir. * * @param page the page to download. * @param source the source of the page. * @param tmpDir the temporary directory of the download. * @param filename the filename of the image. */ - private fun downloadImage(page: Page, source: HttpSource, tmpDir: UniFile, filename: String): Observable { + private suspend fun downloadImage(page: Page, source: HttpSource, tmpDir: UniFile, filename: String): UniFile { page.status = Page.State.DOWNLOAD_IMAGE page.progress = 0 - return source.fetchImage(page) - .map { response -> - val file = tmpDir.createFile("$filename.tmp") - try { - response.body.source().saveTo(file.openOutputStream()) - val extension = getImageExtension(response, file) - file.renameTo("$filename.$extension") - } catch (e: Exception) { - response.close() - file.delete() - throw e - } - file + return flow { + val response = source.getImage(page) + val file = tmpDir.createFile("$filename.tmp") + try { + response.body.source().saveTo(file.openOutputStream()) + val extension = getImageExtension(response, file) + file.renameTo("$filename.$extension") + } catch (e: Exception) { + response.close() + file.delete() + throw e } + emit(file) + } // Retry 3 times, waiting 2, 4 and 8 seconds between attempts. - .retryWhen(RetryWithDelay(3, { (2 shl it - 1) * 1000 }, Schedulers.trampoline())) + .retryWhen { _, attempt -> + if (attempt < 3) { + delay((2L shl attempt.toInt()) * 1000) + true + } else { + false + } + } + .first() } /** - * Return the observable which copies the image from cache. + * Copies the image from cache to file in tmpDir. * * @param cacheFile the file from cache. * @param tmpDir the temporary directory of the download. * @param filename the filename of the image. */ - private fun copyImageFromCache(cacheFile: File, tmpDir: UniFile, filename: String): Observable { - return Observable.just(cacheFile).map { - val tmpFile = tmpDir.createFile("$filename.tmp") - cacheFile.inputStream().use { input -> - tmpFile.openOutputStream().use { output -> - input.copyTo(output) - } + private fun copyImageFromCache(cacheFile: File, tmpDir: UniFile, filename: String): UniFile { + val tmpFile = tmpDir.createFile("$filename.tmp") + cacheFile.inputStream().use { input -> + tmpFile.openOutputStream().use { output -> + input.copyTo(output) } - val extension = ImageUtil.findImageType(cacheFile.inputStream()) ?: return@map tmpFile - tmpFile.renameTo("$filename.${extension.extension}") - cacheFile.delete() - tmpFile } + val extension = ImageUtil.findImageType(cacheFile.inputStream()) ?: return tmpFile + tmpFile.renameTo("$filename.${extension.extension}") + cacheFile.delete() + return tmpFile } /** diff --git a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RetryWithDelay.kt b/app/src/main/java/eu/kanade/tachiyomi/util/lang/RetryWithDelay.kt deleted file mode 100644 index 8d368cbc6b..0000000000 --- a/app/src/main/java/eu/kanade/tachiyomi/util/lang/RetryWithDelay.kt +++ /dev/null @@ -1,25 +0,0 @@ -package eu.kanade.tachiyomi.util.lang - -import rx.Observable -import rx.Scheduler -import rx.functions.Func1 -import rx.schedulers.Schedulers -import java.util.concurrent.TimeUnit.MILLISECONDS - -class RetryWithDelay( - private val maxRetries: Int = 1, - private val retryStrategy: (Int) -> Int = { 1000 }, - private val scheduler: Scheduler = Schedulers.computation(), -) : Func1, Observable<*>> { - - private var retryCount = 0 - - override fun call(attempts: Observable) = attempts.flatMap { error -> - val count = ++retryCount - if (count <= maxRetries) { - Observable.timer(retryStrategy(count).toLong(), MILLISECONDS, scheduler) - } else { - Observable.error(error as Throwable) - } - } -}