Implement coroutines

This commit is contained in:
Syer10 2021-04-01 16:07:35 -04:00
parent 8c80ad7575
commit 769472b24c
13 changed files with 260 additions and 124 deletions

View File

@ -61,6 +61,7 @@ dependencies {
val coroutinesVersion = "1.3.9"
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$coroutinesVersion")
// dex2jar: https://github.com/DexPatcher/dex2jar/releases/tag/v2.1-20190905-lanchon
implementation("com.github.DexPatcher.dex2jar:dex-tools:v2.1-20190905-lanchon")
@ -135,6 +136,15 @@ tasks {
archiveVersion.set(TachideskVersion)
archiveClassifier.set(TachideskRevision)
}
withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf(
"-Xopt-in=kotlin.RequiresOptIn",
"-Xopt-in=kotlinx.coroutines.ExperimentalCoroutinesApi",
"-Xopt-in=kotlinx.coroutines.InternalCoroutinesApi"
)
}
}
}
launch4j { //used for windows
@ -195,7 +205,7 @@ tasks.register<de.undercouch.gradle.tasks.download.Download>("downloadJre") {
}
tasks.withType<ShadowJar> {
destinationDir = File("$rootDir/server/build")
destinationDirectory.set(File("$rootDir/server/build"))
dependsOn("formatKotlin", "lintKotlin")
}

View File

@ -9,12 +9,15 @@ package ir.armor.tachidesk.impl
import eu.kanade.tachiyomi.source.model.SChapter
import eu.kanade.tachiyomi.source.model.SManga
import eu.kanade.tachiyomi.source.online.HttpSource
import eu.kanade.tachiyomi.util.lang.awaitSingle
import ir.armor.tachidesk.impl.Manga.getManga
import ir.armor.tachidesk.impl.Source.getHttpSource
import ir.armor.tachidesk.model.database.ChapterTable
import ir.armor.tachidesk.model.database.MangaTable
import ir.armor.tachidesk.model.database.PageTable
import ir.armor.tachidesk.model.dataclass.ChapterDataClass
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.insertAndGetId
@ -24,7 +27,7 @@ import org.jetbrains.exposed.sql.transactions.transaction
import org.jetbrains.exposed.sql.update
object Chapter {
fun getChapterList(mangaId: Int): List<ChapterDataClass> {
suspend fun getChapterList(mangaId: Int): List<ChapterDataClass> {
val mangaDetails = getManga(mangaId)
val source = getHttpSource(mangaDetails.sourceId.toLong())
@ -33,7 +36,7 @@ object Chapter {
title = mangaDetails.title
url = mangaDetails.url
}
).toBlocking().first()
).awaitSingle()
val chapterCount = chapterList.count()
@ -86,33 +89,40 @@ object Chapter {
}
}
fun getChapter(chapterIndex: Int, mangaId: Int): ChapterDataClass {
return transaction {
val chapterEntry = ChapterTable.select {
suspend fun getChapter(chapterIndex: Int, mangaId: Int): ChapterDataClass {
var chapterEntry: ResultRow? = null
var source: HttpSource? = null
var sChapter: SChapter? = null
transaction {
chapterEntry = ChapterTable.select {
ChapterTable.chapterIndex eq chapterIndex and (ChapterTable.manga eq mangaId)
}.firstOrNull()!!
val mangaEntry = MangaTable.select { MangaTable.id eq mangaId }.firstOrNull()!!
val source = getHttpSource(mangaEntry[MangaTable.sourceReference])
val pageList = source.fetchPageList(
SChapter.create().apply {
url = chapterEntry[ChapterTable.url]
name = chapterEntry[ChapterTable.name]
source = getHttpSource(mangaEntry[MangaTable.sourceReference])
sChapter = SChapter.create().apply {
url = chapterEntry!![ChapterTable.url]
name = chapterEntry!![ChapterTable.name]
}
).toBlocking().first()
}
val pageList = source!!.fetchPageList(
sChapter!!
).awaitSingle()
val chapterId = chapterEntry[ChapterTable.id].value
return transaction {
val chapterRow = chapterEntry!!
val chapterId = chapterRow[ChapterTable.id].value
val chapterCount = transaction { ChapterTable.selectAll().count() }
val chapter = ChapterDataClass(
chapterId,
chapterEntry[ChapterTable.url],
chapterEntry[ChapterTable.name],
chapterEntry[ChapterTable.date_upload],
chapterEntry[ChapterTable.chapter_number],
chapterEntry[ChapterTable.scanlator],
chapterRow[ChapterTable.url],
chapterRow[ChapterTable.name],
chapterRow[ChapterTable.date_upload],
chapterRow[ChapterTable.chapter_number],
chapterRow[ChapterTable.scanlator],
mangaId,
chapterEntry[ChapterTable.chapterIndex],
chapterRow[ChapterTable.chapterIndex],
chapterCount.toInt(),
pageList.count()

View File

@ -20,7 +20,7 @@ import ir.armor.tachidesk.impl.util.APKExtractor
import ir.armor.tachidesk.model.database.ExtensionTable
import ir.armor.tachidesk.model.database.SourceTable
import ir.armor.tachidesk.server.ApplicationDirs
import kotlinx.coroutines.runBlocking
import ir.armor.tachidesk.util.await
import mu.KotlinLogging
import okhttp3.Request
import okio.buffer
@ -86,7 +86,7 @@ object Extension {
return classToLoad.getDeclaredConstructor().newInstance()
}
fun installExtension(pkgName: String): Int {
suspend fun installExtension(pkgName: String): Int {
logger.debug("Installing $pkgName")
val extensionRecord = extensionTableAsDataClass().first { it.pkgName == pkgName }
val fileNameWithoutType = extensionRecord.apkName.substringBefore(".apk")
@ -95,7 +95,6 @@ object Extension {
// check if we don't have the dex file already downloaded
val jarPath = "${ApplicationDirs.extensionsRoot}/$fileNameWithoutType.jar"
if (!File(jarPath).exists()) {
runBlocking {
val apkToDownload = ExtensionGithubApi.getApkUrl(extensionRecord)
val apkFilePath = "$dirPathWithoutType.apk"
@ -164,23 +163,24 @@ object Extension {
it[classFQName] = className
}
}
}
return 201 // we downloaded successfully
} else {
return 302
}
}
val networkHelper: NetworkHelper by injectLazy()
private val network: NetworkHelper by injectLazy()
private fun downloadAPKFile(url: String, apkPath: String) {
private suspend fun downloadAPKFile(url: String, apkPath: String) {
val request = Request.Builder().url(url).build()
val response = networkHelper.client.newCall(request).execute()
val response = network.client.newCall(request).await()
val downloadedFile = File(apkPath)
val sink = downloadedFile.sink().buffer()
sink.writeAll(response.body!!.source())
sink.close()
downloadedFile.sink().buffer().use { sink ->
response.body!!.source().use { source ->
sink.writeAll(source)
}
}
}
fun uninstallExtension(pkgName: String) {
@ -206,7 +206,7 @@ object Extension {
}
}
fun updateExtension(pkgName: String): Int {
suspend fun updateExtension(pkgName: String): Int {
val targetExtension = ExtensionsList.updateMap.remove(pkgName)!!
uninstallExtension(pkgName)
transaction {
@ -224,9 +224,7 @@ object Extension {
return installExtension(pkgName)
}
val network: NetworkHelper by injectLazy()
fun getExtensionIcon(apkName: String): Pair<InputStream, String> {
suspend fun getExtensionIcon(apkName: String): Pair<InputStream, String> {
val iconUrl = transaction { ExtensionTable.select { ExtensionTable.apkName eq apkName }.firstOrNull()!! }[ExtensionTable.iconUrl]
val saveDir = "${ApplicationDirs.extensionsRoot}/icon"
@ -234,7 +232,7 @@ object Extension {
return getCachedImageResponse(saveDir, apkName) {
network.client.newCall(
GET(iconUrl)
).execute()
).await()
}
}

View File

@ -12,7 +12,6 @@ import eu.kanade.tachiyomi.extension.model.Extension
import ir.armor.tachidesk.impl.Extension.getExtensionIconUrl
import ir.armor.tachidesk.model.database.ExtensionTable
import ir.armor.tachidesk.model.dataclass.ExtensionDataClass
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.jetbrains.exposed.sql.deleteWhere
import org.jetbrains.exposed.sql.insert
@ -31,15 +30,14 @@ object ExtensionsList {
// const val ExtensionUpdateDelayTime = 60 * 1000 // 60,000 milliseconds = 60 seconds
const val ExtensionUpdateDelayTime = 60 * 1000
fun getExtensionList(): List<ExtensionDataClass> {
suspend fun getExtensionList(): List<ExtensionDataClass> {
// update if {ExtensionUpdateDelayTime} seconds has passed or requested offline and database is empty
if (lastUpdateCheck + ExtensionUpdateDelayTime < System.currentTimeMillis()) {
logger.debug("Getting extensions list from the internet")
lastUpdateCheck = System.currentTimeMillis()
runBlocking {
val foundExtensions = ExtensionGithubApi.findExtensions()
updateExtensionDatabase(foundExtensions)
}
} else {
logger.debug("used cached extension list")
}

View File

@ -22,7 +22,7 @@ object Library {
// TODO: `Category.isLanding` is to handle the default categories a new library manga gets,
// ..implement that shit at some time...
// ..also Consider to rename it to `isDefault`
fun addMangaToLibrary(mangaId: Int) {
suspend fun addMangaToLibrary(mangaId: Int) {
val manga = getManga(mangaId)
if (!manga.inLibrary) {
transaction {
@ -33,7 +33,7 @@ object Library {
}
}
fun removeMangaFromLibrary(mangaId: Int) {
suspend fun removeMangaFromLibrary(mangaId: Int) {
val manga = getManga(mangaId)
if (manga.inLibrary) {
transaction {

View File

@ -9,6 +9,7 @@ package ir.armor.tachidesk.impl
import eu.kanade.tachiyomi.network.GET
import eu.kanade.tachiyomi.source.model.SManga
import eu.kanade.tachiyomi.util.lang.awaitSingle
import ir.armor.tachidesk.impl.MangaList.proxyThumbnailUrl
import ir.armor.tachidesk.impl.Source.getHttpSource
import ir.armor.tachidesk.impl.Source.getSource
@ -16,13 +17,14 @@ import ir.armor.tachidesk.model.database.MangaStatus
import ir.armor.tachidesk.model.database.MangaTable
import ir.armor.tachidesk.model.dataclass.MangaDataClass
import ir.armor.tachidesk.server.ApplicationDirs
import ir.armor.tachidesk.util.await
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import org.jetbrains.exposed.sql.update
import java.io.InputStream
object Manga {
fun getManga(mangaId: Int, proxyThumbnail: Boolean = true): MangaDataClass {
suspend fun getManga(mangaId: Int, proxyThumbnail: Boolean = true): MangaDataClass {
var mangaEntry = transaction { MangaTable.select { MangaTable.id eq mangaId }.firstOrNull()!! }
return if (mangaEntry[MangaTable.initialized]) {
@ -51,7 +53,7 @@ object Manga {
url = mangaEntry[MangaTable.url]
title = mangaEntry[MangaTable.title]
}
).toBlocking().first()
).awaitSingle()
transaction {
MangaTable.update({ MangaTable.id eq mangaId }) {
@ -92,7 +94,7 @@ object Manga {
}
}
fun getMangaThumbnail(mangaId: Int): Pair<InputStream, String> {
suspend fun getMangaThumbnail(mangaId: Int): Pair<InputStream, String> {
val mangaEntry = transaction { MangaTable.select { MangaTable.id eq mangaId }.firstOrNull()!! }
val saveDir = ApplicationDirs.thumbnailsRoot
val fileName = mangaId.toString()
@ -107,7 +109,7 @@ object Manga {
source.client.newCall(
GET(thumbnailUrl, source.headers)
).execute()
).await()
}
}
}

View File

@ -8,6 +8,7 @@ package ir.armor.tachidesk.impl
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
import eu.kanade.tachiyomi.source.model.MangasPage
import eu.kanade.tachiyomi.util.lang.awaitSingle
import ir.armor.tachidesk.impl.Source.getHttpSource
import ir.armor.tachidesk.model.database.MangaStatus
import ir.armor.tachidesk.model.database.MangaTable
@ -22,13 +23,13 @@ object MangaList {
return "/api/v1/manga/$mangaId/thumbnail"
}
fun getMangaList(sourceId: Long, pageNum: Int = 1, popular: Boolean): PagedMangaListDataClass {
val source = getHttpSource(sourceId.toLong())
suspend fun getMangaList(sourceId: Long, pageNum: Int = 1, popular: Boolean): PagedMangaListDataClass {
val source = getHttpSource(sourceId)
val mangasPage = if (popular) {
source.fetchPopularManga(pageNum).toBlocking().first()
source.fetchPopularManga(pageNum).awaitSingle()
} else {
if (source.supportsLatest)
source.fetchLatestUpdates(pageNum).toBlocking().first()
source.fetchLatestUpdates(pageNum).awaitSingle()
else
throw Exception("Source $source doesn't support latest")
}

View File

@ -9,6 +9,7 @@ package ir.armor.tachidesk.impl
import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.source.online.HttpSource
import eu.kanade.tachiyomi.util.lang.awaitSingle
import ir.armor.tachidesk.impl.Source.getHttpSource
import ir.armor.tachidesk.model.database.ChapterTable
import ir.armor.tachidesk.model.database.MangaTable
@ -27,14 +28,14 @@ object Page {
* A page might have a imageUrl ready from the get go, or we might need to
* go an extra step and call fetchImageUrl to get it.
*/
fun getTrueImageUrl(page: Page, source: HttpSource): String {
suspend fun getTrueImageUrl(page: Page, source: HttpSource): String {
if (page.imageUrl == null) {
page.imageUrl = source.fetchImageUrl(page).toBlocking().first()!!
page.imageUrl = source.fetchImageUrl(page).awaitSingle()
}
return page.imageUrl!!
}
fun getPageImage(mangaId: Int, chapterIndex: Int, index: Int): Pair<InputStream, String> {
suspend fun getPageImage(mangaId: Int, chapterIndex: Int, index: Int): Pair<InputStream, String> {
val mangaEntry = transaction { MangaTable.select { MangaTable.id eq mangaId }.firstOrNull()!! }
val source = getHttpSource(mangaEntry[MangaTable.sourceReference])
val chapterEntry = transaction {
@ -53,9 +54,10 @@ object Page {
)
if (pageEntry[PageTable.imageUrl] == null) {
val trueImageUrl = getTrueImageUrl(tachiPage, source)
transaction {
PageTable.update({ (PageTable.chapter eq chapterId) and (PageTable.index eq index) }) {
it[imageUrl] = getTrueImageUrl(tachiPage, source)
it[imageUrl] = trueImageUrl
}
}
}
@ -65,7 +67,7 @@ object Page {
val fileName = index.toString()
return getCachedImageResponse(saveDir, fileName) {
source.fetchImage(tachiPage).toBlocking().first()
source.fetchImage(tachiPage).awaitSingle()
}
}

View File

@ -7,6 +7,7 @@ package ir.armor.tachidesk.impl
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
import eu.kanade.tachiyomi.util.lang.awaitSingle
import ir.armor.tachidesk.impl.MangaList.processEntries
import ir.armor.tachidesk.impl.Source.getHttpSource
import ir.armor.tachidesk.model.dataclass.PagedMangaListDataClass
@ -18,9 +19,9 @@ object Search {
// source.getFilterList().toItems()
}
fun sourceSearch(sourceId: Long, searchTerm: String, pageNum: Int): PagedMangaListDataClass {
suspend fun sourceSearch(sourceId: Long, searchTerm: String, pageNum: Int): PagedMangaListDataClass {
val source = getHttpSource(sourceId)
val searchManga = source.fetchSearchManga(pageNum, searchTerm, source.getFilterList()).toBlocking().first()
val searchManga = source.fetchSearchManga(pageNum, searchTerm, source.getFilterList()).awaitSingle()
return searchManga.processEntries(sourceId)
}

View File

@ -55,7 +55,7 @@ private fun BufferedSource.saveTo(stream: OutputStream) {
}
}
fun getCachedImageResponse(saveDir: String, fileName: String, fetcher: () -> Response): Pair<InputStream, String> {
suspend fun getCachedImageResponse(saveDir: String, fileName: String, fetcher: suspend () -> Response): Pair<InputStream, String> {
val cachedFile = findFileNameStartingWith(saveDir, fileName)
val filePath = "$saveDir/$fileName"
if (cachedFile != null) {

View File

@ -31,6 +31,11 @@ import ir.armor.tachidesk.impl.Search.sourceSearch
import ir.armor.tachidesk.impl.Source.getSource
import ir.armor.tachidesk.impl.Source.getSourceList
import ir.armor.tachidesk.server.util.openInBrowser
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.future.future
import mu.KotlinLogging
import java.io.IOException
@ -43,6 +48,7 @@ import java.io.IOException
object JavalinSetup {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun javalinSetup() {
var hasWebUiBundled = false
@ -75,22 +81,24 @@ object JavalinSetup {
}
app.get("/api/v1/extension/list") { ctx ->
ctx.json(getExtensionList())
ctx.json(scope.future { getExtensionList() })
}
app.get("/api/v1/extension/install/:pkgName") { ctx ->
val pkgName = ctx.pathParam("pkgName")
// TODO maybe replace with ctx.result(scope.future { installExtension(pkgName) })?
ctx.status(
installExtension(pkgName)
scope.future { installExtension(pkgName) }.get()
)
}
app.get("/api/v1/extension/update/:pkgName") { ctx ->
val pkgName = ctx.pathParam("pkgName")
// TODO maybe replace with ctx.result(scope.future { updateExtension(pkgName) })?
ctx.status(
updateExtension(pkgName)
scope.future { updateExtension(pkgName) }.get()
)
}
@ -104,7 +112,8 @@ object JavalinSetup {
// icon for extension named `apkName`
app.get("/api/v1/extension/icon/:apkName") { ctx ->
val apkName = ctx.pathParam("apkName")
val result = getExtensionIcon(apkName)
// TODO see if there is a better way
val result = scope.future { getExtensionIcon(apkName) }.get()
ctx.result(result.first)
ctx.header("content-type", result.second)
@ -125,26 +134,27 @@ object JavalinSetup {
app.get("/api/v1/source/:sourceId/popular/:pageNum") { ctx ->
val sourceId = ctx.pathParam("sourceId").toLong()
val pageNum = ctx.pathParam("pageNum").toInt()
ctx.json(getMangaList(sourceId, pageNum, popular = true))
ctx.json(scope.future { getMangaList(sourceId, pageNum, popular = true) })
}
// latest mangas from source with id `sourceId`
app.get("/api/v1/source/:sourceId/latest/:pageNum") { ctx ->
val sourceId = ctx.pathParam("sourceId").toLong()
val pageNum = ctx.pathParam("pageNum").toInt()
ctx.json(getMangaList(sourceId, pageNum, popular = false))
ctx.json(scope.future { getMangaList(sourceId, pageNum, popular = false) })
}
// get manga info
app.get("/api/v1/manga/:mangaId/") { ctx ->
val mangaId = ctx.pathParam("mangaId").toInt()
ctx.json(getManga(mangaId))
ctx.json(scope.future { getManga(mangaId) })
}
// manga thumbnail
app.get("api/v1/manga/:mangaId/thumbnail") { ctx ->
val mangaId = ctx.pathParam("mangaId").toInt()
val result = getMangaThumbnail(mangaId)
// TODO see if there is a better way
val result = scope.future { getMangaThumbnail(mangaId) }.get()
ctx.result(result.first)
ctx.header("content-type", result.second)
@ -153,14 +163,16 @@ object JavalinSetup {
// adds the manga to library
app.get("api/v1/manga/:mangaId/library") { ctx ->
val mangaId = ctx.pathParam("mangaId").toInt()
addMangaToLibrary(mangaId)
// TODO see if there is a better way
scope.future { addMangaToLibrary(mangaId) }.get()
ctx.status(200)
}
// removes the manga from the library
app.delete("api/v1/manga/:mangaId/library") { ctx ->
val mangaId = ctx.pathParam("mangaId").toInt()
removeMangaFromLibrary(mangaId)
// TODO see if there is a better way
scope.future { removeMangaFromLibrary(mangaId) }.get()
ctx.status(200)
}
@ -188,20 +200,21 @@ object JavalinSetup {
app.get("/api/v1/manga/:mangaId/chapters") { ctx ->
val mangaId = ctx.pathParam("mangaId").toInt()
ctx.json(getChapterList(mangaId))
ctx.json(scope.future { getChapterList(mangaId) })
}
app.get("/api/v1/manga/:mangaId/chapter/:chapterIndex") { ctx ->
val chapterIndex = ctx.pathParam("chapterIndex").toInt()
val mangaId = ctx.pathParam("mangaId").toInt()
ctx.json(getChapter(chapterIndex, mangaId))
ctx.json(scope.future { getChapter(chapterIndex, mangaId) })
}
app.get("/api/v1/manga/:mangaId/chapter/:chapterIndex/page/:index") { ctx ->
val mangaId = ctx.pathParam("mangaId").toInt()
val chapterIndex = ctx.pathParam("chapterIndex").toInt()
val index = ctx.pathParam("index").toInt()
val result = getPageImage(mangaId, chapterIndex, index)
// TODO see if there is a better way
val result = scope.future { getPageImage(mangaId, chapterIndex, index) }.get()
ctx.result(result.first)
ctx.header("content-type", result.second)
@ -218,7 +231,7 @@ object JavalinSetup {
val sourceId = ctx.pathParam("sourceId").toLong()
val searchTerm = ctx.pathParam("searchTerm")
val pageNum = ctx.pathParam("pageNum").toInt()
ctx.json(sourceSearch(sourceId, searchTerm, pageNum))
ctx.json(scope.future { sourceSearch(sourceId, searchTerm, pageNum) })
}
// source filter list

View File

@ -0,0 +1,41 @@
package ir.armor.tachidesk.util
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Response
import java.io.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
// Based on https://github.com/gildor/kotlin-coroutines-okhttp
suspend fun Call.await(): Response {
return suspendCancellableCoroutine { continuation ->
enqueue(
object : Callback {
override fun onResponse(call: Call, response: Response) {
if (!response.isSuccessful) {
continuation.resumeWithException(Exception("HTTP error ${response.code}"))
return
}
continuation.resume(response)
}
override fun onFailure(call: Call, e: IOException) {
// Don't bother with resuming the continuation if it is already cancelled.
if (continuation.isCancelled) return
continuation.resumeWithException(e)
}
}
)
continuation.invokeOnCancellation {
try {
cancel()
} catch (ex: Throwable) {
// Ignore cancel exception
}
}
}
}

View File

@ -0,0 +1,60 @@
package eu.kanade.tachiyomi.util.lang
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import rx.Emitter
import rx.Observable
import rx.Subscriber
import rx.Subscription
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/*
* Util functions for bridging RxJava and coroutines. Taken from TachiyomiEH/SY.
*/
suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
cont.unsubscribeOnCancellation(
subscribe(
object : Subscriber<T>() {
override fun onStart() {
request(1)
}
override fun onNext(t: T) {
cont.resume(t)
}
override fun onCompleted() {
if (cont.isActive) cont.resumeWithException(
IllegalStateException(
"Should have invoked onNext"
)
)
}
override fun onError(e: Throwable) {
/*
* Rx1 observable throws NoSuchElementException if cancellation happened before
* element emission. To mitigate this we try to atomically resume continuation with exception:
* if resume failed, then we know that continuation successfully cancelled itself
*/
val token = cont.tryResumeWithException(e)
if (token != null) {
cont.completeResume(token)
}
}
}
)
)
}
internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
invokeOnCancellation { sub.unsubscribe() }